Google Cloud Pub/Sub is Google's managed messaging service. Producers publish messages to a topic, and consumers receive them by pulling from a subscription attached to that topic, so the two sides stay fully decoupled. The service exposes a gRPC API, and the sgcWebSockets Enterprise edition ships a typed Pub/Sub gRPC client on top of TsgcGRPCClient, so you can publish and pull straight from Delphi and C++Builder without any external runtime or sidecar.
This post shows how the client is wired up, how authentication works, and how to publish a message to a topic and pull messages from a subscription, using the same calls as the sample demo.
How it works
Pub/Sub's gRPC API carries Protocol Buffers messages over HTTP/2, each one wrapped in a small length-prefixed frame. sgcWebSockets already ships a complete HTTP/2 stack, so the Pub/Sub client sits on a TsgcHTTP2Client transport pointed at pubsub.googleapis.com on port 443 over TLS. TsgcGRPCClient takes care of the gRPC framing, the request headers and the response trailers, while the typed Pub/Sub message classes serialize and parse the protobuf payloads for you.
Authentication uses a Google service account. You feed the service-account credentials to a TsgcHTTPGoogleCloud_PubSub_Client, which obtains an OAuth2 access token, and you place that token in the gRPC client's DefaultMetadata as an authorization: Bearer header. Because it is set as default metadata, it travels on every Pub/Sub call automatically.
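uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

// Declared locally here for brevity. The later snippets refer to GRPC,
// so in a real application keep HTTP2 and GRPC as form fields.
var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  // HTTP/2 transport to the Pub/Sub endpoint over TLS
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS := True;
  // gRPC client layered on that transport: protobuf payloads, no compression
  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;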
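For reference, the framing that TsgcGRPCClient takes care of is small: every gRPC message on the wire is a one-byte compression flag, a four-byte big-endian payload length, and then the serialized protobuf bytes. The sketch below illustrates that layout using only the Delphi RTL. FrameGrpcMessage is a hypothetical helper written for this post, not part of sgcWebSockets, and you do not need it when using the component.

```pascal
program GrpcFrameSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

// Hypothetical helper: wraps one serialized protobuf message in the gRPC
// length-prefixed frame (1 flag byte + 4-byte big-endian length + payload).
function FrameGrpcMessage(const aPayload: TBytes;
  aCompressed: Boolean = False): TBytes;
var
  vLen: Cardinal;
begin
  vLen := Length(aPayload);
  SetLength(Result, 5 + vLen);
  // byte 0: 1 when the payload is compressed, 0 otherwise
  if aCompressed then
    Result[0] := 1
  else
    Result[0] := 0;
  // bytes 1..4: payload length, most significant byte first
  Result[1] := (vLen shr 24) and $FF;
  Result[2] := (vLen shr 16) and $FF;
  Result[3] := (vLen shr 8) and $FF;
  Result[4] := vLen and $FF;
  // bytes 5..: the payload itself
  if vLen > 0 then
    Move(aPayload[0], Result[5], vLen);
end;

var
  vFrame: TBytes;
begin
  vFrame := FrameGrpcMessage(TEncoding.UTF8.GetBytes('Hello'));
  Writeln(Length(vFrame)); // prints 10: 5 header bytes + 5 payload bytes
end.
```

With grpcNoCompression selected as above, the flag byte would be 0 on every message.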
Service-account authentication
The Google Cloud helper handles the token exchange. Copy the client email, private key id, private key and project id from the service-account JSON into the JWT options, and handle the OnAuthToken event to copy the resulting bearer token into the gRPC client. The same component also supports OAuth2 if you prefer an interactive flow.
var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;
  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
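The four JWT values set above correspond to the client_email, private_key_id, private_key and project_id keys of the key file you download for a service account. Rather than pasting them into source code, you could read them from that file at startup. The following is a minimal sketch that assumes a Delphi version with System.JSON and generic GetValue support. TServiceAccountKey, ParseServiceAccountKey and LoadServiceAccountKey are hypothetical names introduced here, not part of sgcWebSockets.

```pascal
program ServiceAccountKeySketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.IOUtils, System.JSON;

type
  // Hypothetical holder for the four values the JWT options need
  TServiceAccountKey = record
    ClientEmail: string;
    PrivateKeyId: string;
    PrivateKey: string;
    ProjectId: string;
  end;

// Parses the text of a service-account key file.
// GetValue raises an exception when a key is missing.
function ParseServiceAccountKey(const aJSON: string): TServiceAccountKey;
var
  oValue: TJSONValue;
begin
  oValue := TJSONObject.ParseJSONValue(aJSON);
  try
    if not (oValue is TJSONObject) then
      raise EArgumentException.Create('Expected a JSON object');
    Result.ClientEmail := oValue.GetValue<string>('client_email');
    Result.PrivateKeyId := oValue.GetValue<string>('private_key_id');
    // the "\n" escapes in the file become real line breaks after parsing
    Result.PrivateKey := oValue.GetValue<string>('private_key');
    Result.ProjectId := oValue.GetValue<string>('project_id');
  finally
    oValue.Free;
  end;
end;

// Reads the key file from disk as UTF-8 and parses it.
function LoadServiceAccountKey(const aFileName: string): TServiceAccountKey;
begin
  Result := ParseServiceAccountKey(
    TFile.ReadAllText(aFileName, TEncoding.UTF8));
end;

var
  vKey: TServiceAccountKey;
begin
  if ParamCount < 1 then
  begin
    Writeln('usage: ServiceAccountKeySketch <key-file.json>');
    Exit;
  end;
  vKey := LoadServiceAccountKey(ParamStr(1));
  // never log the private key itself
  Writeln(vKey.ClientEmail, ' / ', vKey.ProjectId);
end.
```

The record fields would then be assigned to GoogleCloudOptions.JWT.ClientEmail, PrivateKeyId, PrivateKey.Text and ProjectId in place of the string literals.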
Publishing to a topic
To publish, build a TsgcGRPCPubSubPublishRequest, set the fully qualified topic name, add one or more messages, and call the Publish method on the google.pubsub.v1.Publisher service. Message payloads are raw bytes, so encode your text as UTF-8 first. The response carries the server-assigned message ids.
var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;
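To make the serialization step less opaque, here is a hand-rolled sketch of the protobuf bytes for the request above. In the public google.pubsub.v1 schema, PublishRequest has topic as field 1 and messages as field 2, and PubsubMessage has data as field 1. All three are length-delimited fields (wire type 2). The helper names are hypothetical and the code uses only the Delphi RTL. It shows the wire format, not how the sgcWebSockets classes are implemented, and their output may order or populate fields differently.

```pascal
program PublishRequestWireSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

// Base-128 varint: 7 bits per byte, least significant group first,
// high bit set on every byte except the last.
function EncodeVarint(aValue: UInt64): TBytes;
var
  vCount: Integer;
begin
  SetLength(Result, 10); // a 64-bit varint never needs more than 10 bytes
  vCount := 0;
  while aValue >= $80 do
  begin
    Result[vCount] := (aValue and $7F) or $80;
    aValue := aValue shr 7;
    Inc(vCount);
  end;
  Result[vCount] := aValue;
  SetLength(Result, vCount + 1);
end;

// Appends aSrc to the end of aDest.
procedure Append(var aDest: TBytes; const aSrc: TBytes);
var
  vOld: Integer;
begin
  if Length(aSrc) = 0 then
    Exit;
  vOld := Length(aDest);
  SetLength(aDest, vOld + Length(aSrc));
  Move(aSrc[0], aDest[vOld], Length(aSrc));
end;

// Length-delimited field: tag (field number shl 3 or wire type 2),
// varint length, then the raw bytes.
function EncodeBytesField(aFieldNumber: Integer; const aValue: TBytes): TBytes;
begin
  Result := EncodeVarint((UInt64(aFieldNumber) shl 3) or 2);
  Append(Result, EncodeVarint(UInt64(Length(aValue))));
  Append(Result, aValue);
end;

// PublishRequest carrying a single PubsubMessage that has only data set.
function EncodePublishRequest(const aTopic: string;
  const aData: TBytes): TBytes;
begin
  // PublishRequest.topic = 1
  Result := EncodeBytesField(1, TEncoding.UTF8.GetBytes(aTopic));
  // PublishRequest.messages = 2, wrapping PubsubMessage.data = 1
  Append(Result, EncodeBytesField(2, EncodeBytesField(1, aData)));
end;

var
  vBytes: TBytes;
begin
  vBytes := EncodePublishRequest('projects/my-project/topics/my-topic',
    TEncoding.UTF8.GetBytes('Hello Pub/Sub'));
  Writeln(Length(vBytes)); // prints 54
end.
```

The 54 bytes break down as 2 + 35 for the topic field and 2 + (2 + 13) for the embedded message.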
Pulling from a subscription
To receive messages, build a TsgcGRPCPubSubPullRequest with the subscription path and the maximum number of messages to fetch, then call Pull on the google.pubsub.v1.Subscriber service. Parse the reply into a TsgcGRPCPubSubPullResponse and walk the received messages. Each one carries an AckId that you must send back to acknowledge delivery, otherwise Pub/Sub redelivers the message after the ack deadline.
var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      // parse the payload only when the call succeeded, as in the publish sample
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPullResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.ReceivedMessageCount - 1 do
          begin
            oRecv := oResult.ReceivedMessage(i);
            Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
            Acknowledge(oRecv.AckId); // confirm delivery
          end;
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;
The Acknowledge helper called in the pull loop is one more unary call. Build a TsgcGRPCPubSubAcknowledgeRequest with the subscription path and the ack ids you want to confirm, then call Acknowledge on the google.pubsub.v1.Subscriber service.
procedure TForm1.Acknowledge(const aAckId: string);
var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    // the reply has no payload to parse, so release it right away
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;
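The samples repeat the same resource paths as string literals. Pub/Sub names follow the patterns projects/{project}/topics/{topic} and projects/{project}/subscriptions/{subscription}, so a pair of small helpers can keep them in one place. TopicPath and SubscriptionPath below are hypothetical helpers written for this post, using only the Delphi RTL. The guard against empty ids and embedded slashes is a basic sanity check, not the full set of naming rules Pub/Sub enforces.

```pascal
program ResourcePathSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

// Rejects ids that would produce a malformed path.
procedure CheckId(const aId: string);
begin
  if (aId = '') or (Pos('/', aId) > 0) then
    raise EArgumentException.CreateFmt('Invalid resource id: "%s"', [aId]);
end;

// Builds projects/{project}/topics/{topic}
function TopicPath(const aProject, aTopic: string): string;
begin
  CheckId(aProject);
  CheckId(aTopic);
  Result := Format('projects/%s/topics/%s', [aProject, aTopic]);
end;

// Builds projects/{project}/subscriptions/{subscription}
function SubscriptionPath(const aProject, aSubscription: string): string;
begin
  CheckId(aProject);
  CheckId(aSubscription);
  Result := Format('projects/%s/subscriptions/%s', [aProject, aSubscription]);
end;

begin
  Writeln(TopicPath('my-project', 'my-topic'));
  // prints projects/my-project/topics/my-topic
  Writeln(SubscriptionPath('my-project', 'my-sub'));
  // prints projects/my-project/subscriptions/my-sub
end.
```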
Streaming pull
A one-shot Pull is fine for occasional polling, but for a steady flow of messages Pub/Sub offers StreamingPull, a bidirectional gRPC stream that keeps delivering messages as they arrive. Open a bidi stream on the google.pubsub.v1.Subscriber service, send an initial TsgcGRPCPubSubStreamingPullRequest with the subscription, then handle incoming messages on the gRPC client's OnGRPCStreamMessage event.
var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');
  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;
Beyond publish and pull
The typed client covers more than publish and pull. It includes request and response classes for managing topics (CreateTopic, ListTopics, DeleteTopic) and subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), plus ModifyAckDeadline to extend the deadline on messages you are still processing. Every operation follows the same shape: fill a request object, call ToBytes, invoke the service method, and load the reply into the matching response class.
Availability
The Google Cloud Pub/Sub gRPC client is part of the sgcWebSockets Enterprise edition. A complete, ready-to-run sample with publishing, pulling, streaming pull and topic/subscription management is in Demos\21.GRPC\10.PubSub, and the full reference for the underlying client is on the gRPC Client product page.
Questions or feedback? Get in touch. You will get a reply from the people who wrote the code.
