Google Cloud Pub/Sub is Google's beheerde messagingdienst. Producenten publiceren berichten naar een topic, en consumenten ontvangen ze door op te halen uit een subscription die aan dat topic is gekoppeld, zodat beide kanten volledig ontkoppeld blijven. De dienst biedt een gRPC-API, en de sgcWebSockets Enterprise-editie levert een getypeerde Pub/Sub gRPC-client bovenop TsgcGRPCClient, zodat je rechtstreeks vanuit Delphi en C++Builder kunt publiceren en ophalen zonder enige externe runtime of sidecar.
Dit bericht laat zien hoe de client wordt opgezet, hoe authenticatie werkt, en hoe je een bericht naar een topic publiceert en berichten uit een subscription ophaalt, met dezelfde calls als de voorbeelddemo.
Hoe het werkt
De gRPC-API van Pub/Sub bestaat gewoon uit Protocol Buffers-berichten ingekapseld over HTTP/2. sgcWebSockets levert al een complete HTTP/2-stack, dus de Pub/Sub-client draait op een TsgcHTTP2Client-transport gericht op pubsub.googleapis.com op poort 443 over TLS. TsgcGRPCClient verzorgt de gRPC-framing, de request-headers en de response-trailers, terwijl de getypeerde Pub/Sub-berichtklassen de protobuf-payloads voor je serialiseren en parsen.
Authenticatie gebruikt een Google-service-account. Je voert de service-account-credentials in bij een TsgcHTTPGoogleCloud_PubSub_Client, die een OAuth2-accesstoken verkrijgt, en je plaatst dat token in de DefaultMetadata van de gRPC-client als een authorization: Bearer-header. Omdat het als default metadata wordt ingesteld, reist het automatisch mee bij elke Pub/Sub-call.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Service-account-authenticatie
De Google Cloud-helper verzorgt de token-uitwisseling. Laad de service-account-JSON, stel de JWT-credentials in, en handel de OnAuthToken-gebeurtenis af om het resulterende bearer-token naar de gRPC-client te kopiëren. Hetzelfde component ondersteunt ook OAuth2 als je een interactieve flow verkiest.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Publiceren naar een topic
Om te publiceren bouw je een TsgcGRPCPubSubPublishRequest, stel je de volledig gekwalificeerde topic-naam in, voeg je een of meer berichten toe, en roep je de Publish-methode aan op de google.pubsub.v1.Publisher-service. Berichtpayloads zijn ruwe bytes, dus codeer je tekst eerst als UTF-8. De response bevat de door de server toegekende bericht-ids.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Ophalen uit een subscription
Om berichten te ontvangen bouw je een TsgcGRPCPubSubPullRequest met het subscription-pad en het maximale aantal op te halen berichten, en roep je vervolgens Pull aan op de google.pubsub.v1.Subscriber-service. Parse het antwoord in een TsgcGRPCPubSubPullResponse en loop door de ontvangen berichten. Elk bericht draagt een AckId dat je moet terugsturen om de aflevering te bevestigen, anders levert Pub/Sub het bericht opnieuw af na de ack-deadline.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Bevestigen is nog een unaire call. Bouw een TsgcGRPCPubSubAcknowledgeRequest met het subscription-pad en de ack-ids die je wilt bevestigen, en roep vervolgens Acknowledge aan op de google.pubsub.v1.Subscriber-service.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Streaming pull
Een eenmalige Pull is prima voor incidenteel pollen, maar voor een gestage stroom berichten biedt Pub/Sub StreamingPull, een bidirectionele gRPC-stream die berichten blijft afleveren zodra ze binnenkomen. Open een bidi-stream op de google.pubsub.v1.Subscriber-service, verstuur een initiële TsgcGRPCPubSubStreamingPullRequest met de subscription, en handel binnenkomende berichten af op de OnGRPCStreamMessage-gebeurtenis van de gRPC-client.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Verder dan publiceren en ophalen
De getypeerde client dekt ook de rest van het beheeroppervlak. Hij bevat request- en response-klassen voor het beheren van topics (CreateTopic, ListTopics, DeleteTopic) en subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), plus ModifyAckDeadline om de deadline te verlengen voor berichten die je nog verwerkt. Elke bewerking volgt dezelfde vorm: vul een request-object, roep ToBytes aan, roep de servicemethode aan, en laad het antwoord in de bijbehorende response-klasse.
Beschikbaarheid
De Google Cloud Pub/Sub gRPC-client maakt deel uit van de sgcWebSockets Enterprise-editie. Een compleet, kant-en-klaar voorbeeld met publiceren, ophalen, streaming pull en topic-/subscription-beheer staat in Demos\21.GRPC\10.PubSub, en de volledige referentie voor de onderliggende client staat op de productpagina van de gRPC Client.
Vragen of feedback? Neem contact op. Je krijgt antwoord van de mensen die de code hebben geschreven.
