Delphi で gRPC を使う Google Cloud Pub/Sub

· コンポーネント

Google Cloud Pub/Sub は Google のマネージドメッセージングサービスです。プロデューサーがメッセージをトピックに発行し、コンシューマーはそのトピックに紐付いたサブスクリプションから取得して受信するため、両者は完全に分離された状態を保ちます。このサービスは gRPC API を公開しており、sgcWebSockets Enterprise エディションには TsgcGRPCClient の上に構築された型付き Pub/Sub gRPC クライアントが付属しているため、外部ランタイムやサイドカーなしで Delphi や C++Builder から直接発行・取得できます。

この記事では、クライアントの組み立て方、認証の仕組み、そしてサンプルデモと同じ呼び出しを使ってトピックへメッセージを発行し、サブスクリプションからメッセージを取得する方法を紹介します。

仕組み

Pub/Sub の gRPC API は、HTTP/2 上でフレーミングされた Protocol Buffers メッセージにすぎません。sgcWebSockets には完全な HTTP/2 スタックがすでに付属しているため、Pub/Sub クライアントは TLS 経由でポート 443 の pubsub.googleapis.com に向けた TsgcHTTP2Client トランスポート上に配置されます。TsgcGRPCClient が gRPC のフレーミング、リクエストヘッダー、レスポンストレーラーを処理し、型付き Pub/Sub メッセージクラスが protobuf ペイロードのシリアライズと解析を代行します。

認証には Google サービスアカウントを使用します。サービスアカウントの認証情報を TsgcHTTPGoogleCloud_PubSub_Client に渡すと OAuth2 アクセストークンが取得されます。そのトークンを gRPC クライアントの DefaultMetadataauthorization: Bearer ヘッダーとして設定します。デフォルトメタデータとして設定されるため、すべての Pub/Sub 呼び出しに自動的に付随します。

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

サービスアカウント認証

Google Cloud ヘルパーがトークンの交換を処理します。サービスアカウント JSON を読み込み、JWT 認証情報を設定し、OnAuthToken イベントを処理して、得られたベアラートークンを gRPC クライアントにコピーします。同じコンポーネントは、対話的なフローを好む場合に備えて OAuth2 もサポートしています。

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

トピックへの発行

発行するには、TsgcGRPCPubSubPublishRequest を構築し、完全修飾されたトピック名を設定し、1 つ以上のメッセージを追加して、google.pubsub.v1.Publisher サービスの Publish メソッドを呼び出します。メッセージのペイロードは生のバイト列なので、テキストはまず UTF-8 としてエンコードしてください。レスポンスにはサーバーが割り当てたメッセージ ID が含まれます。

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

サブスクリプションからの取得

メッセージを受信するには、サブスクリプションのパスと取得する最大メッセージ数を指定して TsgcGRPCPubSubPullRequest を構築し、google.pubsub.v1.Subscriber サービスの Pull を呼び出します。レスポンスを TsgcGRPCPubSubPullResponse に解析し、受信したメッセージを順に処理します。各メッセージには AckId が含まれており、配信を確認するために送り返す必要があります。そうしないと、Pub/Sub は確認応答の期限後にメッセージを再配信します。

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

確認応答はもう 1 つのユーナリ呼び出しです。サブスクリプションのパスと確認したい ack id を指定して TsgcGRPCPubSubAcknowledgeRequest を構築し、google.pubsub.v1.Subscriber サービスの Acknowledge を呼び出します。

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

ストリーミング取得

一度きりの Pull はたまにポーリングする場合には十分ですが、メッセージを継続的に受け取るには、Pub/Sub が StreamingPull を提供しています。これは双方向 gRPC ストリームで、メッセージが到着するたびに配信し続けます。google.pubsub.v1.Subscriber サービスで双方向ストリームを開き、サブスクリプションを指定した最初の TsgcGRPCPubSubStreamingPullRequest を送信し、その後は gRPC クライアントの OnGRPCStreamMessage イベントで受信メッセージを処理します。

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

発行と取得を超えて

型付きクライアントは、残りの管理機能の大部分もカバーしています。トピックを管理するためのリクエスト/レスポンスクラス(CreateTopicListTopicsDeleteTopic)とサブスクリプションを管理するためのクラス(CreateSubscriptionListSubscriptionsDeleteSubscription)に加えて、まだ処理中のメッセージの期限を延長する ModifyAckDeadline も含まれています。すべての操作は同じ形に従います。リクエストオブジェクトに値を入れ、ToBytes を呼び出し、サービスメソッドを実行し、対応するレスポンスクラスにレスポンスを読み込みます。

提供状況

Google Cloud Pub/Sub gRPC クライアントは sgcWebSockets Enterprise エディションの一部です。発行、取得、ストリーミング取得、トピック/サブスクリプション管理を備えた、すぐに実行できる完全なサンプルは Demos\21.GRPC\10.PubSub にあり、基盤となるクライアントの完全なリファレンスは gRPC Client 製品ページにあります。

ご質問やご意見はありますか? お問い合わせください。コードを書いた本人から返信いたします。