Google Cloud Pub/Sub는 Google의 관리형 메시징 서비스입니다. 생산자는 토픽에 메시지를 게시하고, 소비자는 그 토픽에 연결된 구독에서 메시지를 가져와 수신하므로, 양쪽이 완전히 분리된 상태를 유지합니다. 이 서비스는 gRPC API를 제공하며, sgcWebSockets Enterprise 에디션은 TsgcGRPCClient 위에 형식화된 Pub/Sub gRPC 클라이언트를 함께 제공하므로, 외부 런타임이나 사이드카 없이 Delphi와 C++Builder에서 바로 게시하고 가져올 수 있습니다.
이 글에서는 클라이언트가 어떻게 연결되는지, 인증이 어떻게 작동하는지, 그리고 샘플 데모와 동일한 호출을 사용하여 토픽에 메시지를 게시하고 구독에서 메시지를 가져오는 방법을 보여줍니다.
작동 방식
Pub/Sub의 gRPC API는 HTTP/2 위에 프레이밍된 Protocol Buffers 메시지일 뿐입니다. sgcWebSockets는 이미 완전한 HTTP/2 스택을 함께 제공하므로, Pub/Sub 클라이언트는 TLS를 통해 443 포트의 pubsub.googleapis.com을 가리키는 TsgcHTTP2Client 전송 위에 위치합니다. TsgcGRPCClient는 gRPC 프레이밍, 요청 헤더, 응답 트레일러를 처리하고, 형식화된 Pub/Sub 메시지 클래스가 protobuf 페이로드를 직렬화하고 파싱해 줍니다.
인증은 Google 서비스 계정을 사용합니다. 서비스 계정 자격 증명을 TsgcHTTPGoogleCloud_PubSub_Client에 제공하면 OAuth2 액세스 토큰을 획득하고, 그 토큰을 authorization: Bearer 헤더로 gRPC 클라이언트의 DefaultMetadata에 넣습니다. 기본 메타데이터로 설정되므로 모든 Pub/Sub 호출에 자동으로 따라갑니다.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
서비스 계정 인증
Google Cloud 헬퍼가 토큰 교환을 처리합니다. 서비스 계정 JSON을 로드하고, JWT 자격 증명을 설정한 다음, OnAuthToken 이벤트를 처리하여 결과로 받은 bearer 토큰을 gRPC 클라이언트로 복사합니다. 동일한 컴포넌트는 대화형 흐름을 선호하는 경우 OAuth2도 지원합니다.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
토픽에 게시하기
게시하려면 TsgcGRPCPubSubPublishRequest를 만들고, 정규화된 토픽 이름을 설정하고, 하나 이상의 메시지를 추가한 다음, google.pubsub.v1.Publisher 서비스에서 Publish 메서드를 호출합니다. 메시지 페이로드는 원시 바이트이므로 텍스트를 먼저 UTF-8로 인코딩하세요. 응답에는 서버가 할당한 메시지 id가 들어 있습니다.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
구독에서 가져오기
메시지를 수신하려면 구독 경로와 가져올 최대 메시지 수를 가진 TsgcGRPCPubSubPullRequest를 만든 다음, google.pubsub.v1.Subscriber 서비스에서 Pull을 호출합니다. 응답을 TsgcGRPCPubSubPullResponse로 파싱하고 수신된 메시지를 순회합니다. 각 메시지에는 전달을 확인하기 위해 다시 보내야 하는 AckId가 들어 있으며, 그렇지 않으면 Pub/Sub는 ack 데드라인이 지난 후 메시지를 재전송합니다.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
확인(acknowledge)은 하나의 추가 단항 호출입니다. 구독 경로와 확인하려는 ack id로 TsgcGRPCPubSubAcknowledgeRequest를 만든 다음, google.pubsub.v1.Subscriber 서비스에서 Acknowledge를 호출합니다.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
스트리밍 가져오기
일회성 Pull은 가끔 폴링할 때는 괜찮지만, 메시지가 꾸준히 흐르는 경우 Pub/Sub는 메시지가 도착하는 대로 계속 전달하는 양방향 gRPC 스트림인 StreamingPull을 제공합니다. google.pubsub.v1.Subscriber 서비스에서 양방향(bidi) 스트림을 열고, 구독이 포함된 초기 TsgcGRPCPubSubStreamingPullRequest를 보낸 다음, gRPC 클라이언트의 OnGRPCStreamMessage 이벤트에서 들어오는 메시지를 처리합니다.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
게시와 가져오기를 넘어서
형식화된 클라이언트는 나머지 관리 영역도 다룹니다. 토픽 관리(CreateTopic, ListTopics, DeleteTopic)와 구독 관리(CreateSubscription, ListSubscriptions, DeleteSubscription)를 위한 요청 및 응답 클래스가 포함되어 있으며, 아직 처리 중인 메시지의 데드라인을 연장하는 ModifyAckDeadline도 있습니다. 모든 작업은 동일한 형태를 따릅니다. 요청 객체를 채우고, ToBytes를 호출하고, 서비스 메서드를 호출한 다음, 대응하는 응답 클래스에 응답을 로드합니다.
제공 범위
Google Cloud Pub/Sub gRPC 클라이언트는 sgcWebSockets Enterprise 에디션의 일부입니다. 게시, 가져오기, 스트리밍 가져오기, 토픽/구독 관리를 갖춘 완전하고 바로 실행 가능한 샘플은 Demos\21.GRPC\10.PubSub에 있으며, 기반이 되는 클라이언트의 전체 레퍼런스는 gRPC Client 제품 페이지에 있습니다.
질문이나 의견이 있으신가요? 문의하기. 코드를 작성한 사람들로부터 답변을 받게 됩니다.
