O Google Cloud Pub/Sub é o serviço de mensageria gerenciado do Google. Os produtores publicam mensagens em um tópico, e os consumidores as recebem fazendo pull de uma assinatura vinculada a esse tópico, de modo que os dois lados permanecem totalmente desacoplados. O serviço expõe uma API gRPC, e a edição Enterprise do sgcWebSockets inclui um cliente Pub/Sub gRPC tipado sobre o TsgcGRPCClient, para que você possa publicar e fazer pull diretamente do Delphi e do C++Builder sem nenhum runtime externo ou sidecar.
Este post mostra como o cliente é configurado, como funciona a autenticação e como publicar uma mensagem em um tópico e fazer pull de mensagens de uma assinatura, usando as mesmas chamadas da demo de exemplo.
Como funciona
A API gRPC do Pub/Sub é apenas mensagens Protocol Buffers encapsuladas sobre HTTP/2. O sgcWebSockets já vem com uma pilha HTTP/2 completa, então o cliente Pub/Sub se apoia em um transporte TsgcHTTP2Client apontado para pubsub.googleapis.com na porta 443 sobre TLS. O TsgcGRPCClient cuida do enquadramento gRPC, dos cabeçalhos da requisição e dos trailers da resposta, enquanto as classes de mensagem Pub/Sub tipadas serializam e analisam os payloads protobuf para você.
A autenticação usa uma conta de serviço do Google. Você fornece as credenciais da conta de serviço a um TsgcHTTPGoogleCloud_PubSub_Client, que obtém um token de acesso OAuth2, e coloca esse token no DefaultMetadata do cliente gRPC como um cabeçalho authorization: Bearer. Por ser definido como metadado padrão, ele acompanha automaticamente cada chamada Pub/Sub.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Autenticação por conta de serviço
O auxiliar do Google Cloud cuida da troca de token. Carregue o JSON da conta de serviço, defina as credenciais JWT e trate o evento OnAuthToken para copiar o token bearer resultante para o cliente gRPC. O mesmo componente também oferece suporte a OAuth2 se você preferir um fluxo interativo.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Publicando em um tópico
Para publicar, monte um TsgcGRPCPubSubPublishRequest, defina o nome totalmente qualificado do tópico, adicione uma ou mais mensagens e chame o método Publish no serviço google.pubsub.v1.Publisher. Os payloads das mensagens são bytes brutos, então codifique seu texto como UTF-8 primeiro. A resposta carrega os ids de mensagem atribuídos pelo servidor.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Fazendo pull de uma assinatura
Para receber mensagens, monte um TsgcGRPCPubSubPullRequest com o caminho da assinatura e o número máximo de mensagens a buscar, então chame Pull no serviço google.pubsub.v1.Subscriber. Analise a resposta em um TsgcGRPCPubSubPullResponse e percorra as mensagens recebidas. Cada uma carrega um AckId que você precisa enviar de volta para confirmar a entrega, caso contrário o Pub/Sub reentrega a mensagem após o prazo de confirmação.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Confirmar é mais uma chamada unária. Monte um TsgcGRPCPubSubAcknowledgeRequest com o caminho da assinatura e os ack ids que você quer confirmar, então chame Acknowledge no serviço google.pubsub.v1.Subscriber.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Pull por streaming
Um Pull pontual é adequado para sondagens ocasionais, mas para um fluxo contínuo de mensagens o Pub/Sub oferece o StreamingPull, um stream gRPC bidirecional que continua entregando mensagens conforme elas chegam. Abra um stream bidi no serviço google.pubsub.v1.Subscriber, envie um TsgcGRPCPubSubStreamingPullRequest inicial com a assinatura, então trate as mensagens recebidas no evento OnGRPCStreamMessage do cliente gRPC.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Além de publicar e fazer pull
O cliente tipado também cobre o restante da superfície administrativa. Ele inclui classes de requisição e resposta para gerenciar tópicos (CreateTopic, ListTopics, DeleteTopic) e assinaturas (CreateSubscription, ListSubscriptions, DeleteSubscription), além de ModifyAckDeadline para estender o prazo de mensagens que você ainda está processando. Cada operação segue o mesmo formato: preencha um objeto de requisição, chame ToBytes, invoque o método do serviço e carregue a resposta na classe de resposta correspondente.
Disponibilidade
O cliente gRPC do Google Cloud Pub/Sub faz parte da edição Enterprise do sgcWebSockets. Um exemplo completo e pronto para executar, com publicação, pull, pull por streaming e gerenciamento de tópicos/assinaturas, está em Demos\21.GRPC\10.PubSub, e a referência completa do cliente subjacente está na página do produto gRPC Client.
Dúvidas ou comentários? Entre em contato. Você receberá uma resposta das pessoas que escreveram o código.
