Google BigQuery via gRPC in Delphi

· Componenten

Google BigQuery is Google's serverloze datawarehouse, gebouwd om analytische queries over zeer grote tabellen uit te voeren. Wanneer je tabelgegevens uit BigQuery in je eigen applicatie wilt halen, is het snelste pad de BigQuery Storage Read API, een gRPC-service die rijen rechtstreeks uit storage streamt in plaats van ze via een REST-endpoint te pagineren. sgcWebSockets Enterprise levert een getypeerde BigQuery gRPC-client die op TsgcGRPCClient draait, zodat je een BigQuery-tabel kunt lezen vanuit Delphi en C++Builder zonder externe gRPC-runtime.

Hoe het werkt

De Storage Read API is een gRPC-service genaamd google.cloud.bigquery.storage.v1.BigQueryRead. gRPC bestaat uit Protocol Buffers ingekapseld over HTTP/2, dus de client draait op de HTTP/2-stack van sgcWebSockets. Je richt een TsgcHTTP2Client op bigquerystorage.googleapis.com op poort 443 met TLS, wijst het toe aan een TsgcGRPCClient, en de gRPC-client verzorgt de berichtframing, headers, timeouts en trailers voor je.

Een tabel lezen is een patroon met twee calls. Eerst roep je CreateReadSession aan, een unaire call die BigQuery vraagt een sessie over een tabel te openen en een of meer read streams terug te geven. Vervolgens roep je ReadRows aan op een stream, een server-streaming-call: je verstuurt één request en de server pusht een reeks rij-batches terug totdat de stream eindigt. De getypeerde BigQuery-berichten in sgcGRPC_Google_BigQuery bouwen de request-protobufs en parsen de responses, zodat je met Delphi-klassen werkt in plaats van met de hand samengestelde bytes.

Authenticatie

BigQuery wordt geauthenticeerd met een Google Cloud-service-account. De demo laadt de service-account-JSON, bouwt een zelfondertekende JWT en wisselt deze in voor een accesstoken, en verstuurt dat token vervolgens als gRPC-metadata bij elke call. Omdat een zelfondertekende service-account-JWT audience-gebonden is, moet het token gericht zijn op het BigQuery Storage-endpoint zodat het door bigquerystorage.googleapis.com wordt geaccepteerd.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

De client opzetten

Een gRPC-kanaal is een HTTP/2-verbinding. Maak de transport, richt het op de BigQuery Storage-host, maak vervolgens de gRPC-client en wijs de transport toe aan zijn Client-eigenschap. Het contenttype is het binaire proto-wireformaat en compressie blijft uit.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Een read session aanmaken

Om een sessie te openen vul je een TsgcGRPCBigQueryCreateReadSessionRequest. De Parent is je facturatieproject, de ReadSession.Table is het volledig gekwalificeerde tabelpad, en DataFormat selecteert het wireformaat van de rijen (1 = Avro, 2 = Arrow). MaxStreamCount begrenst hoeveel parallelle read streams BigQuery retourneert. Serialiseer de request met ToBytes en verstuur het als een unaire Call; de response is een ReadSession die je laadt met LoadFromBytes om de stream-namen terug te lezen.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

De rijen streamen

Met een stream-naam in handen roep je ReadRows aan. Stel ReadStream in op de stream-naam en een optionele start-Offset, en verstuur het vervolgens met CallAsync voor een server-streaming-response. Elke batch die de server pusht roept OnGRPCStreamMessage op, en OnGRPCStreamEnd vuurt eenmaal wanneer de stream klaar is.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Avro- en Arrow-rijformaten

BigQuery retourneert rijen niet als geparseerde velden over de wire. Het retourneert geserialiseerde blokken in een van twee kolomgeoriënteerde formaten, en je kiest het formaat wanneer je de sessie aanmaakt. Met DataFormat := 1 draagt de response AvroRows, waarvan het SerializedBinaryRows-veld een Avro-gecodeerd blok bevat en RowCount je vertelt hoeveel rijen het bevat. Met DataFormat := 2 draagt de response een ArrowRecordBatch, waarvan het SerializedRecordBatch-veld een Apache Arrow record batch bevat. Je decodeert het blok met je Avro- of Arrow-bibliotheek naar keuze. Het sessieschema, dat naast de streams wordt geretourneerd, beschrijft de kolomnamen en -typen zodat je weet wat elk blok bevat.

Filteren en projecteren

Je hoeft geen hele rijen te lezen. De read session draagt een ReadOptions-object waarmee je een projectie en een filter naar BigQuery kunt pushen zodat het minder data verstuurt. Voeg kolomnamen toe aan SelectedFields om alleen de kolommen te projecteren die je nodig hebt, en stel RowRestriction in op een SQL-achtig predicaat om rijen server-side te filteren voordat ze worden gestreamd.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

Beschikbaarheid

De BigQuery gRPC-client maakt deel uit van de sgcWebSockets Enterprise-editie en draait op Windows, macOS, Linux, iOS en Android. Een kant-en-klaar voorbeeld, met de service-account-loader, de create-session- en read-rows-flow hierboven getoond, staat in Demos\21.GRPC\16.BigQuery. De volledige referentie, inclusief de andere Google Cloud gRPC-clients, staat op de productpagina van de gRPC Client.

Vragen of feedback? Neem contact op. Je krijgt antwoord van de mensen die de code hebben geschreven.