1. 目標
概要
この Codelab では、Vertex AI Vision アプリケーションをエンドツーエンドで作成し、販売店の動画映像を使用してキューサイズをモニタリングすることに重点を置きます。事前トレーニング済みの専用モデルの占有率分析の組み込み機能を使用して、次のことを取得します。
- キューに立っている人の数を数えます。
- カウンターでサービスを受けている人数を数えます。
学習内容
- Vertex AI Vision でアプリケーションを作成してデプロイする方法
- 動画ファイルを使用して RTSP ストリームを設定し、Jupyter ノートブックから vaictl を使用して Vertex AI Vision にストリームを取り込む方法。
- 利用人数の分析モデルとそのさまざまな機能の使用方法。
- Vertex AI Vision のメディア ウェアハウス内の動画を検索する方法。
- 出力を BigQuery に接続する方法、モデルの json 出力から分析情報を抽出する SQL クエリを記述し、出力を使用して元の動画にラベルとアノテーションを付ける方法。
費用:
このラボを Google Cloud で実行するための総費用は約 $2 です。
2. 始める前に
プロジェクトを作成して API を有効にします。
- Google Cloud コンソールの [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。注: この手順で作成するリソースをそのまま保持する予定でない場合、既存のプロジェクトを選択するのではなく、新しいプロジェクトを作成してください。チュートリアルの終了後にそのプロジェクトを削除すれば、プロジェクトに関連するすべてのリソースを削除できます。プロジェクト セレクタに移動
- Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
- Compute Engine、Vertex API、Notebook API、Vision AI API を有効にします。API を有効にする
サービス アカウントを作成します。
- Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。[サービス アカウントの作成] に移動
- プロジェクトを選択します。
- [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。[サービス アカウントの説明] フィールドに説明を入力します。(クイックスタート用のサービス アカウントなど)。
- [作成して続行] をクリックします。
- プロジェクトへのアクセス権を付与するには、サービス アカウントに次のロールを付与します。
- Vision AI >Vision AI 編集者
- Compute Engine >Compute インスタンス管理者(ベータ版)
- BigQuery >BigQuery 管理者。
[ロールを選択] リストで、ロールを選択します。ロールを追加するには、[別のロールを追加] をクリックして各ロールを追加します。
- [続行] をクリックします。
- [完了] をクリックして、サービス アカウントの作成を完了します。ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。
3. Jupyter ノートブックを設定する
Occupancy Analytics 内でアプリを作成する前に、アプリで使用するストリームを登録する必要があります。
このチュートリアルでは、動画をホストする Jupyter ノートブック インスタンスを作成し、そのストリーミング動画データをノートブックから送信します。当社が Jupyter ノートブックを使用している理由は、シェルコマンドやカスタムの前処理/後処理コードを 1 か所で実行できる柔軟性があるため、迅速なテストに非常に適しています。このノートブックは、以下の目的で使用します。
- rtsp サーバーをバックグラウンド プロセスとして実行
- vaictl コマンドをバックグラウンド プロセスとして実行する
- クエリと処理コードを実行して占有率分析の出力を分析する
Jupyter ノートブックを作成する
Jupyter ノートブック インスタンスから動画を送信するための最初のステップは、前のステップで作成したサービス アカウントを使用してノートブックを作成することです。
- コンソールで、[Vertex AI] ページに移動します。Vertex AI Workbench に移動
- [ユーザー管理のノートブック] をクリックする

- [新しいノートブック >Tensorflow Enterprise 2.6(LTS を含む)>GPU を使用しない場合

- Jupyter ノートブックの名前を入力します。詳しくは、リソースの命名規則をご覧ください。

- [詳細オプション] をクリックします。
- [Permissions Sections] まで下にスクロールします。
- [Compute Engine のデフォルトのサービス アカウントを使用する] オプションのチェックボックスをオフにします。
- 前の手順で作成したサービス アカウントのメールアドレスを追加します。[作成] をクリックします。

- インスタンスが作成されたら、[JUPYTERLAB を開く] をクリックします。
4. ノートブックを設定して動画をストリーミングする
Occupancy Analytics 内でアプリを作成する前に、アプリで使用するストリームを登録する必要があります。
このチュートリアルでは、Jupyter ノートブック インスタンスを使用して動画をホストし、そのストリーミング動画データをノートブック ターミナルから送信します。
vaictl コマンドライン ツールをダウンロードする
- 開いた Jupyterlab インスタンスで、ランチャーから [Notebook] を開きます。

- ノートブック セルで次のコマンドを使用して、Vertex AI Vision(vaictl)コマンドライン ツール、rtsp サーバー コマンドライン ツール、open-cv ツールをダウンロードします。
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz !wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb !tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz !pip install opencv-python --quiet !sudo apt-get -qq remove -y visionai !sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb !sudo apt-get -qq install -y ffmpeg 5. ストリーミング用の動画ファイルを取り込む
必要なコマンドライン ツールを使用してノートブック環境を設定したら、サンプル動画ファイルをコピーしてから、vaictl を使用して動画データを占有率分析アプリにストリーミングできます。
新しいストリームを登録する
- Vertex AI Vision の左側のパネルで [ストリーム] タブをクリックします。
- 上部にある [Register] ボタンをクリックします。

- [ストリーム名] に「queue-stream」と入力します。
- [リージョン] で、前の手順でノートブックの作成時に選択したのと同じリージョンを選択します。
- [Register] をクリックします。
VM にサンプル動画をコピーする
- ノートブックで、次の wget コマンドを使用してサンプル動画をコピーします。
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4 VM から動画をストリーミングしてストリームにデータを取り込む
- このローカル動画ファイルをアプリの入力ストリームに送信するには、ノートブックのセルで次のコマンドを使用します。次の変数置換を行う必要があります。
- PROJECT_ID: Google Cloud プロジェクト ID。
- LOCATION: 地域 ID。(us-central1 など)。詳細については、クラウドのロケーションをご覧ください。
- LOCAL_FILE: ローカルの動画ファイルのファイル名。例:
seq25_h264.mp4
PROJECT_ID='<Your Google Cloud project ID>' LOCATION='<Your stream location>' LOCAL_FILE='seq25_h264.mp4' STREAM_NAME='queue-stream' - rtsp-simple-server を起動し、rtsp プロトコルで動画ファイルをストリーミングします
import os import time import subprocess subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp) time.sleep(5) - FFmpeg コマンドライン ツールを使用して、RTSP ストリームで動画をループします。
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp) time.sleep(5) - vaictl コマンドライン ツールを使用して、RTSP サーバー URI から Vertex AI Vision ストリーム「queue-stream」に動画をストリーミングする確認します。
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp) vaictl の取り込みオペレーションを開始してから、動画がダッシュボードに表示されるまでに 100 秒ほどかかる場合があります。
ストリームの取り込みが可能になったら、Vertex AI Vision ダッシュボードの [ストリーム] タブでキュー ストリーム ストリームを選択して動画フィードを表示できます。

6. アプリケーションを作成する
最初のステップは、データを処理するアプリを作成することです。アプリは、以下を接続する自動パイプラインと考えることができます。
- データの取り込み: 動画フィードがストリームに取り込まれます。
- データ分析: 取り込み後に AI(コンピュータ ビジョン)モデルを追加できます。
- データ ストレージ: 2 つのバージョンの動画フィード(元のストリームと AI モデルによって処理されたストリーム)をメディア ウェアハウスに保存できます。
Google Cloud コンソールでは、アプリはグラフとして表されます。
空のアプリを作成する
アプリグラフにデータを入力するには、まず空のアプリを作成する必要があります。
Google Cloud コンソールでアプリを作成します。
- Google Cloud コンソールに移動します。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
- [Create] ボタンをクリックします。

- アプリ名として「queue-app'」と入力し、リージョンを選択します。
- [作成] をクリックします。
アプリ コンポーネント ノードを追加する
空のアプリケーションを作成したら、3 つのノードをアプリグラフに追加できます。
- 取り込みノード: ノートブックで作成した RTSP 動画サーバーから送信されたデータを取り込むストリーム リソース。
- 処理ノード: 取り込まれたデータを処理する占有率分析モデル。
- ストレージ ノード: 処理済みの動画を保存するメディア ウェアハウス。メタデータ ストアとして機能します。メタデータ ストアには、取り込まれた動画データに関する分析情報と、AI モデルによって推測された情報が含まれます。
コンソールでコンポーネント ノードをアプリに追加します。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
処理パイプラインのグラフ可視化が表示されます。
データ取り込みノードを追加する
- 入力ストリーム ノードを追加するには、サイドメニューの [コネクタ] セクションで [ストリーム] オプションを選択します。
- 表示された [ストリーム] メニューの [ソース] セクションで、[ストリームを追加] を選択します。
- [ストリームを追加] メニューで [queue-stream] を選択します。
- アプリグラフにストリームを追加するには、[Add streams] をクリックします。
データ処理ノードを追加する
- 宿泊人数モデルノードを追加するには、サイドメニューの [専用モデル] セクションで [定員の分析] オプションを選択します。
- [People] はデフォルトの選択のままにします。[Vehicles] がすでに選択されている場合は、オフにします。

- [Advanced Options] セクションで、[Create Active Zones/Lines ] をクリックします。

- ポリゴンツールを使用してアクティブ エリアを描画し、そのエリアにいる人数を数えます。それに応じてゾーンにラベルを付ける

- 上部の戻る矢印をクリックします。

- チェックボックスをクリックして、混雑を検知するための滞留時間の設定を追加します。

データ ストレージ ノードを追加する
- 出力先(ストレージ)ノードを追加するには、サイドメニューの [Connectors] セクションで [Vision AI Warehouse] オプションを選択します。
- [Vertex AI Warehouse] コネクタをクリックしてメニューを開き、[ウェアハウスを接続] をクリックします。
- [Connect warehouse] メニューで、[Create new warehouse] を選択します。ウェアハウスに「queue-warehouse」という名前を付け、TTL 期間は 14 日のままにします。
- [作成] ボタンをクリックしてウェアハウスを追加します。
7. 出力を BigQuery テーブルに接続する
BigQuery コネクタを Vertex AI Vision アプリに追加すると、接続されているアプリモデルの出力がすべてターゲット テーブルに取り込まれます。
独自の BigQuery テーブルを作成して、アプリに BigQuery コネクタを追加する際にそのテーブルを指定することも、Vertex AI Vision アプリ プラットフォームで自動的に作成することもできます。
テーブルの自動作成
Vertex AI Vision アプリ プラットフォームが自動的にテーブルを作成できるようにする場合は、BigQuery コネクタノードを追加する際にこのオプションを指定できます。
テーブルの自動作成を使用する場合は、次のデータセットとテーブルの条件が適用されます。
- データセット: 自動的に作成されるデータセットの名前は visionai_dataset です。
- テーブル: 自動的に作成されるテーブル名は visionai_dataset.APPLICATION_ID です。
- エラー処理:
- 同じデータセットに同じ名前のテーブルが存在する場合、自動作成は行われません。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
- リストのアプリケーション名の横にある [View app] を選択します。
- アプリケーション ビルダー ページの [Connectors] セクションで、[BigQuery] を選択します。
- [BigQuery のパス] フィールドは空のままにします。

- [store metadata from:] で [occupancy Analytics] のみを選択し、ストリームのチェックボックスをオフにします。
最終的なアプリグラフは次のようになります。

8. 使用するアプリをデプロイする
必要なコンポーネントをすべて含めてエンドツーエンドのアプリをビルドしたら、アプリの使用の最後のステップとしてアプリをデプロイします。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
- リストの queue-app アプリの横にある [View app] を選択します。
- [Studio] ページで、[デプロイ] ボタンをクリックします。
- 次の確認ダイアログで [Deploy] をクリックします。デプロイ オペレーションが完了するまでに数分かかることがあります。デプロイが完了すると、ノードの横に緑色のチェックマークが表示されます。

9. ストレージ ウェアハウス内の動画コンテンツを検索する
動画データを処理アプリに取り込むと、分析された動画データを表示し、利用人数の分析情報に基づいてデータを検索できます。
- Vertex AI Vision ダッシュボードの [ウェアハウス] タブを開きます。[ウェアハウス] タブに移動
- リストでキュー ウェアハウス ウェアハウスを見つけて、[アセットを表示] をクリックします。
- [People count] セクションで、[Min] の値を 1 に、[Max] の値を 5 に設定します。
- Vertex AI Vision のメディア ウェアハウスに保存されている処理済みの動画データをフィルタするには、[検索] をクリックします。

Google Cloud コンソールの検索条件に一致する動画データのビュー。
10. BigQuery テーブルを使用して出力にアノテーションを付けて分析する
- ノートブックで、セル内の次の変数を初期化します。
DATASET_ID='vision_ai_dataset' bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app' frame_buffer_size=10000 frame_buffer_error_milliseconds=5 dashboard_update_delay_seconds=3 rtsp_url='rtsp://localhost:8554/seq25_h264' - 次に、以下のコードを使用して、rtsp ストリームからフレームをキャプチャします。
import cv2 import threading from collections import OrderedDict from datetime import datetime, timezone frame_buffer = OrderedDict() frame_buffer_lock = threading.Lock() stream = cv2.VideoCapture(rtsp_url) def read_frames(stream): global frames while True: ret, frame = stream.read() frame_ts = datetime.now(timezone.utc).timestamp() * 1000 if ret: with frame_buffer_lock: while len(frame_buffer) >= frame_buffer_size: _ = frame_buffer.popitem(last=False) frame_buffer[frame_ts] = frame frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,)) frame_buffer_thread.start() print('Waiting for stream initialization') while not list(frame_buffer.keys()): pass print('Stream Initialized') - BigQuery テーブルからデータ タイムスタンプとアノテーション情報を pull し、キャプチャしたフレーム画像を保存するディレクトリを作成します。
from google.cloud import bigquery import pandas as pd client = bigquery.Client(project=PROJECT_ID) query = f""" SELECT MAX(ingestion_time) AS ts FROM `{bq_table}` """ bq_max_ingest_ts_df = client.query(query).to_dataframe() bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000)) bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0] print('Preparing to pull records with ingestion time >', bq_max_ingest_ts) if not os.path.exists(bq_max_ingest_epoch): os.makedirs(bq_max_ingest_epoch) print('Saving output frames to', bq_max_ingest_epoch) - 次のコードを使用してフレームにアノテーションを付けます。
import json import base64 import numpy as np from IPython.display import Image, display, HTML, clear_output im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH) im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT) dashdelta = datetime.now() framedata = {} cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x} try: while True: try: annotations_df = client.query(f''' SELECT ingestion_time, annotation FROM `{bq_table}` WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}") ''').to_dataframe() except ValueError as e: continue bq_max_ingest_ts = annotations_df['ingestion_time'].max() for _, row in annotations_df.iterrows(): with frame_buffer_lock: frame_ts = np.asarray(list(frame_buffer.keys())) delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000)) delta_tx_idx = delta_ts.argmin() closest_ts_delta = delta_ts[delta_tx_idx] closest_ts = frame_ts[delta_tx_idx] if closest_ts_delta > frame_buffer_error_milliseconds: continue image = frame_buffer[closest_ts] annotations = json.loads(row['annotation']) for box in annotations['identifiedBoxes']: image = cv2.rectangle( image, ( int(box['normalizedBoundingBox']['xmin']*im_width), int(box['normalizedBoundingBox']['ymin']*im_height) ), ( int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width), int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height) ), (255, 0, 0), 2 ) img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png" cv2.imwrite(img_filename, image) binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode() curr_framedata = { 'path': img_filename, 'timestamp_error': closest_ts_delta, 'counts': { **{ k['annotation']['displayName'] : cntext(k['counts']) for k in annotations['stats']["activeZoneCounts"] }, 'full-frame': cntext(annotations['stats']["fullFrameCount"]) } } framedata[img_filename] = curr_framedata if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds: dashdelta = datetime.now() clear_output() display(HTML(f''' <h1>Queue Monitoring Application</h1> <p>Live Feed of the queue camera:</p> <p><img alt="" src="{img_filename}" style="float: left;"/></a></p> <table border="1" cellpadding="1" cellspacing="1" style="width: 500px;"> <caption>Current Model Outputs</caption> <thead> <tr><th scope="row">Metric</th><th scope="col">Value</th></tr> </thead> <tbody> <tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr> <tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr> <tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr> <tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr> </tbody> </table> <p> </p> ''')) except KeyboardInterrupt: print('Stopping Live Monitoring') 
- ノートブックのメニューバーの [Stop] ボタンを使用してアノテーション タスクを停止する

- 次のコードを使用すると、個々のフレームに再度アクセスできます。
from IPython.html.widgets import Layout, interact, IntSlider imgs = sorted(list(framedata.keys())) def loadimg(frame): display(framedata[imgs[frame]]) display(Image(open(framedata[imgs[frame]]['path'],'rb').read())) interact(loadimg, frame=IntSlider( description='Frame #:', value=0, min=0, max=len(imgs)-1, step=1, layout=Layout(width='100%'))) 
11. 完了
これでラボは終了です。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトの削除
リソースを個別に削除する
リソース
https://cloud.google.com/vision-ai/docs/overview
https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial