并发控制

本文档介绍了如何将并发控制与发布到主题的消息搭配使用。

并发控制有助于您替换客户端库用于发布消息的默认后台 (I/O) 线程数。这样,发布者客户端便可并行发送消息。

并发控制是 Pub/Sub 高级别客户端库中的一项可用功能。使用低级库时,您还可以实现自己的并发控制。

是否支持并发控制取决于客户端库的编程语言。对于支持并行线程的语言实现(如 C++、Go 和 Java),客户端库会默认选择线程数。

本页面介绍了并发控制的概念,以及如何为发布商客户端设置此功能。如需配置订阅者客户端以进行并发控制,请参阅通过并发控制处理更多消息

准备工作

在配置发布工作流之前,请确保您已完成以下任务:

所需的角色

如需获得向主题发布消息所需的权限,请让您的管理员为您授予主题的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

您需要获得其他权限才能创建或更新主题和订阅。

并发控制配置

并发控制变量的默认值和变量名称可能因客户端库而异。例如,在 Java 客户端库中,用于配置并发控制的方法为 setExecutorProvider()setChannelProvider()。如需了解详情,请参阅 API 参考文档

  • setExecutorProvider() 可让您自定义用于处理发布响应的执行程序提供程序。例如,您可以将执行程序提供程序更改为在多个发布者客户端中返回线程数量有限的单个共享执行程序。此配置有助于限制创建的线程数。

  • 借助 setChannelProvider(),您可以自定义用于打开与 Pub/Sub 的连接的渠道提供程序。通常,您无需配置此值,除非您想在多个发布商客户端中使用同一渠道。在过多的客户端之间重复使用某个渠道可能会导致 GOAWAYENHANCE_YOUR_CALM 错误。如果您在应用的日志或 Cloud Logs 中看到这些错误,请创建更多渠道。

并发控制的代码示例

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub; using ::google::cloud::future; using ::google::cloud::GrpcBackgroundThreadPoolSizeOption; using ::google::cloud::Options; using ::google::cloud::StatusOr; [](std::string project_id, std::string topic_id) {   auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));   // Override the default number of background (I/O) threads. By default the   // library uses `std::thread::hardware_concurrency()` threads.   auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);   auto publisher = pubsub::Publisher(       pubsub::MakePublisherConnection(std::move(topic), std::move(options)));    std::vector<future<void>> ids;   for (char const* data : {"1", "2", "3", "go!"}) {     ids.push_back(         publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())             .then([data](future<StatusOr<std::string>> f) {               auto s = f.get();               if (!s) return;               std::cout << "Sent '" << data << "' (" << *s << ")\n";             }));   }   publisher.Flush();   // Block until they are actually sent.   for (auto& id : ids) id.get(); }

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import ( 	"context" 	"fmt" 	"io"  	"cloud.google.com/go/pubsub/v2" )  func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error { 	// projectID := "my-project-id" 	// topicID := "my-topic" 	// msg := "Hello World" 	ctx := context.Background() 	client, err := pubsub.NewClient(ctx, projectID) 	if err != nil { 		return fmt.Errorf("pubsub.NewClient: %w", err) 	} 	defer client.Close()  	// client.Publisher can be passed a topic ID (e.g. "my-topic") or 	// a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 	// If a topic ID is provided, the project ID from the client is used. 	publisher := client.Publisher(topicID) 	publisher.PublishSettings.NumGoroutines = 1  	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)}) 	// Block until the result is returned and a server-generated 	// ID is returned for the published message. 	id, err := result.Get(ctx) 	if err != nil { 		return fmt.Errorf("Get: %w", err) 	} 	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id) 	return nil } 

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

 import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;  public class PublishWithConcurrencyControlExample {   public static void main(String... args) throws Exception {     // TODO(developer): Replace these variables before running the sample.     String projectId = "your-project-id";     String topicId = "your-topic-id";      publishWithConcurrencyControlExample(projectId, topicId);   }    public static void publishWithConcurrencyControlExample(String projectId, String topicId)       throws IOException, ExecutionException, InterruptedException {     TopicName topicName = TopicName.of(projectId, topicId);     Publisher publisher = null;     List<ApiFuture<String>> messageIdFutures = new ArrayList<>();      try {       // Provides an executor service for processing messages. The default       // `executorProvider` used by the publisher has a default thread count of       // 5 * the number of processors available to the Java virtual machine.       ExecutorProvider executorProvider =           InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();        // `setExecutorProvider` configures an executor for the publisher.       publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();        // schedule publishing one message at a time : messages get automatically batched       for (int i = 0; i < 100; i++) {         String message = "message " + i;         ByteString data = ByteString.copyFromUtf8(message);         PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();          // Once published, returns a server-assigned message id (unique within the topic)         ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);         messageIdFutures.add(messageIdFuture);       }     } finally {       // Wait on any pending publish requests.       List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();        System.out.println("Published " + messageIds.size() + " messages with concurrency control.");        if (publisher != null) {         // When finished with the publisher, shutdown to free up resources.         publisher.shutdown();         publisher.awaitTermination(1, TimeUnit.MINUTES);       }     }   } } 

Ruby

以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id = "your-topic-id"  pubsub = Google::Cloud::PubSub.new publisher = pubsub.publisher topic_id, async: {   threads: {     # Use exactly one thread for publishing message and exactly one thread     # for executing callbacks     publish:  1,     callback: 1   } }  publisher.publish_async "This is a test message." do |result|   raise "Failed to publish the message." unless result.succeeded?   puts "Message published asynchronously." end  # Stop the async_publisher to send all queued messages immediately. publisher.async_publisher.stop.wait!

后续步骤