搭配使用 BigQuery 連接器和 Google Cloud Serverless for Apache Spark

搭配 Apache Spark 使用 spark-bigquery-connector,在 BigQuery 中讀取和寫入資料。本教學課程示範使用 spark-bigquery-connector 的 PySpark 應用程式。

搭配工作負載使用 BigQuery 連接器

請參閱「無伺服器 Apache Spark 執行階段版本」,瞭解批次工作負載執行階段版本中安裝的 BigQuery 連接器版本。如果連接器未列出,請參閱下一節的說明,瞭解如何讓應用程式使用連接器。

如何搭配使用連接器和 Spark 執行階段 2.0 版

Spark 執行階段 2.0 版未安裝 BigQuery 連接器。使用 Spark 執行階段 2.0 版時,您可以透過下列其中一種方式,讓應用程式使用連接器:

  • 提交 Google Cloud Serverless for Apache Spark 批次工作負載時,請使用 jars 參數指向連接器 JAR 檔案。下列範例指定連接器 JAR 檔案 (如需可用連接器 JAR 檔案的清單,請參閱 GitHub 上的 GoogleCloudDataproc/spark-bigquery-connector 存放區)。
    • Google Cloud CLI 範例:
       gcloud dataproc batches submit pyspark \     --region=region \     --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \     ... other args 
  • 在 Spark 應用程式中加入連接器 JAR 檔案做為依附元件 (請參閱「針對連接器進行編譯」)

計算費用

本教學課程使用 Google Cloud的計費元件,包括:

  • Serverless for Apache Spark
  • BigQuery
  • Cloud Storage

使用 Pricing Calculator 可根據您的預測使用量來產生預估費用。

初次使用 Cloud Platform 的使用者可能符合免費試用的資格。

BigQuery I/O

這個範例會使用標準資料來源 API,將 BigQuery 中的資料讀取到 Spark DataFrame,以執行字數計算。

連接器會將字數統計輸出內容寫入 BigQuery,如下所示:

  1. 將資料緩衝處理到 Cloud Storage 值區中的暫時檔案

  2. 透過單一作業將資料從 Cloud Storage 值區複製到 BigQuery

  3. 在 BigQuery 載入作業完成後刪除 Cloud Storage 中的暫時檔案 (Spark 應用程式終止後,暫時檔案也會遭到刪除)。如果刪除失敗,您需要刪除任何不需要的臨時 Cloud Storage 檔案,這些檔案通常會放在 gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID 中。

設定帳單

根據預設,系統會向與憑證或服務帳戶相關聯的專案收取 API 使用費用。如要向其他專案收費,請設定下列設定:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

您也可以新增至讀取或寫入作業,如下所示: .option("parentProject", "<BILLED-GCP-PROJECT>")

提交 PySpark 字數統計批次工作負載

執行 Spark 批次工作負載,計算公開資料集中的字數。

  1. 開啟本機終端機或 Cloud Shell
  2. 在本地終端機或 Cloud Shell 中,使用 bq 指令列工具建立 wordcount_dataset
     bq mk wordcount_dataset 
  3. 使用 Google Cloud CLI 建立 Cloud Storage 值區。
     gcloud storage buckets create gs://YOUR_BUCKET 
    YOUR_BUCKET 替換為您建立的 Cloud Storage 值區名稱。
  4. 在文字編輯器中複製下列 PySpark 程式碼,以在本機建立 wordcount.py 檔案。
    #!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession  spark = SparkSession \   .builder \   .appName('spark-bigquery-demo') \   .getOrCreate()  # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket)  # Load data from BigQuery. words = spark.read.format('bigquery') \   .option('table', 'bigquery-public-data:samples.shakespeare') \   .load() words.createOrReplaceTempView('words')  # Perform word count. word_count = spark.sql(     'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema()  # Saving the data to BigQuery word_count.write.format('bigquery') \   .option('table', 'wordcount_dataset.wordcount_output') \   .save()
  5. 提交 PySpark 批次工作負載:
     gcloud dataproc batches submit pyspark wordcount.py \     --region=REGION \     --deps-bucket=YOUR_BUCKET 
    終端機輸出範例:
     ... +---------+----------+ |     word|word_count| +---------+----------+ |     XVII|         2| |    spoil|        28| |    Drink|         7| |forgetful|         5| |   Cannot|        46| |    cures|        10| |   harder|        13| |  tresses|         3| |      few|        62| |  steel'd|         5| | tripping|         7| |   travel|        35| |   ransom|        55| |     hope|       366| |       By|       816| |     some|      1169| |    those|       508| |    still|       567| |      art|       893| |    feign|        10| +---------+----------+ only showing top 20 rows  root  |-- word: string (nullable = false)  |-- word_count: long (nullable = true) 

    如要在 Google Cloud 控制台中預覽輸出資料表,請開啟專案的 BigQuery 頁面,選取 wordcount_output 資料表,然後按一下「預覽」

瞭解詳情