批次傳送訊息

批次訊息傳遞功能會建立採用自訂批次設定的發布端用戶端,並透過該用戶端發布幾則訊息。

本文說明如何使用批次訊息傳遞功能,將訊息發布至主題。

事前準備

設定發布工作流程前,請務必先完成下列工作:

必要的角色

如要取得將訊息發布至主題所需的權限,請要求管理員授予主題的 Pub/Sub 發布者 (roles/pubsub.publisher) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

您需要其他權限,才能建立或更新主題和訂閱項目。

使用批次訊息傳遞功能

請參閱下列程式碼範例,瞭解如何為發布商設定批次訊息傳送設定。

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub; using ::google::cloud::future; using ::google::cloud::Options; using ::google::cloud::StatusOr; [](std::string project_id, std::string topic_id) {   auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));   // By default, the publisher will flush a batch after 10ms, after it   // contains more than 100 message, or after it contains more than 1MiB of   // data, whichever comes first. This changes those defaults.   auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(       std::move(topic),       Options{}           .set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))           .set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)           .set<pubsub::MaxBatchMessagesOption>(200)));    std::vector<future<void>> ids;   for (char const* data : {"1", "2", "3", "go!"}) {     ids.push_back(         publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())             .then([data](future<StatusOr<std::string>> f) {               auto s = f.get();               if (!s) return;               std::cout << "Sent '" << data << "' (" << *s << ")\n";             }));   }   publisher.Flush();   // Block until they are actually sent.   for (auto& id : ids) id.get(); }

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件

 using Google.Api.Gax; using Google.Cloud.PubSub.V1; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks;  public class PublishBatchedMessagesAsyncSample {     public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)     {         TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);          // Default Settings:         // byteCountThreshold: 1000000         // elementCountThreshold: 100         // delayThreshold: 10 milliseconds         var customSettings = new PublisherClient.Settings         {             BatchingSettings = new BatchingSettings(                 elementCountThreshold: 50,                 byteCountThreshold: 10240,                 delayThreshold: TimeSpan.FromMilliseconds(500))         };          PublisherClient publisher = await new PublisherClientBuilder         {             TopicName = topicName,             Settings = customSettings         }.BuildAsync();          int publishedMessageCount = 0;         var publishTasks = messageTexts.Select(async text =>         {             try             {                 string message = await publisher.PublishAsync(text);                 Console.WriteLine($"Published message {message}");                 Interlocked.Increment(ref publishedMessageCount);             }             catch (Exception exception)             {                 Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");             }         });         await Task.WhenAll(publishTasks);         // PublisherClient instance should be shutdown after use.         // The TimeSpan specifies for how long to attempt to publish locally queued messages.         await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));         return publishedMessageCount;     } }

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

import ( 	"context" 	"fmt" 	"io" 	"strconv" 	"time"  	"cloud.google.com/go/pubsub/v2" )  func publishWithSettings(w io.Writer, projectID, topicID string) error { 	// projectID := "my-project-id" 	// topicID := "my-topic" 	ctx := context.Background() 	client, err := pubsub.NewClient(ctx, projectID) 	if err != nil { 		return fmt.Errorf("pubsub.NewClient: %w", err) 	} 	defer client.Close()  	// client.Publisher can be passed a topic ID (e.g. "my-topic") or 	// a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 	// If a topic ID is provided, the project ID from the client is used. 	publisher := client.Publisher(topicID) 	publisher.PublishSettings.ByteThreshold = 5000 	publisher.PublishSettings.CountThreshold = 10 	publisher.PublishSettings.DelayThreshold = 100 * time.Millisecond  	var results []*pubsub.PublishResult 	var resultErrors []error 	for i := 0; i < 10; i++ { 		result := publisher.Publish(ctx, &pubsub.Message{ 			Data: []byte("Message " + strconv.Itoa(i)), 		}) 		results = append(results, result) 	} 	// The Get method blocks until a server-generated ID or 	// an error is returned for the published message. 	for i, res := range results { 		id, err := res.Get(ctx) 		if err != nil { 			resultErrors = append(resultErrors, err) 			fmt.Fprintf(w, "Failed to publish: %v", err) 			continue 		} 		fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id) 	} 	if len(resultErrors) != 0 { 		return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1]) 	} 	fmt.Fprintf(w, "Published messages with batch settings.") 	return nil } 

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

 import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.batching.BatchingSettings; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.threeten.bp.Duration;  public class PublishWithBatchSettingsExample {   public static void main(String... args) throws Exception {     // TODO(developer): Replace these variables before running the sample.     String projectId = "your-project-id";     String topicId = "your-topic-id";      publishWithBatchSettingsExample(projectId, topicId);   }    public static void publishWithBatchSettingsExample(String projectId, String topicId)       throws IOException, ExecutionException, InterruptedException {     TopicName topicName = TopicName.of(projectId, topicId);     Publisher publisher = null;     List<ApiFuture<String>> messageIdFutures = new ArrayList<>();      try {       // Batch settings control how the publisher batches messages       long requestBytesThreshold = 5000L; // default : 1000 bytes       long messageCountBatchSize = 100L; // default : 100 message        Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms        // Publish request get triggered based on request size, messages count & time since last       // publish, whichever condition is met first.       BatchingSettings batchingSettings =           BatchingSettings.newBuilder()               .setElementCountThreshold(messageCountBatchSize)               .setRequestByteThreshold(requestBytesThreshold)               .setDelayThreshold(publishDelayThreshold)               .build();        // Create a publisher instance with default settings bound to the topic       publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();        // schedule publishing one message at a time : messages get automatically batched       for (int i = 0; i < 100; i++) {         String message = "message " + i;         ByteString data = ByteString.copyFromUtf8(message);         PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();          // Once published, returns a server-assigned message id (unique within the topic)         ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);         messageIdFutures.add(messageIdFuture);       }     } finally {       // Wait on any pending publish requests.       List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();        System.out.println("Published " + messageIds.size() + " messages with batch settings.");        if (publisher != null) {         // When finished with the publisher, shutdown to free up resources.         publisher.shutdown();         publisher.awaitTermination(1, TimeUnit.MINUTES);       }     }   } }

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**  * TODO(developer): Uncomment these variables before running the sample.  */ // const topicName = 'YOUR_TOPIC_NAME'; // const data = JSON.stringify({foo: 'bar'}); // const maxMessages = 10; // const maxWaitTime = 10;  // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub');  // Creates a client; cache this for further use const pubSubClient = new PubSub();  async function publishBatchedMessages(   topicNameOrId,   data,   maxMessages,   maxWaitTime, ) {   // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)   const dataBuffer = Buffer.from(data);    // Cache topic objects (publishers) and reuse them.   const publishOptions = {     batching: {       maxMessages: maxMessages,       maxMilliseconds: maxWaitTime * 1000,     },   };   const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);    const promises = [];   for (let i = 0; i < 10; i++) {     promises.push(       (async () => {         const messageId = await batchPublisher.publishMessage({           data: dataBuffer,         });         console.log(`Message ${messageId} published.`);       })(),     );   }   await Promise.all(promises); }

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**  * TODO(developer): Uncomment these variables before running the sample.  */ // const topicName = 'YOUR_TOPIC_NAME'; // const data = JSON.stringify({foo: 'bar'}); // const maxMessages = 10; // const maxWaitTime = 10;  // Imports the Google Cloud client library import {PublishOptions, PubSub} from '@google-cloud/pubsub';  // Creates a client; cache this for further use const pubSubClient = new PubSub();  async function publishBatchedMessages(   topicNameOrId: string,   data: string,   maxMessages: number,   maxWaitTime: number, ) {   // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)   const dataBuffer = Buffer.from(data);    // Cache topic objects (publishers) and reuse them.   const publishOptions: PublishOptions = {     batching: {       maxMessages: maxMessages,       maxMilliseconds: maxWaitTime * 1000,     },   };   const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);    const promises: Promise<void>[] = [];   for (let i = 0; i < 10; i++) {     promises.push(       (async () => {         const messageId = await batchPublisher.publishMessage({           data: dataBuffer,         });         console.log(`Message ${messageId} published.`);       })(),     );   }   await Promise.all(promises); }

PHP

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。 詳情請參閱 Pub/Sub PHP API 參考說明文件

use Google\Cloud\PubSub\PubSubClient;  /**  * Publishes a message for a Pub/Sub topic.  *  * The publisher should be used in conjunction with the `google-cloud-batch`  * daemon, which should be running in the background.  *  * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.  *  * @param string $projectId  The Google project ID.  * @param string $topicName  The Pub/Sub topic name.  * @param string $message    The message to publish.  */ function publish_message_batch($projectId, $topicName, $message) {     // Check if the batch daemon is running.     if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {         trigger_error(             'The batch daemon is not running. Call ' .             '`vendor/bin/google-cloud-batch daemon` from ' .             'your project root to start the daemon.',             E_USER_NOTICE         );     }      $batchOptions = [         'batchSize' => 100, // Max messages for each batch.         'callPeriod' => 0.01, // Max time in seconds between each batch publish.     ];      $pubsub = new PubSubClient([         'projectId' => $projectId,     ]);     $topic = $pubsub->topic($topicName);     $publisher = $topic->batchPublisher([         'batchOptions' => $batchOptions     ]);      for ($i = 0; $i < 10; $i++) {         $publisher->publish(['data' => $message]);     }      print('Messages enqueued for publication.' . PHP_EOL); }

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from concurrent import futures from google.cloud import pubsub_v1  # TODO(developer) # project_id = "your-project-id" # topic_id = "your-topic-id"  # Configure the batch to publish as soon as there are 10 messages # or 1 KiB of data, or 1 second has passed. batch_settings = pubsub_v1.types.BatchSettings(     max_messages=10,  # default 100     max_bytes=1024,  # default 1 MB     max_latency=1,  # default 10 ms ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path(project_id, topic_id) publish_futures = []  # Resolve the publish future in a separate thread. def callback(future: pubsub_v1.publisher.futures.Future) -> None:     message_id = future.result()     print(message_id)  for n in range(1, 10):     data_str = f"Message number {n}"     # Data must be a bytestring     data = data_str.encode("utf-8")     publish_future = publisher.publish(topic_path, data)     # Non-blocking. Allow the publisher client to batch multiple messages.     publish_future.add_done_callback(callback)     publish_futures.append(publish_future)  futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)  print(f"Published messages with batch settings to {topic_path}.")

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照「快速入門:使用用戶端程式庫」的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id = "your-topic-id"  pubsub = Google::Cloud::PubSub.new # Start sending messages in one request once the size of all queued messages # reaches 1 MB or the number of queued messages reaches 20 publisher = pubsub.publisher topic_id, async: {   max_bytes:    1_000_000,   max_messages: 20 }  10.times do |i|   publisher.publish_async "This is message ##{i}." end  # Stop the async_publisher to send all queued messages immediately. publisher.async_publisher.stop.wait! puts "Messages published asynchronously in batch."

停用批次訊息傳遞功能

如要在用戶端程式庫中停用批次處理,請將 max_messages 的值設為 1。

批次傳訊和依序傳送

如果啟用依序傳送功能,但未確認批次中的任何訊息,系統就會重新傳送批次中的所有訊息,包括在未確認訊息之前傳送的訊息。

批次訊息的配額和限制

設定批次訊息傳送功能前,請先考量發布輸送量配額和批次大小上限等因素的影響。高階用戶端程式庫可確保批次要求不超過指定限制。

  • 即使實際訊息大小可能小於 1000 個位元組,系統仍會以 1000 個位元組做為計算費用的最小要求大小。
  • Pub/Sub 對單一批次發布要求設有限制,大小不得超過 10 MB,訊息數量不得超過 1,000 則。

詳情請參閱 Pub/Sub 配額與限制

後續步驟

如要瞭解如何設定進階發布選項,請參閱下列文章: