在 Dataproc 執行自然語言處理的 PySpark

1. 總覽

自然語言處理 (NLP) 旨在研究文字資料,並進行分析,隨著透過網際網路產生的文字數量持續增加,現今的機構也越來越常運用網路文字取得與業務相關的資訊。

自然語言處理功能可用於各種用途,包括翻譯語言、分析情緒,以及從頭開始生成語句等等。我們在研究領域投入了大量心力,徹底改變了我們處理文字的方式。

我們將探討如何運用自然語言處理大量文字資料。這實在是一項艱鉅的任務!幸好,我們將運用 Spark MLlibspark-nlp 等程式庫來簡化這項作業。

2. 我們的應用實例

我們 (虛構) 機構「FoodCorp」的首席數據資料學家想深入瞭解食品業的趨勢。我們可以透過 Reddit subreddit r/food 中以貼文形式取得文字資料資料庫,日後就能探索人們談論時的話題。

其中一種方法是使用名為「主題模型」的自然語言處理方法。主題模型是一種統計方法,可識別一組文件語意含義的趨勢。換句話說,我們可以在 Reddit「posts」的語料庫上建構主題模型系統會產生「主題」清單或一組描述趨勢的字詞群組。

建立模型時,我們會使用 Latent Dirichlet 配置 (LDA) 演算法,通常用於將文字分群。如需 LDA 的詳細介紹,請按這裡

3. 建立專案

如果您還沒有 Google 帳戶 (Gmail 或 Google Apps),請先建立帳戶。登入 Google Cloud Platform 控制台 ( console.cloud.google.com),並建立新專案:

7e541d932b20c074.png

2deefc9295d114ea.png

螢幕截圖取自 2016-02-10 12:45:26.png

接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。

執行本程式碼研究室的成本不應超過數美元,但如果您決定使用更多資源或讓資源繼續運作,費用會增加。PySpark-BigQuerySpark-NLP 程式碼研究室會介紹「清除」功能網址。

新使用者符合 $300 美元免費試用資格的 Google Cloud Platform。

4. 設定我們的環境

首先,我們必須啟用 Dataproc 和 Compute Engine API。

按一下畫面左上方的「選單」圖示。

2bfc27ef9ba2ec7d.png

從下拉式選單中選取 API 管理員。

408af5f32c4b7c25.png

按一下「啟用 API 和服務」

a9c0e84296a7ba5b.png

搜尋「Compute Engine」。按一下「Google Compute Engine API」出現在畫面上的結果清單中

b6adf859758d76b3.png

在 Google Compute Engine 頁面上,按一下「啟用」

da5584a1cbc77104.png

啟用後,按一下向左箭頭即可返回。

現在搜尋「Google Dataproc API」然後啟用

f782195d8e3d732a.png

接著,按一下 Cloud 控制台右上角的按鈕,開啟 Cloud Shell:

a10c47ee6ca41c54.png

在接下來的程式碼研究室中,我們會設定一些環境變數以供參考。首先,為要建立的 Dataproc 叢集選擇名稱 (例如「my-cluster」),並在您的環境中設定該叢集。你可以任意命名。

CLUSTER_NAME=my-cluster 

接著,從這裡提供的其中一個可用區選取可用區。例如 us-east1-b.

REGION=us-east1 

最後,我們要設定工作要讀取資料的來源值區。值區 bm_reddit 中有可用的資料範例,但如果您在這項工作之前完成了使用 PySpark 預先處理 BigQuery 資料產生的資料,歡迎繼續使用。

BUCKET_NAME=bm_reddit 

設定好環境變數後,請執行下列指令,建立 Dataproc 叢集:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \      --region ${REGION} \      --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \      --worker-machine-type n1-standard-8 \      --num-workers 4 \      --image-version 1.4-debian10 \      --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \      --optional-components=JUPYTER,ANACONDA \      --enable-component-gateway 

以下是各項指令的步驟:

gcloud beta dataproc clusters create ${CLUSTER_NAME}:系統會使用您先前提供的名稱開始建立 Dataproc 叢集。這裡包含 beta,以啟用 Dataproc 的 Beta 版功能,例如元件閘道。

--zone=${ZONE}:這會設定叢集的位置。

--worker-machine-type n1-standard-8:這是工作站使用的類型

--num-workers 4:我們的叢集有四個工作站。

--image-version 1.4-debian9:這代表要使用的 Dataproc 映像檔版本。

--initialization-actions ...初始化動作是建立叢集和工作站時執行的自訂指令碼。可以是使用者建立並儲存在 GCS 值區,也可以從公開值區 dataproc-initialization-actions 參照。這裡包含的初始化動作將允許使用 Pip 安裝 Python 套件 (隨附 --metadata 標記)。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp':這是要安裝至 Dataproc 的套件清單 (以空格分隔)。在這個範例中,我們會安裝 google-cloud-storage Python 用戶端程式庫和 spark-nlp

--optional-components=ANACONDA選用元件是與 Dataproc 搭配使用的常見套件,會在建立 Dataproc 叢集時自動安裝。相較於初始化動作,使用選用元件的優點包括啟動速度更快,以及針對特定 Dataproc 版本進行測試。總體來說,它們比較可靠。

--enable-component-gateway:這個旗標讓我們利用 Dataproc 的元件閘道來查看常見的 UI,例如 Zeppelin、Jupyter 或 Spark 歷史記錄。注意:其中部分元素需要相關的「選用元件」。

如要深入瞭解 Dataproc,請參閱這個程式碼研究室

接著,在 Cloud Shell 中執行下列指令,使用程式碼範例複製存放區,並將 cd 複製到正確的目錄:

cd git clone https://github.com/GoogleCloudPlatform/cloud-dataproc cd cloud-dataproc/codelabs/spark-nlp 

5. Spark MLlib

Spark MLlib 是以 Apache Spark 編寫的可擴充機器學習程式庫,MLlib 利用一組經過微調的機器學習演算法,充分發揮 Spark 的效率,進而分析大量資料。且有 Java、Scala、Python 和 R 版本的 API。在本程式碼研究室中,我們會特別著重於 Python。

MLlib 包含大量的 Transformer 和 estimator。轉換器是一種可變動或更改資料的工具,通常是使用 transform() 函式,而估算器是預先建構的演算法,您可以用來訓練資料,通常是使用 fit() 函式。

轉換器的範例包括:

  • 符記化 (透過字詞字串建立數字向量)
  • one-hot 編碼 (建立代表字串中出現字詞的數字稀疏向量)
  • 停用字詞移除工具 (移除沒有在字串中新增語意值的字詞)

估算器的例子包括:

  • 分類 (這是蘋果還是橘子?)
  • 迴歸 (這個蘋果需要多少費用?)
  • 分群法 (彼此的蘋果有多相似?)
  • 決策樹 (如果顏色 == 橘色,則是橘色,否則為蘋果)
  • 降低維度 (可以從資料集中移除特徵,但仍能區分蘋果和橘子?)。

MLlib 也包含適用於機器學習的其他常見方法的工具,例如超參數調整、選擇和跨驗證。

此外,MLlib 包含 Pipelines API,可讓您使用可重新執行的不同轉換器建構資料轉換管道。

6. Spark-NLP

Spark-nlp 是由 John Snow Labs 建立的程式庫,用於使用 Spark 執行高效率的自然語言處理工作。其中包含了內建註解工具,可用於處理一般工作,例如:

  • 符記化 (透過字詞字串建立數字向量)
  • 建立字詞嵌入 (透過向量定義字詞之間的關係)
  • 語句標記 (哪些字詞是名詞?何謂動詞?)

在本程式碼研究室的涵蓋範圍之外,Spark-nlp 也與 TensorFlow 完美整合。

或許最重要的是,Spark-NLP 提供可輕鬆掛接至 MLlib 管線的元件,得以擴充 Spark MLlib 的功能。

7. 自然語言處理的最佳做法

從資料擷取實用資訊之前,我們必須先完成部分內部整理工作。我們提供的預先處理作業步驟如下:

代碼化

我們首先要做的第一件事是「權杖化」實體媒介包括儲存空間陣列 傳統硬碟、磁帶和 USB 隨身碟等包括擷取資料,並根據「符記」分割資料或字詞一般來說,我們會在這個步驟移除標點符號,並將所有字詞設為小寫。舉例來說,假設其中包含以下字串:What time is it? 權杖化後,這個句子會包含四個符記:「what" , "time", "is", "it". 我們不希望模型將 what 這個字詞視為兩個不同大小寫不同的字詞。此外,標點符號通常無法幫助我們更好地從字詞推論,因此我們也會刪除該字詞。

正規化

我們通常會使用「正規化」實體媒介包括儲存空間陣列 傳統硬碟、磁帶和 USB 隨身碟等這項功能會將意思相似的字詞替換為相同的字詞。例如,如果字詞是「fought」、「battled」和「dueled」然後正規化以取代「battled」和「dueled」並且顯示 "fought" 字樣。

字根變化

詞幹分析會取代字詞的根意義。例如「汽車」、「汽車」和「汽車」都會換成「汽車」,因為所有這些字詞在其根本上都是同一個字。

移除停用字詞

停用字詞是 "and" 等字詞和「the」通常不會對語句的語意含意新增值我們通常希望移除這些範例,以減少文字資料集中的雜訊。

8. 執行此工作

來看看要執行的工作。您可以在 cloud-dataproc/codelabs/spark-nlp/topic_model.py 找到這個程式碼。請花至少幾分鐘閱讀課程內容及相關留言,瞭解具體情況。我們也將重點說明以下幾個部分:

# Python imports import sys  # spark-nlp components. Each one is incorporated into our pipeline. from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer from sparknlp.base import DocumentAssembler, Finisher  # A Spark Session is how we interact with Spark SQL to create Dataframes from pyspark.sql import SparkSession  # These allow us to create a schema for our data from pyspark.sql.types import StructField, StructType, StringType, LongType  # Spark Pipelines allow us to sequentially add components such as transformers from pyspark.ml import Pipeline  # These are components we will incorporate into our pipeline. from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF  # LDA is our model of choice for topic modeling from pyspark.ml.clustering import LDA  # Some transformers require the usage of other Spark ML functions. We import them here from pyspark.sql.functions import col, lit, concat  # This will help catch some PySpark errors from pyspark.sql.utils import AnalysisException  # Assign bucket where the data lives try:     bucket = sys.argv[1] except IndexError:     print("Please provide a bucket name")     sys.exit(1)  # Create a SparkSession under the name "reddit". Viewable via the Spark UI spark = SparkSession.builder.appName("reddit topic model").getOrCreate()  # Create a three column schema consisting of two strings and a long integer fields = [StructField("title", StringType(), True),           StructField("body", StringType(), True),           StructField("created_at", LongType(), True)] schema = StructType(fields)  # We'll attempt to process every year / month combination below. years = ['2016', '2017', '2018', '2019'] months = ['01', '02', '03', '04', '05', '06',           '07', '08', '09', '10', '11', '12']  # This is the subreddit we're working with. subreddit = "food"  # Create a base dataframe. reddit_data = spark.createDataFrame([], schema)  # Keep a running list of all files that will be processed files_read = []  for year in years:     for month in months:          # In the form of <project-id>.<dataset>.<table>         gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"          # If the table doesn't exist we will simply continue and not         # log it into our "tables_read" list         try:             reddit_data = (                 spark.read.format('csv')                 .options(codec="org.apache.hadoop.io.compress.GzipCodec")                 .load(gs_uri, schema=schema)                 .union(reddit_data)             )              files_read.append(gs_uri)          except AnalysisException:             continue  if len(files_read) == 0:     print('No files read')     sys.exit(1)  # Replacing null values with their respective typed-equivalent is usually # easier to work with. In this case, we'll replace nulls with empty strings. # Since some of our data doesn't have a body, we can combine all of the text # for the titles and bodies so that every row has useful data.  df_train = (     reddit_data     # Replace null values with an empty string     .fillna("")     .select(          # Combine columns         concat(             # First column to concatenate. col() is used to specify that we're referencing a column             col("title"),             # Literal character that will be between the concatenated columns.             lit(" "),             # Second column to concatenate.             col("body")         # Change the name of the new column         ).alias("text")     ) )  # Now, we begin assembling our pipeline. Each component here is used to some transformation to the data. # The Document Assembler takes the raw text data and convert it into a format that can # be tokenized. It becomes one of spark-nlp native object types, the "Document". document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")  # The Tokenizer takes data that is of the "Document" type and tokenizes it. # While slightly more involved than this, this is effectively taking a string and splitting # it along ths spaces, so each word is its own string. The data then becomes the # spark-nlp native type "Token". tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")  # The Normalizer will group words together based on similar semantic meaning. normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")  # The Stemmer takes objects of class "Token" and converts the words into their # root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced # with the word "car". stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")  # The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp # components. For instance, we can now feed the data into components from Spark MLlib. finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")  # Stopwords are common words that generally don't add much detail to the meaning # of a body of text. In English, these are mostly "articles" such as the words "the" # and "of". stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")  # Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track # of the vocabulary that's being created so we can map our topics back to their # corresponding words. # TF (term frequency) creates a matrix that counts how many times each word in the # vocabulary appears in each body of text. This then gives each word a weight based # on its frequency. tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")  # Here we implement the IDF portion. IDF (Inverse document frequency) reduces # the weights of commonly-appearing words. idf = IDF(inputCol="raw_features", outputCol="features")  # LDA creates a statistical representation of how frequently words appear # together in order to create "topics" or groups of commonly appearing words. lda = LDA(k=10, maxIter=10)  # We add all of the transformers into a Pipeline object. Each transformer # will execute in the ordered provided to the "stages" parameter pipeline = Pipeline(     stages = [         document_assembler,         tokenizer,         normalizer,         stemmer,         finisher,         stopword_remover,         tf,         idf,         lda     ] )  # We fit the data to the model. model = pipeline.fit(df_train)  # Now that we have completed a pipeline, we want to output the topics as human-readable. # To do this, we need to grab the vocabulary generated from our pipeline, grab the topic # model and do the appropriate mapping.  The output from each individual component lives # in the model object. We can access them by referring to them by their position in # the pipeline via model.stages[<ind>]  # Let's create a reference our vocabulary. vocab = model.stages[-3].vocabulary  # Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(), # we load the output into a Python array. raw_topics = model.stages[-1].describeTopics().collect()  # Lastly, let's get the indices of the vocabulary terms from our topics topic_inds = [ind.termIndices for ind in raw_topics]  # The indices we just grab directly map to the term at position <ind> from our vocabulary. # Using the below code, we can generate the mappings from our topic indices to our vocabulary. topics = [] for topic in topic_inds:     _topic = []     for ind in topic:         _topic.append(vocab[ind])     topics.append(_topic)  # Let's see our topics! for i, topic in enumerate(topics, start=1):     print(f"topic {i}: {topic}") 

執行工作

現在讓我們開始執行工作。繼續執行下列指令:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\     --region ${REGION}\     --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\     --driver-log-levels root=FATAL \     topic_model.py \     -- ${BUCKET_NAME} 

這個指令可讓您使用 Dataproc Jobs API。加入 pyspark 指令,即可向叢集表示這是 PySpark 工作。我們提供叢集名稱、這裡提供的選用參數,以及內含工作的檔案名稱。在本例中,我們提供 --properties 參數,讓我們變更 Spark、Yarn 或 Dataproc 的各種屬性。我們即將變更 Spark 屬性 packages,以便通知 Spark 要在工作中納入 spark-nlp。我們也提供了 --driver-log-levels root=FATAL 參數,這些參數會隱藏 PySpark 中大部分的記錄輸出 (錯誤除外)。一般來說,Spark 記錄往往雜訊過多。

最後,-- ${BUCKET} 是 Python 指令碼本身的指令列引數,可提供值區名稱。請注意,--${BUCKET} 之間的空格。

執行工作幾分鐘後,您應該會看到包含我們的模型的輸出內容:

167f4c839385dcf0.png

太棒了!您能否查看模型的輸出結果,藉此推斷趨勢?那我們的?

從上方的輸出內容中,其中一個可能從主題 8 與早餐美食有關,以及主題 9 的甜點。

9. 清除

完成本快速入門導覽課程後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 刪除環境和您建立的 Cloud Storage 值區
  2. 刪除 Dataproc 環境

如果您只針對本程式碼研究室建立專案,也可以選擇刪除專案:

  1. 在 GCP 控制台中,前往「專案頁面。
  2. 在專案清單中,選取要刪除的專案,然後按一下「Delete」(刪除)
  3. 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

授權

本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。