Google Cloud Pub/Sub, Google'ın yönetilen mesajlaşma hizmetidir. Üreticiler mesajları bir konuya (topic) yayınlar ve tüketiciler bunları o konuya bağlı bir abonelikten (subscription) çekerek alır, böylece iki taraf tamamen birbirinden ayrı kalır. Hizmet bir gRPC API'si sunar ve sgcWebSockets Enterprise sürümü, TsgcGRPCClient üzerine kurulu türlü bir Pub/Sub gRPC istemcisi içerir, böylece doğrudan Delphi ve C++Builder'dan herhangi bir harici çalışma zamanı veya yan bileşen (sidecar) olmadan yayın yapabilir ve çekebilirsiniz.
Bu yazı, istemcinin nasıl bağlandığını, kimlik doğrulamanın nasıl işlediğini ve örnek demodaki aynı çağrıları kullanarak bir konuya nasıl mesaj yayınlanacağını ve bir abonelikten nasıl mesaj çekileceğini gösterir.
Nasıl çalışır
Pub/Sub'ın gRPC API'si yalnızca HTTP/2 üzerinde çerçevelenmiş Protocol Buffers mesajlarıdır. sgcWebSockets zaten eksiksiz bir HTTP/2 yığını içerir, bu nedenle Pub/Sub istemcisi, TLS üzerinden 443 numaralı bağlantı noktasında pubsub.googleapis.com adresine yönlendirilmiş bir TsgcHTTP2Client taşımasına oturur. TsgcGRPCClient, gRPC çerçevelemesini, istek üst bilgilerini ve yanıt trailer'larını üstlenirken, türlü Pub/Sub mesaj sınıfları protobuf yüklerini sizin için serileştirir ve ayrıştırır.
Kimlik doğrulama, bir Google hizmet hesabı kullanır. Hizmet hesabı kimlik bilgilerini, bir OAuth2 erişim belirteci elde eden bir TsgcHTTPGoogleCloud_PubSub_Client'a verirsiniz ve bu belirteci gRPC istemcisinin DefaultMetadata'sına bir authorization: Bearer üst bilgisi olarak yerleştirirsiniz. Varsayılan meta veri olarak ayarlandığından, her Pub/Sub çağrısında otomatik olarak taşınır.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Hizmet hesabı kimlik doğrulaması
Google Cloud yardımcısı, belirteç takasını işler. Hizmet hesabı JSON'ını yükleyin, JWT kimlik bilgilerini ayarlayın ve ortaya çıkan bearer belirtecini gRPC istemcisine kopyalamak için OnAuthToken olayını işleyin. Aynı bileşen, etkileşimli bir akışı tercih ederseniz OAuth2'yi de destekler.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Bir konuya yayınlama
Yayınlamak için bir TsgcGRPCPubSubPublishRequest oluşturun, tam nitelikli konu adını ayarlayın, bir veya daha fazla mesaj ekleyin ve google.pubsub.v1.Publisher hizmetinde Publish metodunu çağırın. Mesaj yükleri ham baytlardır, bu nedenle metninizi önce UTF-8 olarak kodlayın. Yanıt, sunucu tarafından atanan mesaj kimliklerini taşır.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Bir abonelikten çekme
Mesajları almak için, abonelik yolu ve alınacak maksimum mesaj sayısıyla bir TsgcGRPCPubSubPullRequest oluşturun, ardından google.pubsub.v1.Subscriber hizmetinde Pull'u çağırın. Yanıtı bir TsgcGRPCPubSubPullResponse'a ayrıştırın ve alınan mesajlar üzerinde gezinin. Her biri, teslimi onaylamak için geri göndermeniz gereken bir AckId taşır, aksi takdirde Pub/Sub, onay son tarihinden sonra mesajı yeniden teslim eder.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Onaylama, bir tekli çağrı daha gerektirir. Abonelik yolu ve onaylamak istediğiniz onay kimlikleriyle bir TsgcGRPCPubSubAcknowledgeRequest oluşturun, ardından google.pubsub.v1.Subscriber hizmetinde Acknowledge'i çağırın.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Akışlı çekme
Tek seferlik bir Pull, ara sıra yapılan sorgulamalar için uygundur, ancak istikrarlı bir mesaj akışı için Pub/Sub, mesajları geldikçe sunmaya devam eden çift yönlü bir gRPC akışı olan StreamingPull'u sunar. google.pubsub.v1.Subscriber hizmetinde çift yönlü bir akış açın, abonelikle birlikte ilk bir TsgcGRPCPubSubStreamingPullRequest gönderin, ardından gelen mesajları gRPC istemcisinin OnGRPCStreamMessage olayında işleyin.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Yayınlama ve çekmenin ötesinde
Türlü istemci, yönetim yüzeyinin geri kalanını da kapsar. Konuları (CreateTopic, ListTopics, DeleteTopic) ve abonelikleri (CreateSubscription, ListSubscriptions, DeleteSubscription) yönetmek için istek ve yanıt sınıfları ile hâlâ işlediğiniz mesajların son tarihini uzatmak için ModifyAckDeadline'ı içerir. Her işlem aynı biçimi izler: bir istek nesnesini doldurun, ToBytes'i çağırın, hizmet metodunu çağırın ve yanıtı eşleşen yanıt sınıfına yükleyin.
Kullanılabilirlik
Google Cloud Pub/Sub gRPC istemcisi, sgcWebSockets Enterprise sürümünün bir parçasıdır. Yayınlama, çekme, akışlı çekme ve konu/abonelik yönetimini içeren eksiksiz, çalıştırılmaya hazır bir örnek Demos\21.GRPC\10.PubSub içinde yer alır ve temeldeki istemcinin tam referansı gRPC Client ürün sayfasında bulunur.
Sorularınız veya geri bildirimleriniz mi var? Bize ulaşın. Kodu yazan kişilerden bir yanıt alacaksınız.
