在 Delphi 中通过 gRPC 使用 Google Cloud Pub/Sub

· 组件

Google Cloud Pub/Sub 是 Google 的托管消息服务。生产者将消息发布到主题,消费者通过从附加到该主题的订阅拉取来接收消息,因此双方保持完全解耦。该服务提供 gRPC API,而 sgcWebSockets Enterprise 版本在 TsgcGRPCClient 之上提供了一个强类型的 Pub/Sub gRPC 客户端,因此你可以直接从 Delphi 和 C++Builder 发布和拉取消息,无需任何外部运行时或边车进程。

本文展示客户端是如何连接搭建的、身份验证如何工作,以及如何向主题发布消息和从订阅拉取消息,使用与示例 demo 相同的调用。

工作原理

Pub/Sub 的 gRPC API 不过是通过 HTTP/2 封帧的 Protocol Buffers 消息。sgcWebSockets 已经提供了完整的 HTTP/2 栈,因此 Pub/Sub 客户端运行在一个指向 pubsub.googleapis.com 端口 443、基于 TLS 的 TsgcHTTP2Client 传输之上。TsgcGRPCClient 负责处理 gRPC 封帧、请求头和响应尾部,而强类型的 Pub/Sub 消息类则替你序列化和解析 protobuf 载荷。

身份验证使用 Google 服务账号。你将服务账号凭据提供给一个 TsgcHTTPGoogleCloud_PubSub_Client,它会获取一个 OAuth2 访问令牌,然后你将该令牌作为 authorization: Bearer 头放入 gRPC 客户端的 DefaultMetadata 中。由于它被设置为默认元数据,因此它会自动随每个 Pub/Sub 调用一起传递。

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

服务账号身份验证

Google Cloud 辅助组件负责处理令牌交换。加载服务账号 JSON,设置 JWT 凭据,并处理 OnAuthToken 事件,将得到的 bearer 令牌复制到 gRPC 客户端中。如果你更喜欢交互式流程,同一组件也支持 OAuth2。

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

向主题发布消息

要发布消息,请构建一个 TsgcGRPCPubSubPublishRequest,设置完全限定的主题名称,添加一条或多条消息,然后在 google.pubsub.v1.Publisher 服务上调用 Publish 方法。消息载荷是原始字节,因此请先将文本编码为 UTF-8。响应携带服务器分配的消息 id。

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

从订阅拉取消息

要接收消息,请构建一个 TsgcGRPCPubSubPullRequest,带上订阅路径和要获取的最大消息数量,然后在 google.pubsub.v1.Subscriber 服务上调用 Pull。将回复解析为一个 TsgcGRPCPubSubPullResponse 并遍历收到的消息。每条消息都携带一个 AckId,你必须将其发回以确认投递,否则 Pub/Sub 会在确认截止时间之后重新投递该消息。

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

确认是再做一次一元调用。构建一个 TsgcGRPCPubSubAcknowledgeRequest,带上订阅路径和你想要确认的 ack id,然后在 google.pubsub.v1.Subscriber 服务上调用 Acknowledge

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

流式拉取

一次性的 Pull 适用于偶尔轮询,但要持续获取消息流,Pub/Sub 提供了 StreamingPull,这是一个双向 gRPC 流,会随着消息的到达持续投递。在 google.pubsub.v1.Subscriber 服务上打开一个双向流,发送一个带有订阅的初始 TsgcGRPCPubSubStreamingPullRequest,然后在 gRPC 客户端的 OnGRPCStreamMessage 事件上处理传入的消息。

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

不止于发布和拉取

这个强类型客户端也覆盖了其余的管理接口。它包含用于管理主题(CreateTopicListTopicsDeleteTopic)和订阅(CreateSubscriptionListSubscriptionsDeleteSubscription)的请求和响应类,以及用于延长你仍在处理的消息截止时间的 ModifyAckDeadline。每个操作都遵循相同的形式:填充一个请求对象,调用 ToBytes,调用服务方法,然后将回复加载到对应的响应类中。

可用性

Google Cloud Pub/Sub gRPC 客户端是 sgcWebSockets Enterprise 版本的一部分。一个完整、开箱即用的示例,包含发布、拉取、流式拉取以及主题/订阅管理,位于 Demos\21.GRPC\10.PubSub,底层客户端的完整参考资料请见gRPC Client 产品页面

有问题或反馈?联系我们。你会收到来自编写这段代码的人的回复。