Google Cloud Pub/Sub to zarządzana usługa przesyłania komunikatów firmy Google. Producenci publikują komunikaty do tematu, a konsumenci odbierają je, pobierając z subskrypcji dołączonej do tego tematu, dzięki czemu obie strony pozostają w pełni odseparowane. Usługa udostępnia API gRPC, a edycja Enterprise sgcWebSockets dostarcza typowanego klienta gRPC Pub/Sub zbudowanego na TsgcGRPCClient, więc możesz publikować i pobierać bezpośrednio z Delphi i C++Builder bez żadnego zewnętrznego środowiska uruchomieniowego ani komponentu pomocniczego.
Ten wpis pokazuje, jak klient jest skonfigurowany, jak działa uwierzytelnianie oraz jak opublikować komunikat do tematu i pobrać komunikaty z subskrypcji, używając tych samych wywołań co przykładowe demo.
Jak to działa
API gRPC Pub/Sub to po prostu komunikaty Protocol Buffers opakowane w HTTP/2. sgcWebSockets dostarcza już kompletny stos HTTP/2, więc klient Pub/Sub działa na transporcie TsgcHTTP2Client wskazującym na pubsub.googleapis.com na porcie 443 przez TLS. TsgcGRPCClient zajmuje się opakowywaniem gRPC, nagłówkami żądań i trailerami odpowiedzi, podczas gdy typowane klasy komunikatów Pub/Sub serializują i parsują za Ciebie ładunki protobuf.
Uwierzytelnianie korzysta z konta usługi Google. Przekazujesz poświadczenia konta usługi do TsgcHTTPGoogleCloud_PubSub_Client, który uzyskuje token dostępu OAuth2, a następnie umieszczasz ten token w DefaultMetadata klienta gRPC jako nagłówek authorization: Bearer. Ponieważ jest ustawiony jako domyślne metadane, towarzyszy automatycznie każdemu wywołaniu Pub/Sub.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Uwierzytelnianie przez konto usługi
Komponent pomocniczy Google Cloud obsługuje wymianę tokenów. Wczytaj plik JSON konta usługi, ustaw poświadczenia JWT i obsłuż zdarzenie OnAuthToken, aby skopiować otrzymany token bearer do klienta gRPC. Ten sam komponent obsługuje również OAuth2, jeśli wolisz przepływ interaktywny.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Publikowanie do tematu
Aby opublikować, zbuduj TsgcGRPCPubSubPublishRequest, ustaw w pełni kwalifikowaną nazwę tematu, dodaj jeden lub więcej komunikatów i wywołaj metodę Publish na usłudze google.pubsub.v1.Publisher. Ładunki komunikatów to surowe bajty, więc najpierw zakoduj swój tekst jako UTF-8. Odpowiedź zawiera przypisane przez serwer identyfikatory komunikatów.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Pobieranie z subskrypcji
Aby odbierać komunikaty, zbuduj TsgcGRPCPubSubPullRequest ze ścieżką subskrypcji i maksymalną liczbą komunikatów do pobrania, a następnie wywołaj Pull na usłudze google.pubsub.v1.Subscriber. Sparsuj odpowiedź do TsgcGRPCPubSubPullResponse i przejdź przez odebrane komunikaty. Każdy z nich zawiera AckId, który musisz odesłać, aby potwierdzić dostarczenie, w przeciwnym razie Pub/Sub ponownie dostarczy komunikat po upływie terminu potwierdzenia.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Potwierdzenie to jeszcze jedno wywołanie unarne. Zbuduj TsgcGRPCPubSubAcknowledgeRequest ze ścieżką subskrypcji i identyfikatorami ack, które chcesz potwierdzić, a następnie wywołaj Acknowledge na usłudze google.pubsub.v1.Subscriber.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Strumieniowe pobieranie
Jednorazowe Pull jest w porządku przy okazjonalnym odpytywaniu, ale dla stałego strumienia komunikatów Pub/Sub oferuje StreamingPull, dwukierunkowy strumień gRPC, który dostarcza komunikaty w miarę ich nadchodzenia. Otwórz strumień dwukierunkowy na usłudze google.pubsub.v1.Subscriber, wyślij początkowe TsgcGRPCPubSubStreamingPullRequest z subskrypcją, a następnie obsłuż nadchodzące komunikaty w zdarzeniu OnGRPCStreamMessage klienta gRPC.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Poza publikowaniem i pobieraniem
Typowany klient obejmuje również resztę powierzchni administracyjnej. Zawiera klasy żądań i odpowiedzi do zarządzania tematami (CreateTopic, ListTopics, DeleteTopic) oraz subskrypcjami (CreateSubscription, ListSubscriptions, DeleteSubscription), a także ModifyAckDeadline do wydłużenia terminu na komunikatach, które wciąż przetwarzasz. Każda operacja ma ten sam kształt: wypełnij obiekt żądania, wywołaj ToBytes, wywołaj metodę usługi i wczytaj odpowiedź do pasującej klasy odpowiedzi.
Dostępność
Klient gRPC Google Cloud Pub/Sub jest częścią edycji Enterprise sgcWebSockets. Kompletny, gotowy do uruchomienia przykład z publikowaniem, pobieraniem, strumieniowym pobieraniem oraz zarządzaniem tematami/subskrypcjami znajduje się w Demos\21.GRPC\10.PubSub, a pełna dokumentacja bazowego klienta jest na stronie produktu gRPC Client.
Pytania lub uwagi? Skontaktuj się z nami. Otrzymasz odpowiedź od osób, które napisały ten kod.
