从 Dataflow 写入 BigQuery

本文档介绍了如何将数据从 Dataflow 写入 BigQuery。

概览

BigQuery I/O 连接器支持使用以下方法将数据写入 BigQuery:

  • STORAGE_WRITE_API。在此模式下,连接器使用 BigQuery Storage Write API 直接将数据写入 BigQuery 存储。Storage Write API 将流式提取和批量加载整合到一个高性能 API 中。此模式可保证“正好一次”语义。
  • STORAGE_API_AT_LEAST_ONCE。此模式也使用 Storage Write API,但提供“至少一次”语义。此模式可缩短大多数流水线的延迟时间。不过,可能会出现重复写入。
  • FILE_LOADS。在此模式下,连接器将输入数据写入 Cloud Storage 中的暂存文件。然后,连接器运行 BigQuery 加载作业以将数据加载到 BigQuery 中。 该模式是有界限 PCollections 的默认模式,最常见于批处理流水线中。
  • STREAMING_INSERTS。在此模式中,连接器使用旧版流式插入 API。此模式是无界限 PCollections 的默认模式,但不建议用于新项目。

选择写入方法时,请考虑以下几点:

  • 对于流处理作业,请考虑使用 STORAGE_WRITE_APISTORAGE_API_AT_LEAST_ONCE,因为这些模式会直接写入 BigQuery 存储,而不使用中间暂存文件。
  • 如果使用“至少一次”流处理模式运行流水线,请将写入模式设置为 STORAGE_API_AT_LEAST_ONCE。此设置更高效,并且与“至少一次”流处理模式的语义相匹配。
  • 文件加载和 Storage Write API 具有不同的配额和限制
  • 加载作业使用共享 BigQuery 槽池或预留槽。如需使用预留槽,请在预留分配类型为 PIPELINE 的项目中运行加载作业。如果您使用共享 BigQuery 槽池,加载作业是免费的。不过,BigQuery 不保证共享数据池的可用容量。如需了解详情,请参阅预留简介

最大并行数量

  • 对于流式处理流水线中的 FILE_LOADSSTORAGE_WRITE_API,连接器会将数据分片为多个文件或流。通常,我们建议调用 withAutoSharding 以启用自动分片。

  • 对于批处理流水线中的 FILE_LOADS,连接器会将数据写入分区文件,然后分区文件会并行加载到 BigQuery 中。

  • 对于批处理流水线中的 STORAGE_WRITE_API,每个工作器都会创建一个或多个要写入 BigQuery 的流(由分片总数决定)。

  • 对于 STORAGE_API_AT_LEAST_ONCE,有单个默认写入流。多个工作器附加到此写入流。

性能

下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2 工作器上运行。它们未使用 Runner v2。

1 亿条记录 | 1 KB | 1 列 吞吐量(字节) 吞吐量(元素)
Storage Write 55 MBps 每秒 54,000 个元素
Avro Load 78 MBps 每秒 77,000 个元素
Json Load 54 MBps 每秒 53,000 个元素

这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能

最佳做法

本部分介绍有关从 Dataflow 写入 BigQuery 的最佳实践。

一般注意事项

  • Storage Write API 具有配额限制。对于大多数流水线,连接器会处理这些限制。但是,某些场景可能会耗尽可用的 Storage Write API 流。例如,如果流水线使用自动分片和自动扩缩并具有大量目标位置,则可能会发生此问题,尤其是在具有多变工作负载的长时间运行的作业中。如果发生此问题,请考虑使用 STORAGE_WRITE_API_AT_LEAST_ONCE 来避免此问题。

  • 使用 Google Cloud 指标监控 Storage Write API 配额用量。

  • 使用文件加载时,Avro 的性能通常优于 JSON。如需使用 Avro,请调用 withAvroFormatFunction

  • 默认情况下,加载作业与 Dataflow 作业在同一项目中运行。如需指定其他项目,请调用 withLoadJobProjectId

  • 使用 Java SDK 时,请考虑创建一个表示 BigQuery 表架构的类。然后,在流水线中调用 useBeamSchema,以便在 Apache Beam Row 和 BigQuery TableRow 类型之间自动转换。如需查看架构类的示例,请参阅 ExampleModel.java

  • 如果要加载具有包含数千个字段的复杂架构的表,请考虑调用 withMaxBytesPerPartition 来为每个加载作业设置较小的大小上限。

流处理流水线

以下建议适用于流处理流水线。

  • 对于流式处理流水线,我们建议使用 Storage Write API(STORAGE_WRITE_APISTORAGE_API_AT_LEAST_ONCE)。

  • 流式处理流水线可以使用文件加载,但这种方法有以下缺点:

    • 它需要使用数据选取功能才能写入文件。您无法使用全局窗口。
    • 使用共享槽池时,BigQuery 会尽力而为地加载文件。写入记录与记录在 BigQuery 中可用之间可能存在明显延迟。
    • 如果加载作业失败(例如由于数据错误或架构不匹配),则整个流水线将会失败。
  • 请尽可能考虑使用 STORAGE_WRITE_API_AT_LEAST_ONCE。这可能会导致将重复的记录写入 BigQuery,但比 STORAGE_WRITE_API 费用更低,且可伸缩性更强。

  • 一般而言,请避免使用 STREAMING_INSERTS。流式插入比 Storage Write API 要贵,并且性能更低。

  • 数据分片可以提高流式处理流水线的性能。对于大多数流水线而言,自动分片是一个很好的起点。但是,您可以按如下方式调整分片:

  • 如果您使用流式插入,我们建议您将 retryTransientErrors 设置为重试政策

批处理流水线

以下建议适用于批处理流水线。

  • 对于大多数大型批处理流水线,我们建议先尝试 FILE_LOADS。批处理流水线可以使用 STORAGE_WRITE_API,但对于大规模情况(1,000 个或更多的 vCPU),或者如果并发流水线正在运行,可能会超出配额限制。Apache Beam 不会限制批量 STORAGE_WRITE_API 作业的写入流数量上限,因此作业最终会达到 BigQuery Storage API 限制。

  • 使用 FILE_LOADS 时,您可能会耗尽共享 BigQuery 槽池或预留槽池。如果遇到此类故障,请尝试以下方法:

    • 减小作业的工作器数量上限或工作器大小。
    • 购买更多预留槽
    • 考虑使用 STORAGE_WRITE_API
  • 使用 STORAGE_WRITE_API 可能会使中小型流水线(少于 1,000 个 vCPU)受益。对于这些较小的作业,如果您需要死信队列FILE_LOADS 共享槽池不足,请考虑使用 STORAGE_WRITE_API

  • 如果您可以容忍重复数据,请考虑使用 STORAGE_WRITE_API_AT_LEAST_ONCE。此模式可能会导致将重复记录写入 BigQuery,但费用可能会比 STORAGE_WRITE_API 选项低。

  • 不同写入模式的执行方式可能会因流水线的特性而异。可通过实验找到最适合您的工作负载的写入模式。

处理行级错误

本部分介绍了如何处理可能在行级层发生的错误,例如输入数据格式错误或架构不匹配。

对于 Storage Write API,所有无法写入的行都会被放入单独的 PCollection 中。如需获取此集合,请对 WriteResult 对象调用 getFailedStorageApiInserts。如需查看此方法的示例,请参阅将数据流式插入 BigQuery

最好将错误发送到死信队列或表,以供日后进行处理。如需详细了解此模式,请参阅 BigQueryIO 死信模式

对于 FILE_LOADS,如果在加载数据时发生错误,则加载作业会失败,并且流水线会抛出运行时异常。您可以在 Dataflow 日志中查看错误或查看 BigQuery 作业历史记录。I/O 连接器不会返回个别失败行的相关信息。

如需详细了解如何排查错误,请参阅 BigQuery 连接器错误

示例

以下示例展示了如何使用 Dataflow 写入 BigQuery。

写入现有表

以下示例会创建一个将 PCollection<MyData> 写入 BigQuery 的批处理流水线,其中 MyData 是自定义数据类型。

BigQueryIO.write() 方法会返回 BigQueryIO.Write<T> 类型,用于配置写入操作。如需了解详情,请参阅 Apache Beam 文档中的写入表格。此代码示例会将数据写入现有表 (CREATE_NEVER) 并将新行附加到表 (WRITE_APPEND)。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create;  public class BigQueryWrite {   // A custom datatype for the source data.   @DefaultCoder(AvroCoder.class)   public static class MyData {     public String name;     public Long age;      public MyData() {}      public MyData(String name, Long age) {       this.name = name;       this.age = age;     }   }    public static void main(String[] args) {     // Example source data.     final List<MyData> data = Arrays.asList(         new MyData("Alice", 40L),         new MyData("Bob", 30L),         new MyData("Charlie", 20L)     );      // Parse the pipeline options passed into the application. Example:     //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME     // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options     PipelineOptionsFactory.register(ExamplePipelineOptions.class);     ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)         .withValidation()         .as(ExamplePipelineOptions.class);      // Create a pipeline and apply transforms.     Pipeline pipeline = Pipeline.create(options);     pipeline         // Create an in-memory PCollection of MyData objects.         .apply(Create.of(data))         // Write the data to an exiting BigQuery table.         .apply(BigQueryIO.<MyData>write()             .to(String.format("%s:%s.%s",                 options.getProjectId(),                 options.getDatasetName(),                 options.getTableName()))             .withFormatFunction(                 (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))             .withCreateDisposition(CreateDisposition.CREATE_NEVER)             .withWriteDisposition(WriteDisposition.WRITE_APPEND)             .withMethod(Write.Method.STORAGE_WRITE_API));     pipeline.run().waitUntilFinish();   } }

写入新表或现有表

以下示例在目标表不存在时通过将创建处置方式设置为 CREATE_IF_NEEDED来创建新表。使用此选项时,您必须提供表架构。如果创建新表,则连接器使用此架构。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create;  public class BigQueryWriteWithSchema {   // A custom datatype for the source data.   @DefaultCoder(AvroCoder.class)   public static class MyData {     public String name;     public Long age;      public MyData() {}      public MyData(String name, Long age) {       this.name = name;       this.age = age;     }   }    public static void main(String[] args) {     // Example source data.     final List<MyData> data = Arrays.asList(         new MyData("Alice", 40L),         new MyData("Bob", 30L),         new MyData("Charlie", 20L)     );      // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.     TableSchema schema = new TableSchema()         .setFields(             Arrays.asList(                 new TableFieldSchema()                     .setName("user_name")                     .setType("STRING")                     .setMode("REQUIRED"),                 new TableFieldSchema()                     .setName("age")                     .setType("INT64") // Defaults to NULLABLE             )         );      // Parse the pipeline options passed into the application. Example:     //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME     // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options     PipelineOptionsFactory.register(ExamplePipelineOptions.class);     ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)         .withValidation()         .as(ExamplePipelineOptions.class);      // Create a pipeline and apply transforms.     Pipeline pipeline = Pipeline.create(options);     pipeline         // Create an in-memory PCollection of MyData objects.         .apply(Create.of(data))         // Write the data to a new or existing BigQuery table.         .apply(BigQueryIO.<MyData>write()             .to(String.format("%s:%s.%s",                 options.getProjectId(),                 options.getDatasetName(),                 options.getTableName()))             .withFormatFunction(                 (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)             .withSchema(schema)             .withMethod(Write.Method.STORAGE_WRITE_API)         );     pipeline.run().waitUntilFinish();   } }

将数据流式传输到 BigQuery

以下示例展示了如何通过将写入模式设置为 STORAGE_WRITE_API,使用“正好一次”语义来流式插入数据

并非所有流处理流水线都需要“正好一次”语义。例如,您可以从目标表中手动移除重复项。如果您的场景可以接受重复记录的可能性,请考虑将写入方法设置为 STORAGE_API_AT_LEAST_ONCE 来使用“至少一次”语义。此方法通常更高效,可缩短大多数流水线的延迟时间。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Duration; import org.joda.time.Instant;  public class BigQueryStreamExactlyOnce {   // Create a PTransform that sends simulated streaming data. In a real application, the data   // source would be an external source, such as Pub/Sub.   private static TestStream<String> createEventSource() {     Instant startTime = new Instant(0);     return TestStream.create(StringUtf8Coder.of())         .advanceWatermarkTo(startTime)         .addElements(             TimestampedValue.of("Alice,20", startTime),             TimestampedValue.of("Bob,30",                 startTime.plus(Duration.standardSeconds(1))),             TimestampedValue.of("Charles,40",                 startTime.plus(Duration.standardSeconds(2))),             TimestampedValue.of("Dylan,Invalid value",                 startTime.plus(Duration.standardSeconds(2))))         .advanceWatermarkToInfinity();   }    public static PipelineResult main(String[] args) {     // Parse the pipeline options passed into the application. Example:     //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME     // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options     PipelineOptionsFactory.register(ExamplePipelineOptions.class);     ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)         .withValidation()         .as(ExamplePipelineOptions.class);     options.setStreaming(true);      // Create a pipeline and apply transforms.     Pipeline pipeline = Pipeline.create(options);     pipeline         // Add a streaming data source.         .apply(createEventSource())         // Map the event data into TableRow objects.         .apply(MapElements             .into(TypeDescriptor.of(TableRow.class))             .via((String x) -> {               String[] columns = x.split(",");               return new TableRow().set("user_name", columns[0]).set("age", columns[1]);             }))         // Write the rows to BigQuery         .apply(BigQueryIO.writeTableRows()             .to(String.format("%s:%s.%s",                 options.getProjectId(),                 options.getDatasetName(),                 options.getTableName()))             .withCreateDisposition(CreateDisposition.CREATE_NEVER)             .withWriteDisposition(WriteDisposition.WRITE_APPEND)             .withMethod(Write.Method.STORAGE_WRITE_API)             // For exactly-once processing, set the triggering frequency.             .withTriggeringFrequency(Duration.standardSeconds(5)))         // Get the collection of write errors.         .getFailedStorageApiInserts()         .apply(MapElements.into(TypeDescriptors.strings())             // Process each error. In production systems, it's useful to write the errors to             // another destination, such as a dead-letter table or queue.             .via(                 x -> {                   System.out.println("Failed insert: " + x.getErrorMessage());                   System.out.println("Row: " + x.getRow());                   return "";                 }));     return pipeline.run();   } }

后续步骤