Google Cloud Pub/Sub 是 Google 的托管消息服务。生产者将消息发布到主题,消费者通过从附加到该主题的订阅拉取来接收消息,因此双方保持完全解耦。该服务提供 gRPC API,而 sgcWebSockets Enterprise 版本在 TsgcGRPCClient 之上提供了一个强类型的 Pub/Sub gRPC 客户端,因此你可以直接从 Delphi 和 C++Builder 发布和拉取消息,无需任何外部运行时或边车进程。
本文展示客户端是如何连接搭建的、身份验证如何工作,以及如何向主题发布消息和从订阅拉取消息,使用与示例 demo 相同的调用。
工作原理
Pub/Sub 的 gRPC API 不过是通过 HTTP/2 封帧的 Protocol Buffers 消息。sgcWebSockets 已经提供了完整的 HTTP/2 栈,因此 Pub/Sub 客户端运行在一个指向 pubsub.googleapis.com 端口 443、基于 TLS 的 TsgcHTTP2Client 传输之上。TsgcGRPCClient 负责处理 gRPC 封帧、请求头和响应尾部,而强类型的 Pub/Sub 消息类则替你序列化和解析 protobuf 载荷。
身份验证使用 Google 服务账号。你将服务账号凭据提供给一个 TsgcHTTPGoogleCloud_PubSub_Client,它会获取一个 OAuth2 访问令牌,然后你将该令牌作为 authorization: Bearer 头放入 gRPC 客户端的 DefaultMetadata 中。由于它被设置为默认元数据,因此它会自动随每个 Pub/Sub 调用一起传递。
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
服务账号身份验证
Google Cloud 辅助组件负责处理令牌交换。加载服务账号 JSON,设置 JWT 凭据,并处理 OnAuthToken 事件,将得到的 bearer 令牌复制到 gRPC 客户端中。如果你更喜欢交互式流程,同一组件也支持 OAuth2。
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
向主题发布消息
要发布消息,请构建一个 TsgcGRPCPubSubPublishRequest,设置完全限定的主题名称,添加一条或多条消息,然后在 google.pubsub.v1.Publisher 服务上调用 Publish 方法。消息载荷是原始字节,因此请先将文本编码为 UTF-8。响应携带服务器分配的消息 id。
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
从订阅拉取消息
要接收消息,请构建一个 TsgcGRPCPubSubPullRequest,带上订阅路径和要获取的最大消息数量,然后在 google.pubsub.v1.Subscriber 服务上调用 Pull。将回复解析为一个 TsgcGRPCPubSubPullResponse 并遍历收到的消息。每条消息都携带一个 AckId,你必须将其发回以确认投递,否则 Pub/Sub 会在确认截止时间之后重新投递该消息。
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
确认是再做一次一元调用。构建一个 TsgcGRPCPubSubAcknowledgeRequest,带上订阅路径和你想要确认的 ack id,然后在 google.pubsub.v1.Subscriber 服务上调用 Acknowledge。
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
流式拉取
一次性的 Pull 适用于偶尔轮询,但要持续获取消息流,Pub/Sub 提供了 StreamingPull,这是一个双向 gRPC 流,会随着消息的到达持续投递。在 google.pubsub.v1.Subscriber 服务上打开一个双向流,发送一个带有订阅的初始 TsgcGRPCPubSubStreamingPullRequest,然后在 gRPC 客户端的 OnGRPCStreamMessage 事件上处理传入的消息。
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
不止于发布和拉取
这个强类型客户端也覆盖了其余的管理接口。它包含用于管理主题(CreateTopic、ListTopics、DeleteTopic)和订阅(CreateSubscription、ListSubscriptions、DeleteSubscription)的请求和响应类,以及用于延长你仍在处理的消息截止时间的 ModifyAckDeadline。每个操作都遵循相同的形式:填充一个请求对象,调用 ToBytes,调用服务方法,然后将回复加载到对应的响应类中。
可用性
Google Cloud Pub/Sub gRPC 客户端是 sgcWebSockets Enterprise 版本的一部分。一个完整、开箱即用的示例,包含发布、拉取、流式拉取以及主题/订阅管理,位于 Demos\21.GRPC\10.PubSub,底层客户端的完整参考资料请见gRPC Client 产品页面。
有问题或反馈?联系我们。你会收到来自编写这段代码的人的回复。
