Google Cloud Pub/Sub su gRPC in Delphi

· Componenti

Google Cloud Pub/Sub è il servizio di messaggistica gestito di Google. I produttori pubblicano messaggi su un topic e i consumatori li ricevono estraendoli da una subscription collegata a quel topic, così le due parti restano completamente disaccoppiate. Il servizio espone un'API gRPC e l'edizione Enterprise di sgcWebSockets include un client gRPC Pub/Sub tipizzato sopra TsgcGRPCClient, così puoi pubblicare ed estrarre direttamente da Delphi e C++Builder senza alcun runtime esterno o sidecar.

Questo articolo mostra come è cablato il client, come funziona l'autenticazione e come pubblicare un messaggio su un topic ed estrarre messaggi da una subscription, usando le stesse chiamate della demo di esempio.

Come funziona

L'API gRPC di Pub/Sub è semplicemente costituita da messaggi Protocol Buffers incapsulati su HTTP/2. sgcWebSockets include già uno stack HTTP/2 completo, quindi il client Pub/Sub poggia su un trasporto TsgcHTTP2Client puntato a pubsub.googleapis.com sulla porta 443 su TLS. TsgcGRPCClient si occupa dell'incapsulamento gRPC, degli header della richiesta e dei trailer della risposta, mentre le classi di messaggi Pub/Sub tipizzate serializzano e analizzano per te i payload protobuf.

L'autenticazione usa un account di servizio Google. Fornisci le credenziali dell'account di servizio a un TsgcHTTPGoogleCloud_PubSub_Client, che ottiene un token di accesso OAuth2, e inserisci quel token nel DefaultMetadata del client gRPC come header authorization: Bearer. Poiché è impostato come metadata predefinito, viaggia automaticamente su ogni chiamata Pub/Sub.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Autenticazione con account di servizio

L'helper Google Cloud gestisce lo scambio del token. Carica il JSON dell'account di servizio, imposta le credenziali JWT e gestisci l'evento OnAuthToken per copiare il token bearer risultante nel client gRPC. Lo stesso componente supporta anche OAuth2 se preferisci un flusso interattivo.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Pubblicare su un topic

Per pubblicare, costruisci un TsgcGRPCPubSubPublishRequest, imposta il nome del topic completamente qualificato, aggiungi uno o più messaggi e chiama il metodo Publish sul servizio google.pubsub.v1.Publisher. I payload dei messaggi sono byte grezzi, quindi codifica prima il tuo testo come UTF-8. La risposta contiene gli id dei messaggi assegnati dal server.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Estrarre da una subscription

Per ricevere messaggi, costruisci un TsgcGRPCPubSubPullRequest con il percorso della subscription e il numero massimo di messaggi da recuperare, poi chiama Pull sul servizio google.pubsub.v1.Subscriber. Analizza la risposta in un TsgcGRPCPubSubPullResponse e scorri i messaggi ricevuti. Ognuno porta con sé un AckId che devi rispedire per confermare la consegna, altrimenti Pub/Sub riconsegna il messaggio dopo la scadenza dell'ack.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Confermare è un'ulteriore chiamata unaria. Costruisci un TsgcGRPCPubSubAcknowledgeRequest con il percorso della subscription e gli ack id che vuoi confermare, poi chiama Acknowledge sul servizio google.pubsub.v1.Subscriber.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Streaming pull

Un Pull singolo va bene per il polling occasionale, ma per un flusso costante di messaggi Pub/Sub offre StreamingPull, uno stream gRPC bidirezionale che continua a consegnare i messaggi man mano che arrivano. Apri uno stream bidi sul servizio google.pubsub.v1.Subscriber, invia un TsgcGRPCPubSubStreamingPullRequest iniziale con la subscription, poi gestisci i messaggi in arrivo sull'evento OnGRPCStreamMessage del client gRPC.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Oltre publish e pull

Il client tipizzato copre anche il resto della superficie amministrativa. Include classi di richiesta e risposta per gestire i topic (CreateTopic, ListTopics, DeleteTopic) e le subscription (CreateSubscription, ListSubscriptions, DeleteSubscription), più ModifyAckDeadline per estendere la scadenza sui messaggi che stai ancora elaborando. Ogni operazione segue la stessa forma: riempi un oggetto richiesta, chiama ToBytes, invoca il metodo del servizio e carica la risposta nella classe di risposta corrispondente.

Disponibilità

Il client gRPC di Google Cloud Pub/Sub fa parte dell'edizione Enterprise di sgcWebSockets. Un esempio completo e pronto all'uso con pubblicazione, estrazione, streaming pull e gestione di topic/subscription è in Demos\21.GRPC\10.PubSub e il riferimento completo per il client sottostante è sulla pagina prodotto gRPC Client.

Domande o feedback? Contattaci. Riceverai una risposta dalle persone che hanno scritto il codice.