Google Cloud Pub/Sub via gRPC dans Delphi

· Composants

Google Cloud Pub/Sub est le service de messagerie managé de Google. Les producteurs publient des messages sur un topic, et les consommateurs les reçoivent en tirant depuis une subscription rattachée à ce topic, de sorte que les deux côtés restent totalement découplés. Le service expose une API gRPC, et l'édition Enterprise de sgcWebSockets fournit un client gRPC Pub/Sub typé au-dessus de TsgcGRPCClient, ce qui vous permet de publier et de tirer directement depuis Delphi et C++Builder sans aucun runtime externe ni sidecar.

Cet article montre comment le client est câblé, comment fonctionne l'authentification, et comment publier un message sur un topic et tirer des messages depuis une subscription, en utilisant les mêmes appels que la démo d'exemple.

Comment ça fonctionne

L'API gRPC de Pub/Sub n'est que des messages Protocol Buffers encadrés sur HTTP/2. sgcWebSockets fournit déjà une pile HTTP/2 complète, donc le client Pub/Sub repose sur un transport TsgcHTTP2Client pointé vers pubsub.googleapis.com sur le port 443 via TLS. TsgcGRPCClient prend en charge l'encadrement gRPC, les en-têtes de requête et les trailers de réponse, tandis que les classes de messages Pub/Sub typées sérialisent et analysent les charges utiles protobuf à votre place.

L'authentification utilise un compte de service Google. Vous fournissez les identifiants du compte de service à un TsgcHTTPGoogleCloud_PubSub_Client, qui obtient un jeton d'accès OAuth2, et vous placez ce jeton dans le DefaultMetadata du client gRPC sous la forme d'un en-tête authorization: Bearer. Comme il est défini en tant que métadonnées par défaut, il voyage automatiquement sur chaque appel Pub/Sub.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Authentification par compte de service

L'assistant Google Cloud gère l'échange de jeton. Chargez le JSON du compte de service, définissez les identifiants JWT, et gérez l'événement OnAuthToken pour copier le jeton bearer obtenu dans le client gRPC. Le même composant prend aussi en charge OAuth2 si vous préférez un flux interactif.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Publier sur un topic

Pour publier, construisez un TsgcGRPCPubSubPublishRequest, définissez le nom de topic pleinement qualifié, ajoutez un ou plusieurs messages, et appelez la méthode Publish sur le service google.pubsub.v1.Publisher. Les charges utiles des messages sont des octets bruts, donc encodez votre texte en UTF-8 d'abord. La réponse porte les identifiants de message attribués par le serveur.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Tirer depuis une subscription

Pour recevoir des messages, construisez un TsgcGRPCPubSubPullRequest avec le chemin de la subscription et le nombre maximal de messages à récupérer, puis appelez Pull sur le service google.pubsub.v1.Subscriber. Analysez la réponse dans un TsgcGRPCPubSubPullResponse et parcourez les messages reçus. Chacun porte un AckId que vous devez renvoyer pour accuser réception de la livraison, sinon Pub/Sub redistribue le message après l'échéance d'accusé.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

L'accusé de réception est un appel unaire de plus. Construisez un TsgcGRPCPubSubAcknowledgeRequest avec le chemin de la subscription et les identifiants d'accusé que vous voulez confirmer, puis appelez Acknowledge sur le service google.pubsub.v1.Subscriber.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Tirage en flux continu

Un Pull ponctuel convient pour un sondage occasionnel, mais pour un flux régulier de messages Pub/Sub propose StreamingPull, un flux gRPC bidirectionnel qui continue de livrer les messages à mesure qu'ils arrivent. Ouvrez un flux bidi sur le service google.pubsub.v1.Subscriber, envoyez un TsgcGRPCPubSubStreamingPullRequest initial avec la subscription, puis gérez les messages entrants sur l'événement OnGRPCStreamMessage du client gRPC.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Au-delà de publier et tirer

Le client typé couvre aussi le reste de la surface d'administration. Il inclut des classes de requête et de réponse pour gérer les topics (CreateTopic, ListTopics, DeleteTopic) et les subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), plus ModifyAckDeadline pour prolonger l'échéance sur les messages que vous êtes encore en train de traiter. Chaque opération suit la même forme : remplissez un objet de requête, appelez ToBytes, invoquez la méthode du service, et chargez la réponse dans la classe de réponse correspondante.

Disponibilité

Le client gRPC Google Cloud Pub/Sub fait partie de l'édition Enterprise de sgcWebSockets. Un exemple complet et prêt à l'emploi avec publication, tirage, tirage en flux continu et gestion des topics/subscriptions se trouve dans Demos\21.GRPC\10.PubSub, et la référence complète du client sous-jacent est sur la page produit du client gRPC.

Des questions ou des commentaires ? Contactez-nous. Vous recevrez une réponse des personnes qui ont écrit le code.