Google Cloud Pub/Sub over gRPC in Delphi

· Components

Google Cloud Pub/Sub is Google's managed messaging service. Producers publish messages to a topic, and consumers receive them by pulling from a subscription attached to that topic, so the two sides stay fully decoupled. The service exposes a gRPC API, and the sgcWebSockets Enterprise edition ships a typed Pub/Sub gRPC client on top of TsgcGRPCClient, so you can publish and pull straight from Delphi and C++Builder without any external runtime or sidecar.

This post shows how the client is wired up, how authentication works, and how to publish a message to a topic and pull messages from a subscription, using the same calls as the sample demo.

How it works

Pub/Sub's gRPC API is just Protocol Buffers messages framed over HTTP/2. sgcWebSockets already ships a complete HTTP/2 stack, so the Pub/Sub client sits on a TsgcHTTP2Client transport pointed at pubsub.googleapis.com on port 443 over TLS. TsgcGRPCClient takes care of the gRPC framing, the request headers and the response trailers, while the typed Pub/Sub message classes serialize and parse the protobuf payloads for you.

Authentication uses a Google service account. You feed the service-account credentials to a TsgcHTTPGoogleCloud_PubSub_Client, which obtains an OAuth2 access token, and you place that token in the gRPC client's DefaultMetadata as an authorization: Bearer header. Because it is set as default metadata, it travels on every Pub/Sub call automatically.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Service-account authentication

The Google Cloud helper handles the token exchange. Load the service-account JSON, set the JWT credentials, and handle the OnAuthToken event to copy the resulting bearer token into the gRPC client. The same component also supports OAuth2 if you prefer an interactive flow.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Publishing to a topic

To publish, build a TsgcGRPCPubSubPublishRequest, set the fully qualified topic name, add one or more messages, and call the Publish method on the google.pubsub.v1.Publisher service. Message payloads are raw bytes, so encode your text as UTF-8 first. The response carries the server-assigned message ids.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Pulling from a subscription

To receive messages, build a TsgcGRPCPubSubPullRequest with the subscription path and the maximum number of messages to fetch, then call Pull on the google.pubsub.v1.Subscriber service. Parse the reply into a TsgcGRPCPubSubPullResponse and walk the received messages. Each one carries an AckId that you must send back to acknowledge delivery, otherwise Pub/Sub redelivers the message after the ack deadline.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Acknowledging is one more unary call. Build a TsgcGRPCPubSubAcknowledgeRequest with the subscription path and the ack ids you want to confirm, then call Acknowledge on the google.pubsub.v1.Subscriber service.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Streaming pull

A one-shot Pull is fine for occasional polling, but for a steady flow of messages Pub/Sub offers StreamingPull, a bidirectional gRPC stream that keeps delivering messages as they arrive. Open a bidi stream on the google.pubsub.v1.Subscriber service, send an initial TsgcGRPCPubSubStreamingPullRequest with the subscription, then handle incoming messages on the gRPC client's OnGRPCStreamMessage event.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Beyond publish and pull

The typed client covers the rest of the admin surface too. It includes request and response classes for managing topics (CreateTopic, ListTopics, DeleteTopic) and subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), plus ModifyAckDeadline to extend the deadline on messages you are still processing. Every operation follows the same shape: fill a request object, call ToBytes, invoke the service method, and load the reply into the matching response class.

Availability

The Google Cloud Pub/Sub gRPC client is part of the sgcWebSockets Enterprise edition. A complete, ready-to-run sample with publishing, pulling, streaming pull and topic/subscription management is in Demos\21.GRPC\10.PubSub, and the full reference for the underlying client is on the gRPC Client product page.

Questions or feedback? Get in touch. You will get a reply from the people who wrote the code.