Delphi'de gRPC Üzerinden Google Cloud Pub/Sub

· Bileşenler

Google Cloud Pub/Sub, Google'ın yönetilen mesajlaşma hizmetidir. Üreticiler mesajları bir konuya (topic) yayınlar ve tüketiciler bunları o konuya bağlı bir abonelikten (subscription) çekerek alır, böylece iki taraf tamamen birbirinden ayrı kalır. Hizmet bir gRPC API'si sunar ve sgcWebSockets Enterprise sürümü, TsgcGRPCClient üzerine kurulu türlü bir Pub/Sub gRPC istemcisi içerir, böylece doğrudan Delphi ve C++Builder'dan herhangi bir harici çalışma zamanı veya yan bileşen (sidecar) olmadan yayın yapabilir ve çekebilirsiniz.

Bu yazı, istemcinin nasıl bağlandığını, kimlik doğrulamanın nasıl işlediğini ve örnek demodaki aynı çağrıları kullanarak bir konuya nasıl mesaj yayınlanacağını ve bir abonelikten nasıl mesaj çekileceğini gösterir.

Nasıl çalışır

Pub/Sub'ın gRPC API'si yalnızca HTTP/2 üzerinde çerçevelenmiş Protocol Buffers mesajlarıdır. sgcWebSockets zaten eksiksiz bir HTTP/2 yığını içerir, bu nedenle Pub/Sub istemcisi, TLS üzerinden 443 numaralı bağlantı noktasında pubsub.googleapis.com adresine yönlendirilmiş bir TsgcHTTP2Client taşımasına oturur. TsgcGRPCClient, gRPC çerçevelemesini, istek üst bilgilerini ve yanıt trailer'larını üstlenirken, türlü Pub/Sub mesaj sınıfları protobuf yüklerini sizin için serileştirir ve ayrıştırır.

Kimlik doğrulama, bir Google hizmet hesabı kullanır. Hizmet hesabı kimlik bilgilerini, bir OAuth2 erişim belirteci elde eden bir TsgcHTTPGoogleCloud_PubSub_Client'a verirsiniz ve bu belirteci gRPC istemcisinin DefaultMetadata'sına bir authorization: Bearer üst bilgisi olarak yerleştirirsiniz. Varsayılan meta veri olarak ayarlandığından, her Pub/Sub çağrısında otomatik olarak taşınır.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Hizmet hesabı kimlik doğrulaması

Google Cloud yardımcısı, belirteç takasını işler. Hizmet hesabı JSON'ını yükleyin, JWT kimlik bilgilerini ayarlayın ve ortaya çıkan bearer belirtecini gRPC istemcisine kopyalamak için OnAuthToken olayını işleyin. Aynı bileşen, etkileşimli bir akışı tercih ederseniz OAuth2'yi de destekler.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Bir konuya yayınlama

Yayınlamak için bir TsgcGRPCPubSubPublishRequest oluşturun, tam nitelikli konu adını ayarlayın, bir veya daha fazla mesaj ekleyin ve google.pubsub.v1.Publisher hizmetinde Publish metodunu çağırın. Mesaj yükleri ham baytlardır, bu nedenle metninizi önce UTF-8 olarak kodlayın. Yanıt, sunucu tarafından atanan mesaj kimliklerini taşır.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Bir abonelikten çekme

Mesajları almak için, abonelik yolu ve alınacak maksimum mesaj sayısıyla bir TsgcGRPCPubSubPullRequest oluşturun, ardından google.pubsub.v1.Subscriber hizmetinde Pull'u çağırın. Yanıtı bir TsgcGRPCPubSubPullResponse'a ayrıştırın ve alınan mesajlar üzerinde gezinin. Her biri, teslimi onaylamak için geri göndermeniz gereken bir AckId taşır, aksi takdirde Pub/Sub, onay son tarihinden sonra mesajı yeniden teslim eder.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Onaylama, bir tekli çağrı daha gerektirir. Abonelik yolu ve onaylamak istediğiniz onay kimlikleriyle bir TsgcGRPCPubSubAcknowledgeRequest oluşturun, ardından google.pubsub.v1.Subscriber hizmetinde Acknowledge'i çağırın.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Akışlı çekme

Tek seferlik bir Pull, ara sıra yapılan sorgulamalar için uygundur, ancak istikrarlı bir mesaj akışı için Pub/Sub, mesajları geldikçe sunmaya devam eden çift yönlü bir gRPC akışı olan StreamingPull'u sunar. google.pubsub.v1.Subscriber hizmetinde çift yönlü bir akış açın, abonelikle birlikte ilk bir TsgcGRPCPubSubStreamingPullRequest gönderin, ardından gelen mesajları gRPC istemcisinin OnGRPCStreamMessage olayında işleyin.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Yayınlama ve çekmenin ötesinde

Türlü istemci, yönetim yüzeyinin geri kalanını da kapsar. Konuları (CreateTopic, ListTopics, DeleteTopic) ve abonelikleri (CreateSubscription, ListSubscriptions, DeleteSubscription) yönetmek için istek ve yanıt sınıfları ile hâlâ işlediğiniz mesajların son tarihini uzatmak için ModifyAckDeadline'ı içerir. Her işlem aynı biçimi izler: bir istek nesnesini doldurun, ToBytes'i çağırın, hizmet metodunu çağırın ve yanıtı eşleşen yanıt sınıfına yükleyin.

Kullanılabilirlik

Google Cloud Pub/Sub gRPC istemcisi, sgcWebSockets Enterprise sürümünün bir parçasıdır. Yayınlama, çekme, akışlı çekme ve konu/abonelik yönetimini içeren eksiksiz, çalıştırılmaya hazır bir örnek Demos\21.GRPC\10.PubSub içinde yer alır ve temeldeki istemcinin tam referansı gRPC Client ürün sayfasında bulunur.

Sorularınız veya geri bildirimleriniz mi var? Bize ulaşın. Kodu yazan kişilerden bir yanıt alacaksınız.