使用 Dataflow 匯入、匯出及修改資料

本頁面說明如何使用 Spanner 適用的 Dataflow 連接器,匯入、匯出及修改 Spanner GoogleSQL 方言資料庫和 PostgreSQL 方言資料庫中的資料。

Dataflow 是一種可轉換並充實資料的代管服務。Spanner 的 Dataflow 連接器可讓您透過 Dataflow 管道對 Spanner 讀取及寫入資料,並視需要轉換或修改資料。您也能建立在 Spanner 和其他Google Cloud 產品間轉移資料的管道。

如要有效率地將大量資料移入或移出 Spanner,建議您採用 Dataflow 連接器。此外,如果資料庫需要進行 Partitioned DML 不支援的大規模轉換作業 (例如資料表移動,以及需要 JOIN 的大量刪除作業),建議使用這個方法。使用個別資料庫時,您也可以透過其他方式匯入及匯出資料:

  • 使用 Google Cloud 主控台將個別資料庫以 Avro 格式從 Spanner 匯出至 Cloud Storage。
  • 使用 Google Cloud 主控台將資料庫從您匯出至 Cloud Storage 的檔案重新匯入 Spanner。
  • 使用 REST API 或 Google Cloud CLI,在 Spanner 和 Cloud Storage 之間來回執行匯出匯入工作 (同樣使用 Avro 格式)。

Spanner 的 Dataflow 連接器是 Apache Beam Java SDK 的一部分,可提供用於執行上述操作的 API。如要進一步瞭解本頁討論的部分概念,例如 PCollection 物件和轉換,請參閱 Apache Beam 程式設計指南

將連接器新增至 Maven 專案

如要將 Google Cloud Dataflow 連接器新增至 Maven 專案,請在您的 pom.xml 檔案中新增 beam-sdks-java-io-google-cloud-platform Maven 構件做為依附元件。

舉例來說,假設您的 pom.xml 檔案將 beam.version 設定成適當的版本號碼,便會新增下列依附元件:

<dependency>     <groupId>org.apache.beam</groupId>     <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>     <version>${beam.version}</version> </dependency> 

從 Spanner 讀取資料

如要從 Spanner 讀取資料,請套用 SpannerIO.read 轉換。並利用 SpannerIO.Read 類別中的方法設定讀取作業。套用上述轉換會傳回 PCollection<Struct>,其中集合內的每個元素都代表讀取作業傳回的個別資料列。您可以根據自己想要的輸出結果,選擇是否使用特定 SQL 查詢來讀取 Spanner。

套用 SpannerIO.read 轉換會執行同步讀取作業,藉此傳回一致的資料視圖。除非您另行指定,否則系統一律會在您開始讀取時建立讀取結果的快照。請參閱讀取,進一步瞭解 Spanner 可執行的各種不同讀取類型。

使用查詢讀取資料

如要從 Spanner 讀取特定一組資料,請利用 SpannerIO.Read.withQuery 方法設定轉換,藉此指定 SQL 查詢。例如:

// Query for all the columns and rows in the specified Spanner table PCollection<Struct> records = pipeline.apply(     SpannerIO.read()         .withInstanceId(instanceId)         .withDatabaseId(databaseId)         .withQuery("SELECT * FROM " + options.getTable()));

不指定查詢而讀取資料

如要在不使用查詢的情況下讀取資料庫,您可以透過 SpannerIO.Read.withTable 方法指定資料表名稱,並透過 SpannerIO.Read.withColumns 方法指定要讀取的資料欄清單。例如:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table PCollection<Struct> records = pipeline.apply(     SpannerIO.read()         .withInstanceId(instanceId)         .withDatabaseId(databaseId)         .withTable("Singers")         .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table PCollection<Struct> records = pipeline.apply(     SpannerIO.read()         .withInstanceId(instanceId)         .withDatabaseId(databaseId)         .withTable("singers")         .withColumns("singer_id", "first_name", "last_name"));

如要限制讀取的資料列,可以使用 SpannerIO.Read.withKeySet 方法指定要讀取的一組主鍵。

您也可以使用指定的次要索引讀取資料表。與 readUsingIndex API 呼叫相同,索引必須包含查詢結果中顯示的所有資料。

如要這麼做,請指定上一個範例所示的資料表,並使用 SpannerIO.Read.withIndex 方法指定包含所需資料欄值的 index。索引必須儲存轉換作業需要讀取的所有資料欄。系統會隱含儲存基礎資料表的主鍵。舉例來說,如要使用索引 SongsBySongName 讀取 Songs 資料表,請使用下列程式碼:

GoogleSQL

// Read the indexed columns from all rows in the specified index. PCollection<Struct> records =     pipeline.apply(         SpannerIO.read()             .withInstanceId(instanceId)             .withDatabaseId(databaseId)             .withTable("Songs")             .withIndex("SongsBySongName")             // Can only read columns that are either indexed, STORED in the index or             // part of the primary key of the Songs table,             .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index. PCollection<Struct> records =     pipeline.apply(         SpannerIO.read()             .withInstanceId(instanceId)             .withDatabaseId(databaseId)             .withTable("Songs")             .withIndex("SongsBySongName")             // Can only read columns that are either indexed, STORED in the index or             // part of the primary key of the songs table,             .withColumns("singer_id", "album_id", "track_id", "song_name"));

控管交易資料的過時程度

系統保證會在一致的資料快照上執行轉換。如要控管資料的過時程度,請使用 SpannerIO.Read.withTimestampBound 方法。詳情請參閱交易

在相同交易中讀取多份資料表

如要同時讀取多份資料表以確保資料一致性,請執行單一交易中的所有讀取作業,如要執行這項操作,請套用 createTransaction 轉換,建立 PCollectionView<Transaction> 物件,接著該物件就會建立交易。上述操作產生的視圖可透過 SpannerIO.Read.withTransaction 傳送至讀取作業。

GoogleSQL

SpannerConfig spannerConfig =     SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId); PCollectionView<Transaction> tx =     pipeline.apply(         SpannerIO.createTransaction()             .withSpannerConfig(spannerConfig)             .withTimestampBound(TimestampBound.strong())); PCollection<Struct> singers =     pipeline.apply(         SpannerIO.read()             .withSpannerConfig(spannerConfig)             .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")             .withTransaction(tx)); PCollection<Struct> albums =     pipeline.apply(         SpannerIO.read()             .withSpannerConfig(spannerConfig)             .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")             .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =     SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId); PCollectionView<Transaction> tx =     pipeline.apply(         SpannerIO.createTransaction()             .withSpannerConfig(spannerConfig)             .withTimestampBound(TimestampBound.strong())); PCollection<Struct> singers =     pipeline.apply(         SpannerIO.read()             .withSpannerConfig(spannerConfig)             .withQuery("SELECT singer_id, first_name, last_name FROM singers")             .withTransaction(tx)); PCollection<Struct> albums =     pipeline.apply(         SpannerIO.read()             .withSpannerConfig(spannerConfig)             .withQuery("SELECT singer_id, album_id, album_title FROM albums")             .withTransaction(tx));

從所有可用的資料表讀取資料

您可以從 Spanner 資料庫中所有可用的資料表讀取資料。

GoogleSQL

PCollection<Struct> allRecords =     pipeline         .apply(             SpannerIO.read()                 .withSpannerConfig(spannerConfig)                 .withBatching(false)                 .withQuery(                     "SELECT t.table_name FROM information_schema.tables AS t WHERE t"                         + ".table_catalog = '' AND t.table_schema = ''"))         .apply(             MapElements.into(TypeDescriptor.of(ReadOperation.class))                 .via(                     (SerializableFunction<Struct, ReadOperation>)                         input -> {                           String tableName = input.getString(0);                           return ReadOperation.create().withQuery("SELECT * FROM " + tableName);                         }))         .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =     pipeline         .apply(             SpannerIO.read()                 .withSpannerConfig(spannerConfig)                 .withBatching(false)                 .withQuery(                     Statement.newBuilder(                             "SELECT t.table_name FROM information_schema.tables AS t "                                 + "WHERE t.table_catalog = $1 AND t.table_schema = $2")                         .bind("p1")                         .to(spannerConfig.getDatabaseId().get())                         .bind("p2")                         .to("public")                         .build()))         .apply(             MapElements.into(TypeDescriptor.of(ReadOperation.class))                 .via(                     (SerializableFunction<Struct, ReadOperation>)                         input -> {                           String tableName = input.getString(0);                           return ReadOperation.create()                               .withQuery("SELECT * FROM \"" + tableName + "\"");                         }))         .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

排解不支援的查詢

Dataflow 連接器僅支援 Spanner SQL 查詢,其中查詢執行計畫的第一個運算子為分散式聯集。如果您嘗試透過查詢從 Spanner 讀取資料,並發現有例外狀況說明查詢 does not have a DistributedUnion at the root,請按照「瞭解 Spanner 如何執行查詢」一節的步驟操作,利用 Google Cloud 控制台擷取查詢的執行計畫。

如果您是使用不支援的 SQL 查詢,請將該查詢簡化,讓其中的分散式聯集做為查詢執行計畫的第一個運算子。此外,請移除匯總函式、資料表聯結,以及 DISTINCTGROUP BYORDER 運算子,因為這些運算子很可能會導致查詢無法正常運作。

建立寫入變異

請使用 Mutation 類別的 newInsertOrUpdateBuilder 方法,而非 newInsertBuilder 方法,除非 Java 管道絕對需要使用該方法。如果是 Python 管道,請使用 SpannerInsertOrUpdate,而不是 SpannerInsert。Dataflow 提供至少執行一次的強制保證,這表示系統可能會多次寫入變異。因此,只有 INSERT 變異可能會產生 com.google.cloud.spanner.SpannerException: ALREADY_EXISTS 錯誤,導致管道執行失敗。為避免發生這項錯誤,請改用 INSERT_OR_UPDATE 變異,這個方法會新增資料列,或更新已存在的資料列的資料欄值。INSERT_OR_UPDATE 異動可以套用多次。

寫入 Spanner 及轉換資料

您可以使用 SpannerIO.write 轉換執行一組輸入資料列變異,藉此透過 Dataflow 連接器將資料寫入 Spanner。為了增進效率,Dataflow 連接器會將變異批次分組。

以下範例說明如何將寫入轉換套用至變異的 PCollection

GoogleSQL

albums     // Spanner expects a Mutation object, so create it using the Album's data     .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {       @ProcessElement       public void processElement(ProcessContext c) {         Album album = c.element();         c.output(Mutation.newInsertOrUpdateBuilder("albums")             .set("singerId").to(album.singerId)             .set("albumId").to(album.albumId)             .set("albumTitle").to(album.albumTitle)             .build());       }     }))     // Write mutations to Spanner     .apply("WriteAlbums", SpannerIO.write()         .withInstanceId(instanceId)         .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =     pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton()); albums     // Spanner expects a Mutation object, so create it using the Album's data     .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {       @ProcessElement       public void processElement(ProcessContext c) {         Album album = c.element();         c.output(Mutation.newInsertOrUpdateBuilder("albums")             .set("singerId").to(album.singerId)             .set("albumId").to(album.albumId)             .set("albumTitle").to(album.albumTitle)             .build());       }     }))     // Write mutations to Spanner     .apply("WriteAlbums", SpannerIO.write()         .withInstanceId(instanceId)         .withDatabaseId(databaseId)         .withDialectView(dialectView));

如果轉換在完成前意外停止,已套用的變異將不會復原。

自動套用變異群組

您可以使用 MutationGroup 類別,確保變異群組能一起自動套用。系統保證會經由同一項交易來提交 MutationGroup 中的變異,但也可能重新嘗試進行該交易。

如有資料是一起儲存在金鑰體系中的相近位置,您可以將影響這類資料的變數集合成變數群組,藉此發揮最佳效能。由於 Spanner 會同時將父項和子項資料表的資料交錯列於父項資料表中,這類資料在金鑰體系中的位置一向相當接近。建議您將變數群組的結構調整為以下任一形式:其中一個變異是套用至父項資料表,而其他變異則套用至子項資料表;或者所有變異都會修改金鑰體系中儲存位置相近的資料。如要進一步瞭解 Spanner 如何儲存父項和子項資料表的資料,請參閱結構定義與資料模型。如果您並未依據建議的資料表階層來整理變異群組,或您存取的資料在金鑰體系中的儲存位置並不相近,Spanner 可能會需要執行兩階段修訂,進而導致效能減慢。詳情請參閱「位置的取捨」。

如要使用 MutationGroup,請建構 SpannerIO.write 轉換並呼叫 SpannerIO.Write.grouped 方法,該方法會傳回轉換,然後您就可以將轉換套用至 MutationGroup 物件的 PCollection

建立 MutationGroup 時,過程中列出的第一個變異會成為主要變異。如果變異群組會同時影響父項和子項資料表,主要變異應為父項資料表的變異。在其他情況下,您可以使用任何變異做為主要變異。Dataflow 連接器會利用主要變異來判定分區界線,以便有效率地將變異一同批次處理。

舉例來說,假設您的應用程式會監控行為,並標記出有問題的使用者行為供您檢查。發現帶有標記的行為時,您會希望透過更新 Users 資料表來禁止使用者存取應用程式,同時也需要將這類事件記錄在 PendingReviews 資料表。如要確保這兩份資料表皆可自動更新,請使用 MutationGroup

GoogleSQL

PCollection<MutationGroup> mutations =     suspiciousUserIds.apply(         MapElements.via(             new SimpleFunction<>() {                @Override               public MutationGroup apply(String userId) {                 // Immediately block the user.                 Mutation userMutation =                     Mutation.newUpdateBuilder("Users")                         .set("id")                         .to(userId)                         .set("state")                         .to("BLOCKED")                         .build();                 long generatedId =                     Hashing.sha1()                         .newHasher()                         .putString(userId, Charsets.UTF_8)                         .putLong(timestamp.getSeconds())                         .putLong(timestamp.getNanos())                         .hash()                         .asLong();                  // Add an entry to pending review requests.                 Mutation pendingReview =                     Mutation.newInsertOrUpdateBuilder("PendingReviews")                         .set("id")                         .to(generatedId) // Must be deterministically generated.                         .set("userId")                         .to(userId)                         .set("action")                         .to("REVIEW ACCOUNT")                         .set("note")                         .to("Suspicious activity detected.")                         .build();                  return MutationGroup.create(userMutation, pendingReview);               }             }));  mutations.apply(SpannerIO.write()     .withInstanceId(instanceId)     .withDatabaseId(databaseId)     .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =     pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton()); PCollection<MutationGroup> mutations = suspiciousUserIds     .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {        @Override       public MutationGroup apply(String userId) {         // Immediately block the user.         Mutation userMutation = Mutation.newUpdateBuilder("Users")             .set("id").to(userId)             .set("state").to("BLOCKED")             .build();         long generatedId = Hashing.sha1().newHasher()             .putString(userId, Charsets.UTF_8)             .putLong(timestamp.getSeconds())             .putLong(timestamp.getNanos())             .hash()             .asLong();          // Add an entry to pending review requests.         Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")             .set("id").to(generatedId)  // Must be deterministically generated.             .set("userId").to(userId)             .set("action").to("REVIEW ACCOUNT")             .set("note").to("Suspicious activity detected.")             .build();          return MutationGroup.create(userMutation, pendingReview);       }     }));  mutations.apply(SpannerIO.write()     .withInstanceId(instanceId)     .withDatabaseId(databaseId)     .withDialectView(dialectView)     .grouped());

建立變異群組時,做為引數提供的第一個變異會成為主要變異。由於本範例中的兩份資料表互不相關,因此沒有明確的主要變異。userMutation之所以最先列出,是因為我們選取它做為主要變異。雖然分開套用兩個變異的做法比較便捷,但無法確保單元性。在此情況下,變異群組會是最理想的選擇。

後續步驟