Google Cloud Pub/Sub sobre gRPC en Delphi

· Componentes

Google Cloud Pub/Sub es el servicio de mensajería gestionado de Google. Los productores publican mensajes en un topic, y los consumidores los reciben extrayéndolos de una subscription asociada a ese topic, de modo que ambos lados quedan totalmente desacoplados. El servicio expone una API gRPC, y la edición Enterprise de sgcWebSockets incluye un cliente gRPC de Pub/Sub tipado construido sobre TsgcGRPCClient, así que puedes publicar y extraer directamente desde Delphi y C++Builder sin ningún runtime externo ni sidecar.

Este artículo muestra cómo se conecta el cliente, cómo funciona la autenticación, y cómo publicar un mensaje en un topic y extraer mensajes de una subscription, usando las mismas llamadas que la demo de ejemplo.

Cómo funciona

La API gRPC de Pub/Sub no es más que mensajes de Protocol Buffers enmarcados sobre HTTP/2. sgcWebSockets ya incluye una pila HTTP/2 completa, así que el cliente de Pub/Sub se apoya en un transporte TsgcHTTP2Client apuntando a pubsub.googleapis.com en el puerto 443 sobre TLS. TsgcGRPCClient se encarga del enmarcado gRPC, las cabeceras de la petición y los trailers de la respuesta, mientras que las clases de mensaje tipadas de Pub/Sub serializan y analizan las cargas protobuf por ti.

La autenticación usa una cuenta de servicio de Google. Proporcionas las credenciales de la cuenta de servicio a un TsgcHTTPGoogleCloud_PubSub_Client, que obtiene un token de acceso OAuth2, y colocas ese token en el DefaultMetadata del cliente gRPC como una cabecera authorization: Bearer. Como se establece como metadata por defecto, viaja automáticamente en cada llamada de Pub/Sub.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Autenticación con cuenta de servicio

El helper de Google Cloud gestiona el intercambio del token. Carga el JSON de la cuenta de servicio, establece las credenciales JWT y maneja el evento OnAuthToken para copiar el bearer token resultante en el cliente gRPC. El mismo componente también admite OAuth2 si prefieres un flujo interactivo.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Publicar en un topic

Para publicar, construye un TsgcGRPCPubSubPublishRequest, establece el nombre completo del topic, añade uno o más mensajes y llama al método Publish del servicio google.pubsub.v1.Publisher. Las cargas de los mensajes son bytes en bruto, así que codifica tu texto como UTF-8 primero. La respuesta lleva los ids de mensaje asignados por el servidor.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Extraer de una subscription

Para recibir mensajes, construye un TsgcGRPCPubSubPullRequest con la ruta de la subscription y el número máximo de mensajes a obtener, luego llama a Pull en el servicio google.pubsub.v1.Subscriber. Analiza la respuesta en un TsgcGRPCPubSubPullResponse y recorre los mensajes recibidos. Cada uno lleva un AckId que debes devolver para confirmar la entrega, de lo contrario Pub/Sub reenvía el mensaje tras el plazo de acuse.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Acusar es otra llamada unaria más. Construye un TsgcGRPCPubSubAcknowledgeRequest con la ruta de la subscription y los ack ids que quieras confirmar, luego llama a Acknowledge en el servicio google.pubsub.v1.Subscriber.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Streaming pull

Un Pull puntual está bien para sondeos ocasionales, pero para un flujo constante de mensajes Pub/Sub ofrece StreamingPull, un stream gRPC bidireccional que sigue entregando mensajes a medida que llegan. Abre un stream bidi en el servicio google.pubsub.v1.Subscriber, envía un TsgcGRPCPubSubStreamingPullRequest inicial con la subscription, luego maneja los mensajes entrantes en el evento OnGRPCStreamMessage del cliente gRPC.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Más allá de publicar y extraer

El cliente tipado cubre también el resto de la superficie de administración. Incluye clases de petición y respuesta para gestionar topics (CreateTopic, ListTopics, DeleteTopic) y subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), además de ModifyAckDeadline para ampliar el plazo de los mensajes que aún estás procesando. Cada operación sigue la misma forma: rellena un objeto de petición, llama a ToBytes, invoca el método del servicio y carga la respuesta en la clase de respuesta correspondiente.

Disponibilidad

El cliente gRPC de Google Cloud Pub/Sub forma parte de la edición Enterprise de sgcWebSockets. Hay un ejemplo completo y listo para ejecutar con publicación, extracción, streaming pull y gestión de topics/subscriptions en Demos\21.GRPC\10.PubSub, y la referencia completa del cliente subyacente está en la página de producto del Cliente gRPC.

¿Preguntas o comentarios? Ponte en contacto. Recibirás respuesta de las personas que escribieron el código.