並行控制

本文說明如何搭配發布至主題的訊息使用並行控制。

並行控制可協助您覆寫用戶端程式庫用來發布訊息的預設背景 (I/O) 執行緒數量。這可讓發布端用戶端平行傳送訊息。

並行控制是 Pub/Sub 高階用戶端程式庫的可用功能。使用低階程式庫時,您也可以實作自己的並行控制。

並行控制支援取決於用戶端程式庫的程式設計語言。對於支援平行執行緒的語言實作 (例如 C++、Go 和 Java),用戶端程式庫會預設選擇執行緒數量。

本頁說明並行控制的概念,以及如何為發布商用戶端設定這項功能。如要設定訂閱端用戶端進行並行控制,請參閱透過並行控制處理更多訊息

事前準備

設定發布工作流程前,請務必先完成下列工作:

必要的角色

如要取得將訊息發布至主題所需的權限,請要求管理員為您授予主題的 Pub/Sub 發布者 (roles/pubsub.publisher) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

您需要其他權限,才能建立或更新主題和訂閱項目。

並行控制設定

並行控制變數的預設值和變數名稱可能因用戶端程式庫而異。舉例來說,在 Java 用戶端程式庫中,設定並行控制的方法為 setExecutorProvider()setChannelProvider()。詳情請參閱 API 參考說明文件

  • setExecutorProvider() 可讓你自訂用於處理發布回應的執行器供應商。舉例來說,您可以將執行器供應器變更為傳回單一共用執行器的供應器,並在多個發布者用戶端中,限制執行緒數量。這項設定有助於限制建立的執行緒數量。

  • setChannelProvider() 可讓您自訂用於開啟 Pub/Sub 連線的管道供應商。除非您想在多個發布商用戶端中使用相同管道,否則通常不需要設定這個值。在過多用戶端重複使用管道,可能會導致 GOAWAYENHANCE_YOUR_CALM 錯誤。如果在應用程式記錄或 Cloud Logs 中看到這些錯誤,請建立更多管道。

並行控制的程式碼範例

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub; using ::google::cloud::future; using ::google::cloud::GrpcBackgroundThreadPoolSizeOption; using ::google::cloud::Options; using ::google::cloud::StatusOr; [](std::string project_id, std::string topic_id) {   auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));   // Override the default number of background (I/O) threads. By default the   // library uses `std::thread::hardware_concurrency()` threads.   auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);   auto publisher = pubsub::Publisher(       pubsub::MakePublisherConnection(std::move(topic), std::move(options)));    std::vector<future<void>> ids;   for (char const* data : {"1", "2", "3", "go!"}) {     ids.push_back(         publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())             .then([data](future<StatusOr<std::string>> f) {               auto s = f.get();               if (!s) return;               std::cout << "Sent '" << data << "' (" << *s << ")\n";             }));   }   publisher.Flush();   // Block until they are actually sent.   for (auto& id : ids) id.get(); }

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

import ( 	"context" 	"fmt" 	"io"  	"cloud.google.com/go/pubsub/v2" )  func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error { 	// projectID := "my-project-id" 	// topicID := "my-topic" 	// msg := "Hello World" 	ctx := context.Background() 	client, err := pubsub.NewClient(ctx, projectID) 	if err != nil { 		return fmt.Errorf("pubsub.NewClient: %w", err) 	} 	defer client.Close()  	// client.Publisher can be passed a topic ID (e.g. "my-topic") or 	// a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 	// If a topic ID is provided, the project ID from the client is used. 	publisher := client.Publisher(topicID) 	publisher.PublishSettings.NumGoroutines = 1  	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)}) 	// Block until the result is returned and a server-generated 	// ID is returned for the published message. 	id, err := result.Get(ctx) 	if err != nil { 		return fmt.Errorf("Get: %w", err) 	} 	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id) 	return nil } 

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

 import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;  public class PublishWithConcurrencyControlExample {   public static void main(String... args) throws Exception {     // TODO(developer): Replace these variables before running the sample.     String projectId = "your-project-id";     String topicId = "your-topic-id";      publishWithConcurrencyControlExample(projectId, topicId);   }    public static void publishWithConcurrencyControlExample(String projectId, String topicId)       throws IOException, ExecutionException, InterruptedException {     TopicName topicName = TopicName.of(projectId, topicId);     Publisher publisher = null;     List<ApiFuture<String>> messageIdFutures = new ArrayList<>();      try {       // Provides an executor service for processing messages. The default       // `executorProvider` used by the publisher has a default thread count of       // 5 * the number of processors available to the Java virtual machine.       ExecutorProvider executorProvider =           InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();        // `setExecutorProvider` configures an executor for the publisher.       publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();        // schedule publishing one message at a time : messages get automatically batched       for (int i = 0; i < 100; i++) {         String message = "message " + i;         ByteString data = ByteString.copyFromUtf8(message);         PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();          // Once published, returns a server-assigned message id (unique within the topic)         ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);         messageIdFutures.add(messageIdFuture);       }     } finally {       // Wait on any pending publish requests.       List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();        System.out.println("Published " + messageIds.size() + " messages with concurrency control.");        if (publisher != null) {         // When finished with the publisher, shutdown to free up resources.         publisher.shutdown();         publisher.awaitTermination(1, TimeUnit.MINUTES);       }     }   } } 

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照「快速入門:使用用戶端程式庫」的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id = "your-topic-id"  pubsub = Google::Cloud::PubSub.new publisher = pubsub.publisher topic_id, async: {   threads: {     # Use exactly one thread for publishing message and exactly one thread     # for executing callbacks     publish:  1,     callback: 1   } }  publisher.publish_async "This is a test message." do |result|   raise "Failed to publish the message." unless result.succeeded?   puts "Message published asynchronously." end  # Stop the async_publisher to send all queued messages immediately. publisher.async_publisher.stop.wait!

後續步驟