This guide provides a complete end-to-end workflow for training models and classifying image assets using Google Cloud's Vertex AI platform and Gemini 2.5 Flash. You'll learn how to integrate BigQuery for data retrieval, Cloud Storage for asset management, and Vertex AI for machine learning inference, all within a Python Colab environment.
Configuration
Before running the code samples, set the following project-specific variables:
PROJECT_ID = "PROJECT_ID" REGION = "REGION " # e.g., "us-central1" LOCATION = "LOCATION " # e.g., "us" CUSTOMER_ID = "CUSTOMER_ID" # required to subscribe to the dataset 环境设置
Install the required dependencies and configure authentication to access Google Cloud services:
```python
# Install Google Cloud SDK dependencies for AI Platform integration
!pip install google-cloud-aiplatform google-cloud-storage google-cloud-bigquery google-cloud-bigquery-data-exchange -q

# Import core libraries for cloud services and machine learning operations
import json
import os

from google.cloud import bigquery
import vertexai
from vertexai.generative_models import GenerativeModel, Part

# Configure authentication for Google Cloud service access.
# Initiates an OAuth flow in a new browser tab if authentication is required.
if os.environ.get("VERTEX_PRODUCT") != "COLAB_ENTERPRISE":
    from google.colab import auth
    auth.authenticate_user(project_id=PROJECT_ID)

# Initialize the Vertex AI client with the project configuration
vertexai.init(project=PROJECT_ID, location=REGION)

print(f"Vertex AI initialized for project: {PROJECT_ID} in region: {REGION}")
```

Subscribe to the Analytics Hub dataset
You must also subscribe to the Analytics Hub dataset.
```python
from google.cloud import bigquery_data_exchange_v1beta1

ah_client = bigquery_data_exchange_v1beta1.AnalyticsHubServiceClient()

HUB_PROJECT_ID = "maps-platform-analytics-hub"
DATA_EXCHANGE_ID = f"imagery_insights_exchange_{LOCATION}"
LINKED_DATASET_NAME = f"imagery_insights___preview___{LOCATION}"

# Subscribe to the listing (creates a linked dataset in your consumer project)
destination_dataset = bigquery_data_exchange_v1beta1.DestinationDataset()
destination_dataset.dataset_reference.dataset_id = LINKED_DATASET_NAME
destination_dataset.dataset_reference.project_id = PROJECT_ID
destination_dataset.location = LOCATION

LISTING_ID = f"imagery_insights_{CUSTOMER_ID.replace('-', '_')}__{LOCATION}"
published_listing = f"projects/{HUB_PROJECT_ID}/locations/{LOCATION}/dataExchanges/{DATA_EXCHANGE_ID}/listings/{LISTING_ID}"

request = bigquery_data_exchange_v1beta1.SubscribeListingRequest(
    destination_dataset=destination_dataset,
    name=published_listing,
)

# Request the subscription
ah_client.subscribe_listing(request=request)
```
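To confirm that the subscription created the linked dataset, you can run a quick check. This is a minimal sketch, not part of the original workflow; `check_client` is a hypothetical name, and `get_dataset` raises `NotFound` if the linked dataset does not exist:

```python
# Verify that the linked dataset now exists in the consumer project
# (sketch only; get_dataset raises google.api_core.exceptions.NotFound
# if the subscription did not create it).
check_client = bigquery.Client(project=PROJECT_ID)
linked = check_client.get_dataset(f"{PROJECT_ID}.{LINKED_DATASET_NAME}")
print(f"Linked dataset ready: {linked.full_dataset_id}")
```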
Extract data with BigQuery

Execute a BigQuery query to extract Google Cloud Storage URIs from the latest_observations table. These URIs are then passed directly to the Vertex AI model for classification.
```python
# Initialize the BigQuery client
bigquery_client = bigquery.Client(project=PROJECT_ID)

# Define a SQL query to retrieve observation records from the imagery dataset
query = f"""
SELECT
    *
FROM
    `{PROJECT_ID}.imagery_insights___preview___{LOCATION}.latest_observations`
LIMIT 10;
"""

print(f"Executing BigQuery query:\n{query}")

# Submit the query job to the BigQuery service and await completion
query_job = bigquery_client.query(query)

# Transform query results into a structured format for downstream processing:
# convert BigQuery Row objects to dictionaries for easier access
query_response_data = []
for row in query_job:
    query_response_data.append(dict(row))

# Extract Cloud Storage URIs from the result set, filtering out null values
gcs_uris = [item.get("gcs_uri") for item in query_response_data if item.get("gcs_uri")]

print(f"BigQuery query returned {len(query_response_data)} records.")
print(f"Extracted {len(gcs_uris)} GCS URIs:")
for uri in gcs_uris:
    print(uri)
```

Image classification function
This helper function handles image classification using Vertex AI's Gemini 2.5 Flash model:
```python
def classify_image_with_gemini(gcs_uri: str, prompt: str = "What is in this image?") -> str:
    """Performs multimodal image classification using Vertex AI's Gemini 2.5 Flash model.

    Leverages direct Cloud Storage integration to process image assets
    without local downloads, enabling scalable batch processing workflows.

    Args:
        gcs_uri (str): Fully qualified Google Cloud Storage URI
            (format: gs://bucket-name/path/to/image.jpg)
        prompt (str): Natural language instruction for the classification task

    Returns:
        str: Generated textual description from the generative model, or an
            error message if the classification pipeline fails
    """
    try:
        # Instantiate the Gemini 2.5 Flash model for inference
        model = GenerativeModel("gemini-2.5-flash")

        # Construct a multimodal Part object from the Cloud Storage reference.
        # Note: the MIME type may need dynamic inference for mixed image formats.
        image_part = Part.from_uri(uri=gcs_uri, mime_type="image/jpeg")

        # Execute a multimodal inference request with combined visual and textual inputs
        responses = model.generate_content([image_part, prompt])
        return responses.text
    except Exception as e:
        print(f"Error classifying image from URI {gcs_uri}: {e}")
        return "Classification failed."
```
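The helper hardcodes `image/jpeg` when building the `Part`, as its comment notes. If your dataset mixes formats, one option is to guess the MIME type from the file extension with Python's standard `mimetypes` module. The `guess_image_mime_type` helper below is a hypothetical sketch, not part of the documented workflow:

```python
import mimetypes

def guess_image_mime_type(gcs_uri: str) -> str:
    """Hypothetical helper: best-effort MIME type from the URI's file
    extension, falling back to JPEG when the extension is missing or
    unrecognized."""
    mime_type, _ = mimetypes.guess_type(gcs_uri)
    return mime_type or "image/jpeg"

# Usage inside classify_image_with_gemini, replacing the hardcoded value:
# image_part = Part.from_uri(uri=gcs_uri, mime_type=guess_image_mime_type(gcs_uri))
```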
Batch image classification

Process all of the extracted URIs and generate classifications:
```python
classification_results = []

# Execute the batch classification pipeline across all extracted GCS URIs
for uri in gcs_uris:
    print(f"\nProcessing: {uri}")

    # Define a comprehensive classification prompt for detailed feature extraction
    classification_prompt = (
        "Describe this image in detail, focusing on any objects, signs, or features visible."
    )

    # Invoke the Gemini model for multimodal inference on the current asset
    result = classify_image_with_gemini(uri, classification_prompt)

    # Aggregate structured results for downstream analytics and reporting
    classification_results.append({"gcs_uri": uri, "classification": result})
    print(f"Classification for {uri}:\n{result}")
```
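Because the results are aggregated in `classification_results`, you may want to persist them for later use. Here is a minimal sketch using the `json` module imported earlier; the output filename is an arbitrary choice:

```python
# Write the aggregated results to a local JSON file for later analysis
# (sketch; "classification_results.json" is an arbitrary filename).
with open("classification_results.json", "w") as f:
    json.dump(classification_results, f, indent=2, ensure_ascii=False)

print(f"Saved {len(classification_results)} classifications.")
```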
Next steps

After classifying your images, consider these advanced workflows:
- Model fine-tuning: use the classification results to train a custom model.
- Automated processing: set up Cloud Functions to classify new images automatically.
- Data analysis: perform statistical analysis on the classification patterns (see the sketch after this list).
- Integration: connect the results to downstream applications.
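As a starting point for the data analysis item above, the following sketch counts the most frequent terms across the generated descriptions. The crude whitespace tokenization is an assumption for illustration; a real pipeline would use proper text processing or run the analysis in BigQuery:

```python
from collections import Counter

# Crude term-frequency pass over the model's descriptions (illustrative only)
word_counts = Counter(
    word.strip(".,()").lower()
    for item in classification_results
    for word in item["classification"].split()
)
print(word_counts.most_common(20))
```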
Troubleshooting

Common issues and solutions:
- Authentication errors: make sure the correct IAM roles are granted and the required APIs are enabled.
- Rate limiting: implement exponential backoff for large batches (see the sketch after this list).
- Memory constraints: process images in smaller batches when working with large datasets.
- Malformed URIs: verify that GCS URIs follow the gs://bucket-name/path/to/image format.
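For the rate-limiting item above, one common pattern wraps the model call in exponential backoff with jitter. This is a sketch under the assumption that you call `model.generate_content` directly; in practice you would catch the specific `google.api_core.exceptions.ResourceExhausted` error rather than the broad `Exception` shown here:

```python
import random
import time

def generate_with_backoff(model, parts, max_retries=5):
    """Sketch: retries model.generate_content with exponential backoff.

    Backs off 1s, 2s, 4s, ... plus jitter between attempts and re-raises
    the final error once retries are exhausted.
    """
    for attempt in range(max_retries):
        try:
            return model.generate_content(parts)
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt + random.random())
```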
For additional support, refer to the Vertex AI documentation and the BigQuery documentation.