Google Cloud Pub/Sub est le service de messagerie managé de Google. Les producteurs publient des messages sur un topic, et les consommateurs les reçoivent en tirant depuis une subscription rattachée à ce topic, de sorte que les deux côtés restent totalement découplés. Le service expose une API gRPC, et l'édition Enterprise de sgcWebSockets fournit un client gRPC Pub/Sub typé au-dessus de TsgcGRPCClient, ce qui vous permet de publier et de tirer directement depuis Delphi et C++Builder sans aucun runtime externe ni sidecar.
Cet article montre comment le client est câblé, comment fonctionne l'authentification, et comment publier un message sur un topic et tirer des messages depuis une subscription, en utilisant les mêmes appels que la démo d'exemple.
Comment ça fonctionne
L'API gRPC de Pub/Sub n'est que des messages Protocol Buffers encadrés sur HTTP/2. sgcWebSockets fournit déjà une pile HTTP/2 complète, donc le client Pub/Sub repose sur un transport TsgcHTTP2Client pointé vers pubsub.googleapis.com sur le port 443 via TLS. TsgcGRPCClient prend en charge l'encadrement gRPC, les en-têtes de requête et les trailers de réponse, tandis que les classes de messages Pub/Sub typées sérialisent et analysent les charges utiles protobuf à votre place.
L'authentification utilise un compte de service Google. Vous fournissez les identifiants du compte de service à un TsgcHTTPGoogleCloud_PubSub_Client, qui obtient un jeton d'accès OAuth2, et vous placez ce jeton dans le DefaultMetadata du client gRPC sous la forme d'un en-tête authorization: Bearer. Comme il est défini en tant que métadonnées par défaut, il voyage automatiquement sur chaque appel Pub/Sub.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Authentification par compte de service
L'assistant Google Cloud gère l'échange de jeton. Chargez le JSON du compte de service, définissez les identifiants JWT, et gérez l'événement OnAuthToken pour copier le jeton bearer obtenu dans le client gRPC. Le même composant prend aussi en charge OAuth2 si vous préférez un flux interactif.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Publier sur un topic
Pour publier, construisez un TsgcGRPCPubSubPublishRequest, définissez le nom de topic pleinement qualifié, ajoutez un ou plusieurs messages, et appelez la méthode Publish sur le service google.pubsub.v1.Publisher. Les charges utiles des messages sont des octets bruts, donc encodez votre texte en UTF-8 d'abord. La réponse porte les identifiants de message attribués par le serveur.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Tirer depuis une subscription
Pour recevoir des messages, construisez un TsgcGRPCPubSubPullRequest avec le chemin de la subscription et le nombre maximal de messages à récupérer, puis appelez Pull sur le service google.pubsub.v1.Subscriber. Analysez la réponse dans un TsgcGRPCPubSubPullResponse et parcourez les messages reçus. Chacun porte un AckId que vous devez renvoyer pour accuser réception de la livraison, sinon Pub/Sub redistribue le message après l'échéance d'accusé.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
L'accusé de réception est un appel unaire de plus. Construisez un TsgcGRPCPubSubAcknowledgeRequest avec le chemin de la subscription et les identifiants d'accusé que vous voulez confirmer, puis appelez Acknowledge sur le service google.pubsub.v1.Subscriber.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Tirage en flux continu
Un Pull ponctuel convient pour un sondage occasionnel, mais pour un flux régulier de messages Pub/Sub propose StreamingPull, un flux gRPC bidirectionnel qui continue de livrer les messages à mesure qu'ils arrivent. Ouvrez un flux bidi sur le service google.pubsub.v1.Subscriber, envoyez un TsgcGRPCPubSubStreamingPullRequest initial avec la subscription, puis gérez les messages entrants sur l'événement OnGRPCStreamMessage du client gRPC.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Au-delà de publier et tirer
Le client typé couvre aussi le reste de la surface d'administration. Il inclut des classes de requête et de réponse pour gérer les topics (CreateTopic, ListTopics, DeleteTopic) et les subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), plus ModifyAckDeadline pour prolonger l'échéance sur les messages que vous êtes encore en train de traiter. Chaque opération suit la même forme : remplissez un objet de requête, appelez ToBytes, invoquez la méthode du service, et chargez la réponse dans la classe de réponse correspondante.
Disponibilité
Le client gRPC Google Cloud Pub/Sub fait partie de l'édition Enterprise de sgcWebSockets. Un exemple complet et prêt à l'emploi avec publication, tirage, tirage en flux continu et gestion des topics/subscriptions se trouve dans Demos\21.GRPC\10.PubSub, et la référence complète du client sous-jacent est sur la page produit du client gRPC.
Des questions ou des commentaires ? Contactez-nous. Vous recevrez une réponse des personnes qui ont écrit le code.
