將訊息發布至主題

這份文件提供發布訊息的相關資訊。

發布者應用程式可建立訊息,並將訊息傳送至「主題」。Pub/Sub 會為現有訂閱者提供至少一次的訊息傳送與最佳服務排序。

發布者應用程式的一般流程如下:

  1. 建立包含資料的訊息。
  2. 傳送請求至 Pub/Sub 伺服器,以將訊息發布至指定主題。

事前準備

設定發布工作流程前,請務必先完成下列工作:

必要的角色

如要取得將訊息發布至主題所需的權限,請要求管理員授予您主題的「Pub/Sub 發布者」 (roles/pubsub.publisher) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

您需要其他權限,才能建立或更新主題和訂閱項目。

訊息格式

訊息包含訊息資料和中繼資料的欄位。在訊息中指定至少下列其中一項:

Pub/Sub 服務會在訊息中加入下列欄位:

  • 主題專屬的訊息 ID
  • Pub/Sub 服務收到訊息時的時間戳記

如要進一步瞭解訊息,請參閱「訊息格式」。

發布訊息

您可以使用 Google Cloud 控制台、Google Cloud CLI、Pub/Sub API 和用戶端程式庫發布訊息。用戶端程式庫可以非同步發布訊息。

下列範例說明如何將訊息發布至主題。

控制台

如要發布訊息,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Pub/Sub topics」(Pub/Sub 主題) 頁面。

    前往 Pub/Sub 主題頁面

  2. 按一下主題 ID。

  3. 在「主題詳細資料」頁面的「訊息」下方,按一下「發布訊息」

  4. 在「郵件內文」欄位中輸入訊息資料。

  5. 按一下 [發布]

gcloud

如要發布訊息,請使用 gcloud pubsub topics publish 指令:

gcloud pubsub topics publish TOPIC_ID \   --message=MESSAGE_DATA \   [--attribute=KEY="VALUE",...]

更改下列內容:

  • TOPIC_ID:主題的 ID
  • MESSAGE_DATA:包含訊息資料的字串
  • KEY訊息屬性的鍵
  • VALUE:訊息屬性鍵的值

REST

如要發布訊息,請傳送類似下列內容的 POST 要求:

 POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish Content-Type: application/json Authorization: Bearer $(gcloud auth application-default print-access-token) 

更改下列內容:

  • PROJECT_ID:含有主題的專案 ID
  • TOPIC_ID:主題的 ID

指定以下要求主體欄位:

{   "messages": [     {       "attributes": {         "KEY": "VALUE",         ...       },       "data": "MESSAGE_DATA",     }   ] }

更改下列內容:

  • KEY訊息屬性的鍵
  • VALUE:訊息屬性鍵的值
  • MESSAGE_DATA:採用 Base64 編碼的字串,內含訊息資料

訊息必須包含非空白資料欄位,或至少一個屬性。

如果要求成功,回應會是含有訊息 ID 的 JSON 物件。以下是含有訊息 ID 的回應範例:

{   "messageIds": [     "19916711285",   ] }

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub; using ::google::cloud::future; using ::google::cloud::StatusOr; [](pubsub::Publisher publisher) {   auto message_id = publisher.Publish(       pubsub::MessageBuilder{}.SetData("Hello World!").Build());   auto done = message_id.then([](future<StatusOr<std::string>> f) {     auto id = f.get();     if (!id) throw std::move(id).status();     std::cout << "Hello World! published with id=" << *id << "\n";   });   // Block until the message is published   done.get(); }

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件

 using Google.Cloud.PubSub.V1; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks;  public class PublishMessagesAsyncSample {     public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)     {         TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);         PublisherClient publisher = await PublisherClient.CreateAsync(topicName);          int publishedMessageCount = 0;         var publishTasks = messageTexts.Select(async text =>         {             try             {                 string message = await publisher.PublishAsync(text);                 Console.WriteLine($"Published message {message}");                 Interlocked.Increment(ref publishedMessageCount);             }             catch (Exception exception)             {                 Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");             }         });         await Task.WhenAll(publishTasks);         // PublisherClient instance should be shutdown after use.         // The TimeSpan specifies for how long to attempt to publish locally queued messages.         await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));         return publishedMessageCount;     } }

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

import ( 	"context" 	"fmt" 	"io" 	"strconv" 	"sync" 	"sync/atomic"  	"cloud.google.com/go/pubsub/v2" )  func publishThatScales(w io.Writer, projectID, topicID string, n int) error { 	// projectID := "my-project-id" 	// topicID := "my-topic" 	ctx := context.Background() 	client, err := pubsub.NewClient(ctx, projectID) 	if err != nil { 		return fmt.Errorf("pubsub.NewClient: %w", err) 	} 	defer client.Close()  	var wg sync.WaitGroup 	var totalErrors uint64  	// client.Publisher can be passed a topic ID (e.g. "my-topic") or 	// a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 	// If a topic ID is provided, the project ID from the client is used. 	// Reuse this publisher for all publish calls to send messages in batches. 	publisher := client.Publisher(topicID)  	for i := 0; i < n; i++ { 		result := publisher.Publish(ctx, &pubsub.Message{ 			Data: []byte("Message " + strconv.Itoa(i)), 		})  		wg.Add(1) 		go func(i int, res *pubsub.PublishResult) { 			defer wg.Done() 			// The Get method blocks until a server-generated ID or 			// an error is returned for the published message. 			id, err := res.Get(ctx) 			if err != nil { 				// Error handling code can be added here. 				fmt.Fprintf(w, "Failed to publish: %v", err) 				atomic.AddUint64(&totalErrors, 1) 				return 			} 			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id) 		}(i, result) 	}  	wg.Wait()  	if totalErrors > 0 { 		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n) 	} 	return nil } 

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

 import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit;  public class PublishWithErrorHandlerExample {    public static void main(String... args) throws Exception {     // TODO(developer): Replace these variables before running the sample.     String projectId = "your-project-id";     String topicId = "your-topic-id";      publishWithErrorHandlerExample(projectId, topicId);   }    public static void publishWithErrorHandlerExample(String projectId, String topicId)       throws IOException, InterruptedException {     TopicName topicName = TopicName.of(projectId, topicId);     Publisher publisher = null;      try {       // Create a publisher instance with default settings bound to the topic       publisher = Publisher.newBuilder(topicName).build();        List<String> messages = Arrays.asList("first message", "second message");        for (final String message : messages) {         ByteString data = ByteString.copyFromUtf8(message);         PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();          // Once published, returns a server-assigned message id (unique within the topic)         ApiFuture<String> future = publisher.publish(pubsubMessage);          // Add an asynchronous callback to handle success / failure         ApiFutures.addCallback(             future,             new ApiFutureCallback<String>() {                @Override               public void onFailure(Throwable throwable) {                 if (throwable instanceof ApiException) {                   ApiException apiException = ((ApiException) throwable);                   // details on the API exception                   System.out.println(apiException.getStatusCode().getCode());                   System.out.println(apiException.isRetryable());                 }                 System.out.println("Error publishing message : " + message);               }                @Override               public void onSuccess(String messageId) {                 // Once published, returns server-assigned message ids (unique within the topic)                 System.out.println("Published message ID: " + messageId);               }             },             MoreExecutors.directExecutor());       }     } finally {       if (publisher != null) {         // When finished with the publisher, shutdown to free up resources.         publisher.shutdown();         publisher.awaitTermination(1, TimeUnit.MINUTES);       }     }   } }

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**  * TODO(developer): Uncomment these variables before running the sample.  */ // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; // const data = JSON.stringify({foo: 'bar'});  // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub');  // Creates a client; cache this for further use const pubSubClient = new PubSub();  async function publishMessage(topicNameOrId, data) {   // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)   const dataBuffer = Buffer.from(data);    // Cache topic objects (publishers) and reuse them.   const topic = pubSubClient.topic(topicNameOrId);    try {     const messageId = await topic.publishMessage({data: dataBuffer});     console.log(`Message ${messageId} published.`);   } catch (error) {     console.error(`Received error while publishing: ${error.message}`);     process.exitCode = 1;   } }

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**  * TODO(developer): Uncomment these variables before running the sample.  */ // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; // const data = JSON.stringify({foo: 'bar'});  // Imports the Google Cloud client library import {PubSub} from '@google-cloud/pubsub';  // Creates a client; cache this for further use const pubSubClient = new PubSub();  async function publishMessage(topicNameOrId: string, data: string) {   // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)   const dataBuffer = Buffer.from(data);    // Cache topic objects (publishers) and reuse them.   const topic = pubSubClient.topic(topicNameOrId);    try {     const messageId = await topic.publishMessage({data: dataBuffer});     console.log(`Message ${messageId} published.`);   } catch (error) {     console.error(       `Received error while publishing: ${(error as Error).message}`,     );     process.exitCode = 1;   } }

PHP

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。 詳情請參閱 Pub/Sub PHP API 參考說明文件

use Google\Cloud\PubSub\MessageBuilder; use Google\Cloud\PubSub\PubSubClient;  /**  * Publishes a message for a Pub/Sub topic.  *  * @param string $projectId  The Google project ID.  * @param string $topicName  The Pub/Sub topic name.  * @param string $message  The message to publish.  */ function publish_message($projectId, $topicName, $message) {     $pubsub = new PubSubClient([         'projectId' => $projectId,     ]);      $topic = $pubsub->topic($topicName);     $topic->publish((new MessageBuilder)->setData($message)->build());      print('Message published' . PHP_EOL); }

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

"""Publishes multiple messages to a Pub/Sub topic with an error handler.""" from concurrent import futures from google.cloud import pubsub_v1 from typing import Callable  # TODO(developer) # project_id = "your-project-id" # topic_id = "your-topic-id"  publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) publish_futures = []  def get_callback(     publish_future: pubsub_v1.publisher.futures.Future, data: str ) -> Callable[[pubsub_v1.publisher.futures.Future], None]:     def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:         try:             # Wait 60 seconds for the publish call to succeed.             print(publish_future.result(timeout=60))         except futures.TimeoutError:             print(f"Publishing {data} timed out.")      return callback  for i in range(10):     data = str(i)     # When you publish a message, the client returns a future.     publish_future = publisher.publish(topic_path, data.encode("utf-8"))     # Non-blocking. Publish failures are handled in the callback function.     publish_future.add_done_callback(get_callback(publish_future, data))     publish_futures.append(publish_future)  # Wait for all the publish futures to resolve before exiting. futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)  print(f"Published messages with error handler to {topic_path}.")

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照「快速入門:使用用戶端程式庫」的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id = "your-topic-id"  pubsub = Google::Cloud::PubSub.new publisher = pubsub.publisher topic_id  begin   publisher.publish_async "This is a test message." do |result|     raise "Failed to publish the message." unless result.succeeded?     puts "Message published asynchronously."   end   # Stop the async_publisher to send all queued messages immediately.   publisher.async_publisher.stop.wait! rescue StandardError => e   puts "Received error while publishing: #{e.message}" end

發布訊息後,Pub/Sub 服務會將訊息 ID 傳回給發布者。

運用屬性發布訊息

您可在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料。屬性可用來提供額外的訊息相關資訊,例如優先順序、來源或目的地。屬性也可以用來篩選訂閱項目中的訊息。

在訊息中使用屬性時,請遵守下列規範:

  • 每則訊息最多可有 100 個屬性。

  • 屬性鍵和值必須是字串類型。沒有規定必須使用的編碼。

  • 屬性鍵開頭不得為 goog,且不得超過 256 個位元組。

  • 屬性值不得超過 1024 個位元組。

訊息結構定義可利用下列方式表示:

 {   "data": string,   "attributes": {     string: string,     ...   },   "messageId": string,   "publishTime": string,   "orderingKey": string } 

如果是發布端重複訊息,即使 messageId 相同,同一個用戶端原始訊息的 publishTime 值也可能不同。

PubsubMessage JSON 結構定義是以 RESTRPC 說明文件的一部分發布。您可以將自訂屬性用於事件時間戳記。

下列範例說明如何將含有屬性的訊息發布至主題。

控制台

如要發布含有屬性的訊息,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「主題」頁面。

    前往 Pub/Sub 主題頁面

  2. 按一下要發布訊息的主題。

  3. 在主題詳細資料頁面中,按一下「Messages」(訊息)

  4. 按一下「發布訊息」

  5. 在「郵件內文」欄位中輸入訊息資料。

  6. 在「訊息屬性」下方,按一下「新增屬性」

  7. 輸入鍵/值組合。

  8. 視需要新增其他屬性。

  9. 按一下 [發布]

gcloud

gcloud pubsub topics publish my-topic --message="hello" \   --attribute="origin=gcloud-sample,username=gcp,eventTime='2021-01-01T12:00:00Z'"

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub; using ::google::cloud::future; using ::google::cloud::StatusOr; [](pubsub::Publisher publisher) {   std::vector<future<void>> done;   for (int i = 0; i != 10; ++i) {     auto message_id = publisher.Publish(         pubsub::MessageBuilder{}             .SetData("Hello World! [" + std::to_string(i) + "]")             .SetAttribute("origin", "cpp-sample")             .SetAttribute("username", "gcp")             .Build());     done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {       auto id = f.get();       if (!id) throw std::move(id).status();       std::cout << "Message " << i << " published with id=" << *id << "\n";     }));   }   publisher.Flush();   // Block until all the messages are published (optional)   for (auto& f : done) f.get(); }

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件

 using Google.Cloud.PubSub.V1; using Google.Protobuf; using System; using System.Threading.Tasks;  public class PublishMessageWithCustomAttributesAsyncSample {     public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)     {         TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);         PublisherClient publisher = await PublisherClient.CreateAsync(topicName);          var pubsubMessage = new PubsubMessage         {             // The data is any arbitrary ByteString. Here, we're using text.             Data = ByteString.CopyFromUtf8(messageText),             // The attributes provide metadata in a string-to-string dictionary.             Attributes =             {                 { "year", "2020" },                 { "author", "unknown" }             }         };         string message = await publisher.PublishAsync(pubsubMessage);         Console.WriteLine($"Published message {message}");         // PublisherClient instance should be shutdown after use.         // The TimeSpan specifies for how long to attempt to publish locally queued messages.         await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));     } }

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

import ( 	"context" 	"fmt" 	"io"  	"cloud.google.com/go/pubsub/v2" )  func publishCustomAttributes(w io.Writer, projectID, topicID string) error { 	// projectID := "my-project-id" 	// topicID := "my-topic" 	ctx := context.Background() 	client, err := pubsub.NewClient(ctx, projectID) 	if err != nil { 		return fmt.Errorf("pubsub.NewClient: %w", err) 	} 	defer client.Close()  	// client.Publisher can be passed a topic ID (e.g. "my-topic") or 	// a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 	// If a topic ID is provided, the project ID from the client is used. 	// Reuse this publisher for all publish calls to send messages in batches. 	publisher := client.Publisher(topicID) 	result := publisher.Publish(ctx, &pubsub.Message{ 		Data: []byte("Hello world!"), 		Attributes: map[string]string{ 			"origin":   "golang", 			"username": "gcp", 		}, 	}) 	// Block until the result is returned and a server-generated 	// ID is returned for the published message. 	id, err := result.Get(ctx) 	if err != nil { 		return fmt.Errorf("Get: %w", err) 	} 	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id) 	return nil } 

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

 import com.google.api.core.ApiFuture; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;  public class PublishWithCustomAttributesExample {   public static void main(String... args) throws Exception {     // TODO(developer): Replace these variables before running the sample.     String projectId = "your-project-id";     String topicId = "your-topic-id";      publishWithCustomAttributesExample(projectId, topicId);   }    public static void publishWithCustomAttributesExample(String projectId, String topicId)       throws IOException, ExecutionException, InterruptedException {     TopicName topicName = TopicName.of(projectId, topicId);     Publisher publisher = null;      try {       // Create a publisher instance with default settings bound to the topic       publisher = Publisher.newBuilder(topicName).build();        String message = "first message";       ByteString data = ByteString.copyFromUtf8(message);       PubsubMessage pubsubMessage =           PubsubMessage.newBuilder()               .setData(data)               .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))               .build();        // Once published, returns a server-assigned message id (unique within the topic)       ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);       String messageId = messageIdFuture.get();       System.out.println("Published a message with custom attributes: " + messageId);      } finally {       if (publisher != null) {         // When finished with the publisher, shutdown to free up resources.         publisher.shutdown();         publisher.awaitTermination(1, TimeUnit.MINUTES);       }     }   } }

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**  * TODO(developer): Uncomment these variables before running the sample.  */ // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; // const data = JSON.stringify({foo: 'bar'});  // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub');  // Creates a client; cache this for further use const pubSubClient = new PubSub();  async function publishMessageWithCustomAttributes(topicNameOrId, data) {   // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)   const dataBuffer = Buffer.from(data);    // Add two custom attributes, origin and username, to the message   const customAttributes = {     origin: 'nodejs-sample',     username: 'gcp',   };    // Cache topic objects (publishers) and reuse them.   const topic = pubSubClient.topic(topicNameOrId);    const messageId = await topic.publishMessage({     data: dataBuffer,     attributes: customAttributes,   });   console.log(`Message ${messageId} published.`); }

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1  # TODO(developer) # project_id = "your-project-id" # topic_id = "your-topic-id"  publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id)  for n in range(1, 10):     data_str = f"Message number {n}"     # Data must be a bytestring     data = data_str.encode("utf-8")     # Add two attributes, origin and username, to the message     future = publisher.publish(         topic_path, data, origin="python-sample", username="gcp"     )     print(future.result())  print(f"Published messages with custom attributes to {topic_path}.")

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照「快速入門:使用用戶端程式庫」的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id = "your-topic-id"  pubsub = Google::Cloud::PubSub.new publisher = pubsub.publisher topic_id  # Add two attributes, origin and username, to the message publisher.publish_async "This is a test message.",                         origin:   "ruby-sample",                         username: "gcp" do |result|   raise "Failed to publish the message." unless result.succeeded?   puts "Message with custom attributes published asynchronously." end  # Stop the async_publisher to send all queued messages immediately. publisher.async_publisher.stop.wait!

運用排序鍵發布訊息

如要在訂閱端用戶端依序接收訊息,您必須設定發布端用戶端,以發布含有排序鍵的訊息。

如要瞭解排序鍵的概念,請參閱「排序訊息」。

以下是發布商客戶使用排序訊息時應考量的幾項要點:

  • 單一發布端用戶端中的排序:如果單一發布端用戶端在同一個區域中發布排序鍵相同的訊息,訂閱端用戶端就會按照發布順序接收這些訊息。舉例來說,如果發布端用戶端使用排序鍵 A 發布訊息 1、2 和 3,訂閱端用戶端會依序收到這些訊息。

  • 多個發布端用戶端之間的排序:即使多個發布端用戶端使用相同的排序鍵,訂閱端用戶端收到的訊息順序,仍會與訊息在同一區域的發布順序一致。但發布商客戶本身並不知道這筆訂單。

    舉例來說,如果發布者用戶端 X 和 Y 各自發布含有排序鍵 A 的訊息,且 Pub/Sub 先收到 X 的訊息,再收到 Y 的訊息,則所有訂閱者用戶端都會先收到 X 的訊息,再收到 Y 的訊息。如果需要確保不同發布端用戶端之間的訊息順序,這些用戶端必須實作額外的協調機制,確保不會同時發布含有相同排序鍵的訊息。舉例來說,發布時可使用鎖定服務來維護排序鍵的擁有權。

  • 跨區域排序:只有當排序鍵的發布作業位於相同區域時,系統才會保證訊息傳送順序。如果發布者應用程式將含有相同排序鍵的訊息發布至不同區域,系統就無法強制執行這些發布作業的順序。訂閱者可以連線至任何區域,且排序保證仍會維持。

    在 Google Cloud中執行應用程式時,應用程式預設會連線至同一區域的 Pub/Sub 端點。因此,在Google Cloud 內單一區域執行應用程式,通常可確保您與單一區域互動。

    在Google Cloud 外部或多個區域執行發布商應用程式時,您可以在設定 Pub/Sub 用戶端時使用位置端點,確保連線至單一區域。Pub/Sub 的所有位置端點都指向單一區域。如要進一步瞭解位置端點,請參閱 Pub/Sub 端點。如需 Pub/Sub 的所有位置端點清單,請參閱位置端點清單

  • 發布失敗:如果使用排序鍵發布失敗,發布者中具有相同排序鍵的待處理訊息也會失敗,包括這個排序鍵的未來發布要求。發生這類失敗時,您必須使用排序鍵繼續發布。如需繼續發布作業的範例,請參閱「使用排序鍵重試要求」。

您可以使用 Google Cloud 控制台、Google Cloud CLI、Pub/Sub API 或用戶端程式庫,發布含有排序鍵的訊息。

控制台

如要發布含有屬性的訊息,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「主題」頁面。

    前往 Pub/Sub 主題頁面

  2. 按一下要發布訊息的主題。

  3. 在主題詳細資料頁面中,按一下「Messages」(訊息)

  4. 按一下「發布訊息」

  5. 在「郵件內文」欄位中輸入訊息資料。

  6. 在「訊息排序」欄位中,輸入排序鍵。

  7. 按一下 [發布]

gcloud

如要發布含有排序鍵的訊息,請使用 gcloud pubsub topics publish 指令和 --ordering-key 標記:

gcloud pubsub topics publish TOPIC_ID \   --message=MESSAGE_DATA \   --ordering-key=ORDERING_KEY

更改下列內容:

  • TOPIC_ID:主題的 ID
  • MESSAGE_DATA:包含訊息資料的字串
  • ORDERING_KEY:含有排序鍵的字串

REST

如要發布附帶排序鍵的訊息,請傳送類似下列內容的 POST 要求:

 POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish Content-Type: application/json Authorization: Bearer $(gcloud auth application-default print-access-token) 

更改下列內容:

  • PROJECT_ID:含有主題的專案 ID
  • TOPIC_ID:主題的 ID

指定以下要求主體欄位:

{   "messages": [     {       "attributes": {         "KEY": "VALUE",         ...       },       "data": "MESSAGE_DATA",       "ordering_key": "ORDERING_KEY",     }   ] }

更改下列內容:

  • KEY訊息屬性的鍵
  • VALUE:訊息屬性鍵的值
  • MESSAGE_DATA:採用 Base64 編碼的字串,內含訊息資料
  • ORDERING_KEY:含有排序鍵的字串

訊息必須包含非空白資料欄位,或至少一個屬性。

如果要求成功,回應會是含有訊息 ID 的 JSON 物件。以下是含有訊息 ID 的回應範例:

{   "messageIds": [     "19916711285",   ] }

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub; using ::google::cloud::future; using ::google::cloud::StatusOr; [](pubsub::Publisher publisher) {   struct SampleData {     std::string ordering_key;     std::string data;   } data[] = {       {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},       {"key1", "message4"}, {"key1", "message5"},   };   std::vector<future<void>> done;   for (auto& datum : data) {     auto message_id =         publisher.Publish(pubsub::MessageBuilder{}                               .SetData("Hello World! [" + datum.data + "]")                               .SetOrderingKey(datum.ordering_key)                               .Build());     std::string ack_id = datum.ordering_key + "#" + datum.data;     done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {       auto id = f.get();       if (!id) throw std::move(id).status();       std::cout << "Message " << ack_id << " published with id=" << *id                 << "\n";     }));   }   publisher.Flush();   // Block until all the messages are published (optional)   for (auto& f : done) f.get(); }

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件

 using Google.Cloud.PubSub.V1; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks;  public class PublishOrderedMessagesAsyncSample {     public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)     {         TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);          var customSettings = new PublisherClient.Settings         {             EnableMessageOrdering = true         };          PublisherClient publisher = await new PublisherClientBuilder         {             TopicName = topicName,             // Sending messages to the same region ensures they are received in order even when multiple publishers are used.             Endpoint = "us-east1-pubsub.googleapis.com:443",             Settings = customSettings         }.BuildAsync();          int publishedMessageCount = 0;         var publishTasks = keysAndMessages.Select(async keyAndMessage =>         {             try             {                 string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);                 Console.WriteLine($"Published message {message}");                 Interlocked.Increment(ref publishedMessageCount);             }             catch (Exception exception)             {                 Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");             }         });         await Task.WhenAll(publishTasks);         // PublisherClient instance should be shutdown after use.         // The TimeSpan specifies for how long to attempt to publish locally queued messages.         await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));         return publishedMessageCount;     } }

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

import ( 	"context" 	"fmt" 	"io" 	"sync" 	"sync/atomic"  	"cloud.google.com/go/pubsub/v2" 	"google.golang.org/api/option" )  func publishWithOrderingKey(w io.Writer, projectID, topicID string) { 	// projectID := "my-project-id" 	// topicID := "my-topic" 	ctx := context.Background()  	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region. 	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints 	client, err := pubsub.NewClient(ctx, projectID, 		option.WithEndpoint("us-east1-pubsub.googleapis.com:443")) 	if err != nil { 		fmt.Fprintf(w, "pubsub.NewClient: %v", err) 		return 	} 	defer client.Close()  	var wg sync.WaitGroup 	var totalErrors uint64  	// client.Publisher can be passed a topic ID (e.g. "my-topic") or 	// a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 	// If a topic ID is provided, the project ID from the client is used. 	// Reuse this publisher for all publish calls to send messages in batches. 	publisher := client.Publisher(topicID) 	publisher.EnableMessageOrdering = true  	messages := []struct { 		message     string 		orderingKey string 	}{ 		{ 			message:     "message1", 			orderingKey: "key1", 		}, 		{ 			message:     "message2", 			orderingKey: "key2", 		}, 		{ 			message:     "message3", 			orderingKey: "key1", 		}, 		{ 			message:     "message4", 			orderingKey: "key2", 		}, 	}  	for _, m := range messages { 		result := publisher.Publish(ctx, &pubsub.Message{ 			Data:        []byte(m.message), 			OrderingKey: m.orderingKey, 		})  		wg.Add(1) 		go func(res *pubsub.PublishResult) { 			defer wg.Done() 			// The Get method blocks until a server-generated ID or 			// an error is returned for the published message. 			_, err := res.Get(ctx) 			if err != nil { 				// Error handling code can be added here. 				fmt.Printf("Failed to publish: %s\n", err) 				atomic.AddUint64(&totalErrors, 1) 				return 			} 		}(result) 	}  	wg.Wait()  	if totalErrors > 0 { 		fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors) 		return 	}  	fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n") } 

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeUnit;  public class PublishWithOrderingKeys {   public static void main(String... args) throws Exception {     // TODO(developer): Replace these variables before running the sample.     String projectId = "your-project-id";     // Choose an existing topic.     String topicId = "your-topic-id";      publishWithOrderingKeysExample(projectId, topicId);   }    public static void publishWithOrderingKeysExample(String projectId, String topicId)       throws IOException, InterruptedException {     TopicName topicName = TopicName.of(projectId, topicId);     // Create a publisher and set message ordering to true.     Publisher publisher =         Publisher.newBuilder(topicName)             // Sending messages to the same region ensures they are received in order             // even when multiple publishers are used.             .setEndpoint("us-east1-pubsub.googleapis.com:443")             .setEnableMessageOrdering(true)             .build();      try {       Map<String, String> messages = new LinkedHashMap<String, String>();       messages.put("message1", "key1");       messages.put("message2", "key2");       messages.put("message3", "key1");       messages.put("message4", "key2");        for (Map.Entry<String, String> entry : messages.entrySet()) {         ByteString data = ByteString.copyFromUtf8(entry.getKey());         PubsubMessage pubsubMessage =             PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();         ApiFuture<String> future = publisher.publish(pubsubMessage);          // Add an asynchronous callback to handle publish success / failure.         ApiFutures.addCallback(             future,             new ApiFutureCallback<String>() {                @Override               public void onFailure(Throwable throwable) {                 if (throwable instanceof ApiException) {                   ApiException apiException = ((ApiException) throwable);                   // Details on the API exception.                   System.out.println(apiException.getStatusCode().getCode());                   System.out.println(apiException.isRetryable());                 }                 System.out.println("Error publishing message : " + pubsubMessage.getData());               }                @Override               public void onSuccess(String messageId) {                 // Once published, returns server-assigned message ids (unique within the topic).                 System.out.println(pubsubMessage.getData() + " : " + messageId);               }             },             MoreExecutors.directExecutor());       }     } finally {       // When finished with the publisher, shutdown to free up resources.       publisher.shutdown();       publisher.awaitTermination(1, TimeUnit.MINUTES);     }   } }

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**  * TODO(developer): Uncomment these variables before running the sample.  */ // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; // const data = JSON.stringify({foo: 'bar'}); // const orderingKey = 'key1';  // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub');  // Creates a client; cache this for further use const pubSubClient = new PubSub({   // Sending messages to the same region ensures they are received in order   // even when multiple publishers are used.   apiEndpoint: 'us-east1-pubsub.googleapis.com:443', });  async function publishOrderedMessage(topicNameOrId, data, orderingKey) {   // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)   const dataBuffer = Buffer.from(data);    // Be sure to set an ordering key that matches other messages   // you want to receive in order, relative to each other.   const message = {     data: dataBuffer,     orderingKey: orderingKey,   };    // Cache topic objects (publishers) and reuse them.   //   // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering   // key are in the same region. For list of locational endpoints for Pub/Sub, see:   // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints   const publishOptions = {     messageOrdering: true,   };   const topic = pubSubClient.topic(topicNameOrId, publishOptions);    // Publishes the message   const messageId = await topic.publishMessage(message);    console.log(`Message ${messageId} published.`);    return messageId; }

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1  # TODO(developer): Choose an existing topic. # project_id = "your-project-id" # topic_id = "your-topic-id"  publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True) # Sending messages to the same region ensures they are received in order # even when multiple publishers are used. client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"} publisher = pubsub_v1.PublisherClient(     publisher_options=publisher_options, client_options=client_options ) # The `topic_path` method creates a fully qualified identifier # in the form `projects/{project_id}/topics/{topic_id}` topic_path = publisher.topic_path(project_id, topic_id)  for message in [     ("message1", "key1"),     ("message2", "key2"),     ("message3", "key1"),     ("message4", "key2"), ]:     # Data must be a bytestring     data = message[0].encode("utf-8")     ordering_key = message[1]     # When you publish a message, the client returns a future.     future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)     print(future.result())  print(f"Published messages with ordering keys to {topic_path}.")

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照「快速入門:使用用戶端程式庫」的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id = "your-topic-id"  pubsub = Google::Cloud::PubSub.new # Start sending messages in one request once the size of all queued messages # reaches 1 MB or the number of queued messages reaches 20 publisher = pubsub.publisher topic_id, async: {   max_bytes:    1_000_000,   max_messages: 20 }  publisher.enable_message_ordering! 10.times do |i|   publisher.publish_async "This is message ##{i}.",                           ordering_key: "ordering-key" end  # Stop the async_publisher to send all queued messages immediately. publisher.async_publisher.stop! puts "Messages published with ordering key."

監控發布商

Cloud Monitoring 提供多項指標,可監控主題。

如要監控主題並維持發布商的健康狀態,請參閱「維持發布商的健康狀態」。

後續步驟