Google Cloud Pub/Sub es el servicio de mensajería gestionado de Google. Los productores publican mensajes en un topic, y los consumidores los reciben extrayéndolos de una subscription asociada a ese topic, de modo que ambos lados quedan totalmente desacoplados. El servicio expone una API gRPC, y la edición Enterprise de sgcWebSockets incluye un cliente gRPC de Pub/Sub tipado construido sobre TsgcGRPCClient, así que puedes publicar y extraer directamente desde Delphi y C++Builder sin ningún runtime externo ni sidecar.
Este artículo muestra cómo se conecta el cliente, cómo funciona la autenticación, y cómo publicar un mensaje en un topic y extraer mensajes de una subscription, usando las mismas llamadas que la demo de ejemplo.
Cómo funciona
La API gRPC de Pub/Sub no es más que mensajes de Protocol Buffers enmarcados sobre HTTP/2. sgcWebSockets ya incluye una pila HTTP/2 completa, así que el cliente de Pub/Sub se apoya en un transporte TsgcHTTP2Client apuntando a pubsub.googleapis.com en el puerto 443 sobre TLS. TsgcGRPCClient se encarga del enmarcado gRPC, las cabeceras de la petición y los trailers de la respuesta, mientras que las clases de mensaje tipadas de Pub/Sub serializan y analizan las cargas protobuf por ti.
La autenticación usa una cuenta de servicio de Google. Proporcionas las credenciales de la cuenta de servicio a un TsgcHTTPGoogleCloud_PubSub_Client, que obtiene un token de acceso OAuth2, y colocas ese token en el DefaultMetadata del cliente gRPC como una cabecera authorization: Bearer. Como se establece como metadata por defecto, viaja automáticamente en cada llamada de Pub/Sub.
uses
sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
sgcGRPC_Google_PubSub;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'pubsub.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
Autenticación con cuenta de servicio
El helper de Google Cloud gestiona el intercambio del token. Carga el JSON de la cuenta de servicio, establece las credenciales JWT y maneja el evento OnAuthToken para copiar el bearer token resultante en el cliente gRPC. El mismo componente también admite OAuth2 si prefieres un flujo interactivo.
var
PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
PubSub.OnAuthToken := PubSubAuthToken;
PubSub.GoogleCloudOptions.Authentication := gcaJWT;
PubSub.GoogleCloudOptions.JWT.ClientEmail := 'svc@my-project.iam.gserviceaccount.com';
PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';
// triggers OnAuthToken once the access token is acquired
PubSub.ListTopics('my-project');
end;
procedure TForm1.PubSubAuthToken(Sender: TObject;
const TokenType, Token, Data: string);
begin
GRPC.DefaultMetadata.Clear;
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;
Publicar en un topic
Para publicar, construye un TsgcGRPCPubSubPublishRequest, establece el nombre completo del topic, añade uno o más mensajes y llama al método Publish del servicio google.pubsub.v1.Publisher. Las cargas de los mensajes son bytes en bruto, así que codifica tu texto como UTF-8 primero. La respuesta lleva los ids de mensaje asignados por el servidor.
var
oRequest: TsgcGRPCPubSubPublishRequest;
oMsg: TsgcGRPCPubSubPubsubMessage;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPublishResponse;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPublishRequest.Create;
try
oRequest.Topic := 'projects/my-project/topics/my-topic';
oMsg := oRequest.AddMessage;
oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');
oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
oRequest.ToBytes);
try
if oResponse.StatusCode = grpcOK then
begin
oResult := TsgcGRPCPubSubPublishResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.MessageIds.Count - 1 do
Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
finally
oResult.Free;
end;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Extraer de una subscription
Para recibir mensajes, construye un TsgcGRPCPubSubPullRequest con la ruta de la subscription y el número máximo de mensajes a obtener, luego llama a Pull en el servicio google.pubsub.v1.Subscriber. Analiza la respuesta en un TsgcGRPCPubSubPullResponse y recorre los mensajes recibidos. Cada uno lleva un AckId que debes devolver para confirmar la entrega, de lo contrario Pub/Sub reenvía el mensaje tras el plazo de acuse.
var
oRequest: TsgcGRPCPubSubPullRequest;
oResponse: TsgcGRPCResponse;
oResult: TsgcGRPCPubSubPullResponse;
oRecv: TsgcGRPCPubSubReceivedMessage;
i: Integer;
begin
oRequest := TsgcGRPCPubSubPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.MaxMessages := 10;
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
oRequest.ToBytes);
try
oResult := TsgcGRPCPubSubPullResponse.Create;
try
oResult.LoadFromBytes(oResponse.Data);
for i := 0 to oResult.ReceivedMessageCount - 1 do
begin
oRecv := oResult.ReceivedMessage(i);
Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
Acknowledge(oRecv.AckId); // confirm delivery
end;
finally
oResult.Free;
end;
finally
oResponse.Free;
end;
finally
oRequest.Free;
end;
end;
Acusar es otra llamada unaria más. Construye un TsgcGRPCPubSubAcknowledgeRequest con la ruta de la subscription y los ack ids que quieras confirmar, luego llama a Acknowledge en el servicio google.pubsub.v1.Subscriber.
var
oRequest: TsgcGRPCPubSubAcknowledgeRequest;
oResponse: TsgcGRPCResponse;
begin
oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.AckIds.Add(aAckId);
oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
oRequest.ToBytes);
oResponse.Free;
finally
oRequest.Free;
end;
end;
Streaming pull
Un Pull puntual está bien para sondeos ocasionales, pero para un flujo constante de mensajes Pub/Sub ofrece StreamingPull, un stream gRPC bidireccional que sigue entregando mensajes a medida que llegan. Abre un stream bidi en el servicio google.pubsub.v1.Subscriber, envía un TsgcGRPCPubSubStreamingPullRequest inicial con la subscription, luego maneja los mensajes entrantes en el evento OnGRPCStreamMessage del cliente gRPC.
var
oRequest: TsgcGRPCPubSubStreamingPullRequest;
vStreamId: Integer;
begin
vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
'StreamingPull');
oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
try
oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
oRequest.StreamAckDeadlineSeconds := 10;
GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
finally
oRequest.Free;
end;
// each incoming batch arrives on OnGRPCStreamMessage
// call GRPC.CloseBidiStream(vStreamId) to stop
end;
Más allá de publicar y extraer
El cliente tipado cubre también el resto de la superficie de administración. Incluye clases de petición y respuesta para gestionar topics (CreateTopic, ListTopics, DeleteTopic) y subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), además de ModifyAckDeadline para ampliar el plazo de los mensajes que aún estás procesando. Cada operación sigue la misma forma: rellena un objeto de petición, llama a ToBytes, invoca el método del servicio y carga la respuesta en la clase de respuesta correspondiente.
Disponibilidad
El cliente gRPC de Google Cloud Pub/Sub forma parte de la edición Enterprise de sgcWebSockets. Hay un ejemplo completo y listo para ejecutar con publicación, extracción, streaming pull y gestión de topics/subscriptions en Demos\21.GRPC\10.PubSub, y la referencia completa del cliente subyacente está en la página de producto del Cliente gRPC.
¿Preguntas o comentarios? Ponte en contacto. Recibirás respuesta de las personas que escribieron el código.
