Google Cloud Pub/Sub przez gRPC w Delphi

· Komponenty

Google Cloud Pub/Sub to zarządzana usługa przesyłania komunikatów firmy Google. Producenci publikują komunikaty do tematu, a konsumenci odbierają je, pobierając z subskrypcji dołączonej do tego tematu, dzięki czemu obie strony pozostają w pełni odseparowane. Usługa udostępnia API gRPC, a edycja Enterprise sgcWebSockets dostarcza typowanego klienta gRPC Pub/Sub zbudowanego na TsgcGRPCClient, więc możesz publikować i pobierać bezpośrednio z Delphi i C++Builder bez żadnego zewnętrznego środowiska uruchomieniowego ani komponentu pomocniczego.

Ten wpis pokazuje, jak klient jest skonfigurowany, jak działa uwierzytelnianie oraz jak opublikować komunikat do tematu i pobrać komunikaty z subskrypcji, używając tych samych wywołań co przykładowe demo.

Jak to działa

API gRPC Pub/Sub to po prostu komunikaty Protocol Buffers opakowane w HTTP/2. sgcWebSockets dostarcza już kompletny stos HTTP/2, więc klient Pub/Sub działa na transporcie TsgcHTTP2Client wskazującym na pubsub.googleapis.com na porcie 443 przez TLS. TsgcGRPCClient zajmuje się opakowywaniem gRPC, nagłówkami żądań i trailerami odpowiedzi, podczas gdy typowane klasy komunikatów Pub/Sub serializują i parsują za Ciebie ładunki protobuf.

Uwierzytelnianie korzysta z konta usługi Google. Przekazujesz poświadczenia konta usługi do TsgcHTTPGoogleCloud_PubSub_Client, który uzyskuje token dostępu OAuth2, a następnie umieszczasz ten token w DefaultMetadata klienta gRPC jako nagłówek authorization: Bearer. Ponieważ jest ustawiony jako domyślne metadane, towarzyszy automatycznie każdemu wywołaniu Pub/Sub.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Uwierzytelnianie przez konto usługi

Komponent pomocniczy Google Cloud obsługuje wymianę tokenów. Wczytaj plik JSON konta usługi, ustaw poświadczenia JWT i obsłuż zdarzenie OnAuthToken, aby skopiować otrzymany token bearer do klienta gRPC. Ten sam komponent obsługuje również OAuth2, jeśli wolisz przepływ interaktywny.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Publikowanie do tematu

Aby opublikować, zbuduj TsgcGRPCPubSubPublishRequest, ustaw w pełni kwalifikowaną nazwę tematu, dodaj jeden lub więcej komunikatów i wywołaj metodę Publish na usłudze google.pubsub.v1.Publisher. Ładunki komunikatów to surowe bajty, więc najpierw zakoduj swój tekst jako UTF-8. Odpowiedź zawiera przypisane przez serwer identyfikatory komunikatów.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Pobieranie z subskrypcji

Aby odbierać komunikaty, zbuduj TsgcGRPCPubSubPullRequest ze ścieżką subskrypcji i maksymalną liczbą komunikatów do pobrania, a następnie wywołaj Pull na usłudze google.pubsub.v1.Subscriber. Sparsuj odpowiedź do TsgcGRPCPubSubPullResponse i przejdź przez odebrane komunikaty. Każdy z nich zawiera AckId, który musisz odesłać, aby potwierdzić dostarczenie, w przeciwnym razie Pub/Sub ponownie dostarczy komunikat po upływie terminu potwierdzenia.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Potwierdzenie to jeszcze jedno wywołanie unarne. Zbuduj TsgcGRPCPubSubAcknowledgeRequest ze ścieżką subskrypcji i identyfikatorami ack, które chcesz potwierdzić, a następnie wywołaj Acknowledge na usłudze google.pubsub.v1.Subscriber.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Strumieniowe pobieranie

Jednorazowe Pull jest w porządku przy okazjonalnym odpytywaniu, ale dla stałego strumienia komunikatów Pub/Sub oferuje StreamingPull, dwukierunkowy strumień gRPC, który dostarcza komunikaty w miarę ich nadchodzenia. Otwórz strumień dwukierunkowy na usłudze google.pubsub.v1.Subscriber, wyślij początkowe TsgcGRPCPubSubStreamingPullRequest z subskrypcją, a następnie obsłuż nadchodzące komunikaty w zdarzeniu OnGRPCStreamMessage klienta gRPC.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Poza publikowaniem i pobieraniem

Typowany klient obejmuje również resztę powierzchni administracyjnej. Zawiera klasy żądań i odpowiedzi do zarządzania tematami (CreateTopic, ListTopics, DeleteTopic) oraz subskrypcjami (CreateSubscription, ListSubscriptions, DeleteSubscription), a także ModifyAckDeadline do wydłużenia terminu na komunikatach, które wciąż przetwarzasz. Każda operacja ma ten sam kształt: wypełnij obiekt żądania, wywołaj ToBytes, wywołaj metodę usługi i wczytaj odpowiedź do pasującej klasy odpowiedzi.

Dostępność

Klient gRPC Google Cloud Pub/Sub jest częścią edycji Enterprise sgcWebSockets. Kompletny, gotowy do uruchomienia przykład z publikowaniem, pobieraniem, strumieniowym pobieraniem oraz zarządzaniem tematami/subskrypcjami znajduje się w Demos\21.GRPC\10.PubSub, a pełna dokumentacja bazowego klienta jest na stronie produktu gRPC Client.

Pytania lub uwagi? Skontaktuj się z nami. Otrzymasz odpowiedź od osób, które napisały ten kod.