Google Cloud Pub/Sub ist der verwaltete Messaging-Dienst von Google. Produzenten veröffentlichen Nachrichten in einem Topic, und Konsumenten empfangen sie, indem sie aus einer mit diesem Topic verknüpften Subscription abrufen, sodass beide Seiten vollständig entkoppelt bleiben. Der Dienst stellt eine gRPC-API bereit, und die sgcWebSockets Enterprise Edition liefert einen typisierten Pub/Sub-gRPC-Client auf Basis von TsgcGRPCClient, sodass Sie direkt aus Delphi und C++Builder veröffentlichen und abrufen können, ohne externe Laufzeitumgebung oder Sidecar.
Dieser Beitrag zeigt, wie der Client verdrahtet wird, wie die Authentifizierung funktioniert und wie Sie eine Nachricht in einem Topic veröffentlichen und Nachrichten aus einer Subscription abrufen, anhand derselben Aufrufe wie im Beispiel-Demo.
Wie es funktioniert
Die gRPC-API von Pub/Sub besteht lediglich aus Protocol-Buffers-Nachrichten, die über HTTP/2 gerahmt werden. sgcWebSockets liefert bereits einen vollständigen HTTP/2-Stack, sodass der Pub/Sub-Client auf einem TsgcHTTP2Client-Transport sitzt, der auf pubsub.googleapis.com an Port 443 über TLS verweist. TsgcGRPCClient kümmert sich um das gRPC-Framing, die Request-Header und die Response-Trailer, während die typisierten Pub/Sub-Nachrichtenklassen die protobuf-Nutzdaten für Sie serialisieren und parsen.
Die Authentifizierung verwendet ein Google-Dienstkonto. Sie übergeben die Dienstkonto-Anmeldedaten an einen TsgcHTTPGoogleCloud_PubSub_Client, der ein OAuth2-Zugriffstoken bezieht, und legen dieses Token in den DefaultMetadata des gRPC-Clients als authorization: Bearer-Header ab. Da es als Default-Metadaten gesetzt ist, wird es bei jedem Pub/Sub-Aufruf automatisch mitgesendet.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Dienstkonto-Authentifizierung
Der Google-Cloud-Helfer übernimmt den Token-Austausch. Laden Sie das Dienstkonto-JSON, setzen Sie die JWT-Anmeldedaten und behandeln Sie das OnAuthToken-Ereignis, um das resultierende Bearer-Token in den gRPC-Client zu kopieren. Dieselbe Komponente unterstützt auch OAuth2, falls Sie einen interaktiven Ablauf bevorzugen.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
In einem Topic veröffentlichen
Zum Veröffentlichen erstellen Sie einen TsgcGRPCPubSubPublishRequest, setzen den voll qualifizierten Topic-Namen, fügen eine oder mehrere Nachrichten hinzu und rufen die Methode Publish des Dienstes google.pubsub.v1.Publisher auf. Nachrichten-Nutzdaten sind Rohbytes, kodieren Sie Ihren Text also zuerst als UTF-8. Die Antwort trägt die vom Server zugewiesenen Nachrichten-IDs.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Aus einer Subscription abrufen
Zum Empfangen von Nachrichten erstellen Sie einen TsgcGRPCPubSubPullRequest mit dem Subscription-Pfad und der maximalen Anzahl abzurufender Nachrichten und rufen dann Pull des Dienstes google.pubsub.v1.Subscriber auf. Parsen Sie die Antwort in einen TsgcGRPCPubSubPullResponse und durchlaufen Sie die empfangenen Nachrichten. Jede trägt eine AckId, die Sie zurücksenden müssen, um die Zustellung zu bestätigen, andernfalls stellt Pub/Sub die Nachricht nach Ablauf der Ack-Frist erneut zu.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Das Bestätigen ist ein weiterer unärer Aufruf. Erstellen Sie einen TsgcGRPCPubSubAcknowledgeRequest mit dem Subscription-Pfad und den Ack-IDs, die Sie bestätigen möchten, und rufen Sie dann Acknowledge des Dienstes google.pubsub.v1.Subscriber auf.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Streaming-Pull
Ein einmaliges Pull eignet sich gut für gelegentliches Polling, aber für einen stetigen Nachrichtenfluss bietet Pub/Sub StreamingPull, einen bidirektionalen gRPC-Stream, der Nachrichten weiterhin zustellt, sobald sie eintreffen. Öffnen Sie einen Bidi-Stream auf dem Dienst google.pubsub.v1.Subscriber, senden Sie einen initialen TsgcGRPCPubSubStreamingPullRequest mit der Subscription und behandeln Sie eingehende Nachrichten im OnGRPCStreamMessage-Ereignis des gRPC-Clients.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Über Veröffentlichen und Abrufen hinaus
Der typisierte Client deckt auch den restlichen Admin-Bereich ab. Er enthält Request- und Response-Klassen zum Verwalten von Topics (CreateTopic, ListTopics, DeleteTopic) und Subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription) sowie ModifyAckDeadline, um die Frist für Nachrichten zu verlängern, die Sie noch verarbeiten. Jede Operation folgt demselben Muster: Füllen Sie ein Request-Objekt, rufen Sie ToBytes auf, rufen Sie die Dienstmethode auf und laden Sie die Antwort in die passende Response-Klasse.
Verfügbarkeit
Der Google Cloud Pub/Sub gRPC-Client ist Teil der sgcWebSockets Enterprise Edition. Ein vollständiges, sofort lauffähiges Beispiel mit Veröffentlichen, Abrufen, Streaming-Pull und Topic-/Subscription-Verwaltung befindet sich in Demos\21.GRPC\10.PubSub, und die vollständige Referenz zum zugrunde liegenden Client finden Sie auf der gRPC-Client-Produktseite.
Fragen oder Feedback? Nehmen Sie Kontakt auf. Sie erhalten eine Antwort von den Leuten, die den Code geschrieben haben.
