Google Cloud Pub/Sub via gRPC in Delphi

· Componenten

Google Cloud Pub/Sub is Google's beheerde messagingdienst. Producenten publiceren berichten naar een topic, en consumenten ontvangen ze door op te halen uit een subscription die aan dat topic is gekoppeld, zodat beide kanten volledig ontkoppeld blijven. De dienst biedt een gRPC-API, en de sgcWebSockets Enterprise-editie levert een getypeerde Pub/Sub gRPC-client bovenop TsgcGRPCClient, zodat je rechtstreeks vanuit Delphi en C++Builder kunt publiceren en ophalen zonder enige externe runtime of sidecar.

Dit bericht laat zien hoe de client wordt opgezet, hoe authenticatie werkt, en hoe je een bericht naar een topic publiceert en berichten uit een subscription ophaalt, met dezelfde calls als de voorbeelddemo.

Hoe het werkt

De gRPC-API van Pub/Sub bestaat gewoon uit Protocol Buffers-berichten ingekapseld over HTTP/2. sgcWebSockets levert al een complete HTTP/2-stack, dus de Pub/Sub-client draait op een TsgcHTTP2Client-transport gericht op pubsub.googleapis.com op poort 443 over TLS. TsgcGRPCClient verzorgt de gRPC-framing, de request-headers en de response-trailers, terwijl de getypeerde Pub/Sub-berichtklassen de protobuf-payloads voor je serialiseren en parsen.

Authenticatie gebruikt een Google-service-account. Je voert de service-account-credentials in bij een TsgcHTTPGoogleCloud_PubSub_Client, die een OAuth2-accesstoken verkrijgt, en je plaatst dat token in de DefaultMetadata van de gRPC-client als een authorization: Bearer-header. Omdat het als default metadata wordt ingesteld, reist het automatisch mee bij elke Pub/Sub-call.

uses
  sgcHTTP2_Client, sgcGRPC_Types, sgcGRPC_Client,
  sgcHTTP_Google_Cloud, sgcHTTP_Google_PubSub,
  sgcGRPC_Google_PubSub;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'pubsub.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Service-account-authenticatie

De Google Cloud-helper verzorgt de token-uitwisseling. Laad de service-account-JSON, stel de JWT-credentials in, en handel de OnAuthToken-gebeurtenis af om het resulterende bearer-token naar de gRPC-client te kopiëren. Hetzelfde component ondersteunt ook OAuth2 als je een interactieve flow verkiest.

var
  PubSub: TsgcHTTPGoogleCloud_PubSub_Client;
begin
  PubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
  PubSub.OnAuthToken := PubSubAuthToken;

  PubSub.GoogleCloudOptions.Authentication := gcaJWT;
  PubSub.GoogleCloudOptions.JWT.ClientEmail  := 'svc@my-project.iam.gserviceaccount.com';
  PubSub.GoogleCloudOptions.JWT.PrivateKeyId := 'key-id';
  PubSub.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKeyPEM;
  PubSub.GoogleCloudOptions.JWT.ProjectId := 'my-project';

  // triggers OnAuthToken once the access token is acquired
  PubSub.ListTopics('my-project');
end;

procedure TForm1.PubSubAuthToken(Sender: TObject;
  const TokenType, Token, Data: string);
begin
  GRPC.DefaultMetadata.Clear;
  GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
end;

Publiceren naar een topic

Om te publiceren bouw je een TsgcGRPCPubSubPublishRequest, stel je de volledig gekwalificeerde topic-naam in, voeg je een of meer berichten toe, en roep je de Publish-methode aan op de google.pubsub.v1.Publisher-service. Berichtpayloads zijn ruwe bytes, dus codeer je tekst eerst als UTF-8. De response bevat de door de server toegekende bericht-ids.

var
  oRequest: TsgcGRPCPubSubPublishRequest;
  oMsg: TsgcGRPCPubSubPubsubMessage;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPublishResponse;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPublishRequest.Create;
  try
    oRequest.Topic := 'projects/my-project/topics/my-topic';
    oMsg := oRequest.AddMessage;
    oMsg.Data := TEncoding.UTF8.GetBytes('Hello Pub/Sub');

    oResponse := GRPC.Call('google.pubsub.v1.Publisher', 'Publish',
      oRequest.ToBytes);
    try
      if oResponse.StatusCode = grpcOK then
      begin
        oResult := TsgcGRPCPubSubPublishResponse.Create;
        try
          oResult.LoadFromBytes(oResponse.Data);
          for i := 0 to oResult.MessageIds.Count - 1 do
            Memo1.Lines.Add('MessageId: ' + oResult.MessageIds[i]);
        finally
          oResult.Free;
        end;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Ophalen uit een subscription

Om berichten te ontvangen bouw je een TsgcGRPCPubSubPullRequest met het subscription-pad en het maximale aantal op te halen berichten, en roep je vervolgens Pull aan op de google.pubsub.v1.Subscriber-service. Parse het antwoord in een TsgcGRPCPubSubPullResponse en loop door de ontvangen berichten. Elk bericht draagt een AckId dat je moet terugsturen om de aflevering te bevestigen, anders levert Pub/Sub het bericht opnieuw af na de ack-deadline.

var
  oRequest: TsgcGRPCPubSubPullRequest;
  oResponse: TsgcGRPCResponse;
  oResult: TsgcGRPCPubSubPullResponse;
  oRecv: TsgcGRPCPubSubReceivedMessage;
  i: Integer;
begin
  oRequest := TsgcGRPCPubSubPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.MaxMessages := 10;

    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Pull',
      oRequest.ToBytes);
    try
      oResult := TsgcGRPCPubSubPullResponse.Create;
      try
        oResult.LoadFromBytes(oResponse.Data);
        for i := 0 to oResult.ReceivedMessageCount - 1 do
        begin
          oRecv := oResult.ReceivedMessage(i);
          Memo1.Lines.Add(TEncoding.UTF8.GetString(oRecv.Message.Data));
          Acknowledge(oRecv.AckId);   // confirm delivery
        end;
      finally
        oResult.Free;
      end;
    finally
      oResponse.Free;
    end;
  finally
    oRequest.Free;
  end;
end;

Bevestigen is nog een unaire call. Bouw een TsgcGRPCPubSubAcknowledgeRequest met het subscription-pad en de ack-ids die je wilt bevestigen, en roep vervolgens Acknowledge aan op de google.pubsub.v1.Subscriber-service.

var
  oRequest: TsgcGRPCPubSubAcknowledgeRequest;
  oResponse: TsgcGRPCResponse;
begin
  oRequest := TsgcGRPCPubSubAcknowledgeRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.AckIds.Add(aAckId);
    oResponse := GRPC.Call('google.pubsub.v1.Subscriber', 'Acknowledge',
      oRequest.ToBytes);
    oResponse.Free;
  finally
    oRequest.Free;
  end;
end;

Streaming pull

Een eenmalige Pull is prima voor incidenteel pollen, maar voor een gestage stroom berichten biedt Pub/Sub StreamingPull, een bidirectionele gRPC-stream die berichten blijft afleveren zodra ze binnenkomen. Open een bidi-stream op de google.pubsub.v1.Subscriber-service, verstuur een initiële TsgcGRPCPubSubStreamingPullRequest met de subscription, en handel binnenkomende berichten af op de OnGRPCStreamMessage-gebeurtenis van de gRPC-client.

var
  oRequest: TsgcGRPCPubSubStreamingPullRequest;
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('google.pubsub.v1.Subscriber',
    'StreamingPull');

  oRequest := TsgcGRPCPubSubStreamingPullRequest.Create;
  try
    oRequest.Subscription := 'projects/my-project/subscriptions/my-sub';
    oRequest.StreamAckDeadlineSeconds := 10;
    GRPC.SendBidiMessage(vStreamId, oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
  // each incoming batch arrives on OnGRPCStreamMessage
  // call GRPC.CloseBidiStream(vStreamId) to stop
end;

Verder dan publiceren en ophalen

De getypeerde client dekt ook de rest van het beheeroppervlak. Hij bevat request- en response-klassen voor het beheren van topics (CreateTopic, ListTopics, DeleteTopic) en subscriptions (CreateSubscription, ListSubscriptions, DeleteSubscription), plus ModifyAckDeadline om de deadline te verlengen voor berichten die je nog verwerkt. Elke bewerking volgt dezelfde vorm: vul een request-object, roep ToBytes aan, roep de servicemethode aan, en laad het antwoord in de bijbehorende response-klasse.

Beschikbaarheid

De Google Cloud Pub/Sub gRPC-client maakt deel uit van de sgcWebSockets Enterprise-editie. Een compleet, kant-en-klaar voorbeeld met publiceren, ophalen, streaming pull en topic-/subscription-beheer staat in Demos\21.GRPC\10.PubSub, en de volledige referentie voor de onderliggende client staat op de productpagina van de gRPC Client.

Vragen of feedback? Neem contact op. Je krijgt antwoord van de mensen die de code hebben geschreven.