Google Cloud Pub/Sub è il servizio di messaggistica gestito di Google. I produttori pubblicano messaggi su un topic e i consumatori li ricevono estraendoli da una subscription collegata a quel topic, così le due parti restano completamente disaccoppiate. Il servizio espone un'API gRPC e l'edizione Enterprise di sgcWebSockets include un client gRPC Pub/Sub tipizzato sopra TsgcGRPCClient, così puoi pubblicare ed estrarre direttamente da Delphi e C++Builder senza alcun runtime esterno o sidecar.
Questo articolo mostra come è cablato il client, come funziona l'autenticazione e come pubblicare un messaggio su un topic ed estrarre messaggi da una subscription, usando le stesse chiamate della demo di esempio.
Come funziona
L'API gRPC di Pub/Sub è semplicemente costituita da messaggi Protocol Buffers incapsulati su HTTP/2. sgcWebSockets include già uno stack HTTP/2 completo, quindi il client Pub/Sub poggia su un trasporto TsgcHTTP2Client puntato a pubsub.googleapis.com sulla porta 443 su TLS. TsgcGRPCClient si occupa dell'incapsulamento gRPC, degli header della richiesta e dei trailer della risposta, mentre le classi di messaggi Pub/Sub tipizzate serializzano e analizzano per te i payload protobuf.
L'autenticazione usa un account di servizio Google. Fornisci le credenziali dell'account di servizio a un TsgcHTTPGoogleCloud_PubSub_Client, che ottiene un token di accesso OAuth2, e inserisci quel token nel DefaultMetadata del client gRPC come header authorization: Bearer. Poiché è impostato come metadata predefinito, viaggia automaticamente su ogni chiamata Pub/Sub.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Autenticazione con account di servizio
L'helper Google Cloud gestisce lo scambio del token. Carica il JSON dell'account di servizio, imposta le credenziali JWT e gestisci l'evento OnAuthToken per copiare il token bearer risultante nel client gRPC. Lo stesso componente supporta anche OAuth2 se preferisci un flusso interattivo.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Pubblicare su un topic
Per pubblicare, costruisci un TsgcGRPCPubSubPublishRequest, imposta il nome del topic completamente qualificato, aggiungi uno o più messaggi e chiama il metodo Publish sul servizio google.pubsub.v1.Publisher. I payload dei messaggi sono byte grezzi, quindi codifica prima il tuo testo come UTF-8. La risposta contiene gli id dei messaggi assegnati dal server.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Estrarre da una subscription
Per ricevere messaggi, costruisci un TsgcGRPCPubSubPullRequest con il percorso della subscription e il numero massimo di messaggi da recuperare, poi chiama Pull sul servizio google.pubsub.v1.Subscriber. Analizza la risposta in un TsgcGRPCPubSubPullResponse e scorri i messaggi ricevuti. Ognuno porta con sé un AckId che devi rispedire per confermare la consegna, altrimenti Pub/Sub riconsegna il messaggio dopo la scadenza dell'ack.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Confermare è un'ulteriore chiamata unaria. Costruisci un TsgcGRPCPubSubAcknowledgeRequest con il percorso della subscription e gli ack id che vuoi confermare, poi chiama Acknowledge sul servizio google.pubsub.v1.Subscriber.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Streaming pull
Un Pull singolo va bene per il polling occasionale, ma per un flusso costante di messaggi Pub/Sub offre StreamingPull, uno stream gRPC bidirezionale che continua a consegnare i messaggi man mano che arrivano. Apri uno stream bidi sul servizio google.pubsub.v1.Subscriber, invia un TsgcGRPCPubSubStreamingPullRequest iniziale con la subscription, poi gestisci i messaggi in arrivo sull'evento OnGRPCStreamMessage del client gRPC.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Oltre publish e pull
Il client tipizzato copre anche il resto della superficie amministrativa. Include classi di richiesta e risposta per gestire i topic (CreateTopic, ListTopics, DeleteTopic) e le subscription (CreateSubscription, ListSubscriptions, DeleteSubscription), più ModifyAckDeadline per estendere la scadenza sui messaggi che stai ancora elaborando. Ogni operazione segue la stessa forma: riempi un oggetto richiesta, chiama ToBytes, invoca il metodo del servizio e carica la risposta nella classe di risposta corrispondente.
Disponibilità
Il client gRPC di Google Cloud Pub/Sub fa parte dell'edizione Enterprise di sgcWebSockets. Un esempio completo e pronto all'uso con pubblicazione, estrazione, streaming pull e gestione di topic/subscription è in Demos\21.GRPC\10.PubSub e il riferimento completo per il client sottostante è sulla pagina prodotto gRPC Client.
Domande o feedback? Contattaci. Riceverai una risposta dalle persone che hanno scritto il codice.
