Google Cloud Pub/Sub sobre gRPC no Delphi

· Componentes

O Google Cloud Pub/Sub é o serviço de mensageria gerenciado do Google. Os produtores publicam mensagens em um tópico, e os consumidores as recebem fazendo pull de uma assinatura vinculada a esse tópico, de modo que os dois lados permanecem totalmente desacoplados. O serviço expõe uma API gRPC, e a edição Enterprise do sgcWebSockets inclui um cliente Pub/Sub gRPC tipado sobre o TsgcGRPCClient, para que você possa publicar e fazer pull diretamente do Delphi e do C++Builder sem nenhum runtime externo ou sidecar.

Este post mostra como o cliente é configurado, como funciona a autenticação e como publicar uma mensagem em um tópico e fazer pull de mensagens de uma assinatura, usando as mesmas chamadas da demo de exemplo.

Como funciona

A API gRPC do Pub/Sub é apenas mensagens Protocol Buffers encapsuladas sobre HTTP/2. O sgcWebSockets já vem com uma pilha HTTP/2 completa, então o cliente Pub/Sub se apoia em um transporte TsgcHTTP2Client apontado para pubsub.googleapis.com na porta 443 sobre TLS. O TsgcGRPCClient cuida do enquadramento gRPC, dos cabeçalhos da requisição e dos trailers da resposta, enquanto as classes de mensagem Pub/Sub tipadas serializam e analisam os payloads protobuf para você.

A autenticação usa uma conta de serviço do Google. Você fornece as credenciais da conta de serviço a um TsgcHTTPGoogleCloud_PubSub_Client, que obtém um token de acesso OAuth2, e coloca esse token no DefaultMetadata do cliente gRPC como um cabeçalho authorization: Bearer. Por ser definido como metadado padrão, ele acompanha automaticamente cada chamada Pub/Sub.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Autenticação por conta de serviço

O auxiliar do Google Cloud cuida da troca de token. Carregue o JSON da conta de serviço, defina as credenciais JWT e trate o evento OnAuthToken para copiar o token bearer resultante para o cliente gRPC. O mesmo componente também oferece suporte a OAuth2 se você preferir um fluxo interativo.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Publicando em um tópico

Para publicar, monte um TsgcGRPCPubSubPublishRequest, defina o nome totalmente qualificado do tópico, adicione uma ou mais mensagens e chame o método Publish no serviço google.pubsub.v1.Publisher. Os payloads das mensagens são bytes brutos, então codifique seu texto como UTF-8 primeiro. A resposta carrega os ids de mensagem atribuídos pelo servidor.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Fazendo pull de uma assinatura

Para receber mensagens, monte um TsgcGRPCPubSubPullRequest com o caminho da assinatura e o número máximo de mensagens a buscar, então chame Pull no serviço google.pubsub.v1.Subscriber. Analise a resposta em um TsgcGRPCPubSubPullResponse e percorra as mensagens recebidas. Cada uma carrega um AckId que você precisa enviar de volta para confirmar a entrega, caso contrário o Pub/Sub reentrega a mensagem após o prazo de confirmação.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Confirmar é mais uma chamada unária. Monte um TsgcGRPCPubSubAcknowledgeRequest com o caminho da assinatura e os ack ids que você quer confirmar, então chame Acknowledge no serviço google.pubsub.v1.Subscriber.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Pull por streaming

Um Pull pontual é adequado para sondagens ocasionais, mas para um fluxo contínuo de mensagens o Pub/Sub oferece o StreamingPull, um stream gRPC bidirecional que continua entregando mensagens conforme elas chegam. Abra um stream bidi no serviço google.pubsub.v1.Subscriber, envie um TsgcGRPCPubSubStreamingPullRequest inicial com a assinatura, então trate as mensagens recebidas no evento OnGRPCStreamMessage do cliente gRPC.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Além de publicar e fazer pull

O cliente tipado também cobre o restante da superfície administrativa. Ele inclui classes de requisição e resposta para gerenciar tópicos (CreateTopic, ListTopics, DeleteTopic) e assinaturas (CreateSubscription, ListSubscriptions, DeleteSubscription), além de ModifyAckDeadline para estender o prazo de mensagens que você ainda está processando. Cada operação segue o mesmo formato: preencha um objeto de requisição, chame ToBytes, invoque o método do serviço e carregue a resposta na classe de resposta correspondente.

Disponibilidade

O cliente gRPC do Google Cloud Pub/Sub faz parte da edição Enterprise do sgcWebSockets. Um exemplo completo e pronto para executar, com publicação, pull, pull por streaming e gerenciamento de tópicos/assinaturas, está em Demos\21.GRPC\10.PubSub, e a referência completa do cliente subjacente está na página do produto gRPC Client.

Dúvidas ou comentários? Entre em contato. Você receberá uma resposta das pessoas que escreveram o código.