Google BigQuery over gRPC in Delphi

Google BigQuery is Google's serverless data warehouse, built to run analytical queries over very large tables. When you want to pull table data out of BigQuery into your own application, the fastest path is the BigQuery Storage Read API, a gRPC service that streams rows straight from storage instead of paging them through a REST endpoint. sgcWebSockets Enterprise ships a typed BigQuery gRPC client that sits on TsgcGRPCClient, so you can read a BigQuery table from Delphi and C++Builder with no external gRPC runtime.

How it works

The Storage Read API is a gRPC service named google.cloud.bigquery.storage.v1.BigQueryRead. gRPC carries length-prefixed Protocol Buffers messages over HTTP/2, so the client runs on the sgcWebSockets HTTP/2 stack. You point a TsgcHTTP2Client at bigquerystorage.googleapis.com on port 443 with TLS, assign it to a TsgcGRPCClient, and the gRPC client handles the message framing, headers, timeouts and trailers for you.
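To make the message framing concrete, here is a minimal sketch of the gRPC length-prefixed message layout: a 1-byte compressed flag followed by a 4-byte big-endian payload length, then the protobuf bytes. It uses plain Object Pascal only; FrameGrpcMessage and BytesToHex are hypothetical helper names for illustration, not part of sgcWebSockets, which does this step for you.

```pascal
program GrpcFrameSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper: prefixes a serialized protobuf message with the
// 5-byte gRPC header (compressed flag + big-endian 32-bit length).
function FrameGrpcMessage(const aPayload: TBytes; aCompressed: Boolean): TBytes;
var
  vLen: Integer;
begin
  vLen := Length(aPayload);
  SetLength(Result, 5 + vLen);
  if aCompressed then
    Result[0] := 1
  else
    Result[0] := 0;
  Result[1] := (vLen shr 24) and $FF;
  Result[2] := (vLen shr 16) and $FF;
  Result[3] := (vLen shr 8) and $FF;
  Result[4] := vLen and $FF;
  if vLen > 0 then
    Move(aPayload[0], Result[5], vLen);
end;

// Renders a byte array as upper-case hex for display.
function BytesToHex(const aBytes: TBytes): string;
var
  i: Integer;
begin
  Result := '';
  for i := 0 to High(aBytes) do
    Result := Result + IntToHex(aBytes[i], 2);
end;

var
  vPayload: TBytes;
begin
  // A 3-byte protobuf payload: field 1, length 1, 'A'.
  SetLength(vPayload, 3);
  vPayload[0] := $0A;
  vPayload[1] := $01;
  vPayload[2] := $41;
  WriteLn(BytesToHex(FrameGrpcMessage(vPayload, False)));
  // prints 00000000030A0141
end.
```

The first five bytes are the header (flag 00, length 00000003); everything after it is the unchanged protobuf message.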

Reading a table is a two-call pattern. First you call CreateReadSession, a unary call that asks BigQuery to open a session over a table and hand back one or more read streams. Then you call ReadRows on a stream, a server-streaming call: you send one request and the server pushes back a sequence of row batches until the stream ends. The typed BigQuery messages in sgcGRPC_Google_BigQuery build the request protobufs and parse the responses, so you work with Delphi classes instead of hand-assembled bytes.
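For a sense of what the typed messages spare you, the sketch below hand-assembles the bytes of a ReadRowsRequest in plain Object Pascal. It assumes the field numbers from Google's published storage.proto for v1 (read_stream = 1, offset = 2); EncodeReadRowsRequest and the other helper names are hypothetical, and the stream name 'abc' is a placeholder rather than a real stream path.

```pascal
program ReadRowsRequestBytes;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

procedure AppendByte(var aBuf: TBytes; aValue: Byte);
begin
  SetLength(aBuf, Length(aBuf) + 1);
  aBuf[High(aBuf)] := aValue;
end;

// Protobuf varint: 7 bits per byte, low group first, high bit means "more".
procedure AppendVarint(var aBuf: TBytes; aValue: UInt64);
begin
  while aValue >= $80 do
  begin
    AppendByte(aBuf, (aValue and $7F) or $80);
    aValue := aValue shr 7;
  end;
  AppendByte(aBuf, aValue and $7F);
end;

// Hypothetical helper: encodes read_stream (field 1, length-delimited)
// and offset (field 2, varint). proto3 omits an offset of zero.
function EncodeReadRowsRequest(const aReadStream: UTF8String;
  aOffset: Int64): TBytes;
var
  i: Integer;
begin
  Result := nil;
  AppendVarint(Result, (1 shl 3) or 2);        // tag: field 1, wire type 2
  AppendVarint(Result, Length(aReadStream));   // byte length of the string
  for i := 1 to Length(aReadStream) do
    AppendByte(Result, Ord(aReadStream[i]));
  if aOffset <> 0 then
  begin
    AppendVarint(Result, (2 shl 3) or 0);      // tag: field 2, wire type 0
    AppendVarint(Result, UInt64(aOffset));
  end;
end;

function BytesToHex(const aBytes: TBytes): string;
var
  i: Integer;
begin
  Result := '';
  for i := 0 to High(aBytes) do
    Result := Result + IntToHex(aBytes[i], 2);
end;

begin
  WriteLn(BytesToHex(EncodeReadRowsRequest('abc', 300)));
  // prints 0A0361626310AC02
end.
```

In the output, 0A 03 616263 is the stream name and 10 AC02 is the offset 300 as a varint.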

Authentication

Calls to BigQuery are authenticated with a Google Cloud service account. The demo loads the service-account JSON, builds a self-signed JWT, exchanges it for an access token, and sends that token as gRPC metadata on every call. A self-signed service-account JWT is audience-bound, so it has to name the BigQuery Storage endpoint as its audience to be accepted by bigquerystorage.googleapis.com.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
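
To illustrate what audience-bound means, here is a rough sketch of the claim set a self-signed service-account JWT carries, following Google's documented scheme: the service-account email as both iss and sub, the API endpoint as aud, and a lifetime of at most one hour. BuildSelfSignedClaims is a hypothetical helper, the email is a placeholder, and the sketch assumes its inputs need no JSON escaping. Signing with the private key is left to the library.

```pascal
program JwtClaimsSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper: builds the unsigned JWT claim set as JSON.
// aIssuedAt is a Unix timestamp in seconds; the token expires one hour later.
// Assumes the inputs contain no characters that need JSON escaping.
function BuildSelfSignedClaims(const aClientEmail, aAudience: string;
  aIssuedAt: Int64): string;
begin
  Result := Format(
    '{"iss":"%s","sub":"%s","aud":"%s","iat":%d,"exp":%d}',
    [aClientEmail, aClientEmail, aAudience, aIssuedAt, aIssuedAt + 3600]);
end;

begin
  WriteLn(BuildSelfSignedClaims(
    'sa@example.iam.gserviceaccount.com',          // placeholder account
    'https://bigquerystorage.googleapis.com/',
    1700000000));
end.
```

The aud claim is the same value assigned to API_Endpoint above; a token whose audience names a different Google API would not be accepted by the BigQuery Storage host.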

Setting up the client

A gRPC channel is an HTTP/2 connection. Create the transport, point it at the BigQuery Storage host, then create the gRPC client and assign the transport to its Client property. The content type is the binary proto wire format and compression is left off.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Creating a read session

To open a session you fill a TsgcGRPCBigQueryCreateReadSessionRequest. The Parent is your billing project, the ReadSession.Table is the fully qualified table path, and DataFormat selects the wire format of the rows (1 = Avro, 2 = Arrow). MaxStreamCount caps how many parallel read streams BigQuery returns. Serialize the request with ToBytes and send it as a unary Call; the response is a ReadSession that you load with LoadFromBytes to read back the stream names.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

Streaming the rows

With a stream name in hand you call ReadRows. Set ReadStream to the stream name and an optional starting Offset, then send it with CallAsync for a server-streaming response. Each batch the server pushes raises OnGRPCStreamMessage, and OnGRPCStreamEnd fires once when the stream is done.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    // exactly one of the two is populated, depending on the session's DataFormat
    if oResponse.AvroRows.RowCount > 0 then
    begin
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    end
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
    begin
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
    end;
  finally
    oResponse.Free;
  end;
end;

Avro and Arrow row formats

BigQuery does not return rows as parsed fields over the wire. It returns serialized blocks in one of two binary formats, row-oriented Avro or columnar Arrow, and you pick the format when you create the session. With DataFormat := 1 the response carries AvroRows, whose SerializedBinaryRows field holds an Avro-encoded block and whose RowCount tells you how many rows it contains. With DataFormat := 2 the response carries an ArrowRecordBatch, whose SerializedRecordBatch field holds an Apache Arrow record batch. You decode the block with your Avro or Arrow library of choice. The session schema, returned alongside the streams, describes the column names and types so you know what each block contains.
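
If you decode Avro by hand rather than with a library, the two primitives you meet first are the zigzag varint used for int and long values and the length-prefixed UTF-8 string. The sketch below shows both in plain Object Pascal, following the Avro binary encoding specification. ReadAvroLong and ReadAvroString are hypothetical helpers, and the sample buffer is made up; real rows follow whatever field order and types the session's Avro schema declares.

```pascal
program AvroPrimitivesSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Reads one Avro long (zigzag varint) at aPos and advances aPos past it.
function ReadAvroLong(const aBuf: TBytes; var aPos: Integer): Int64;
var
  vRaw: UInt64;
  vShift: Integer;
  vByte: Byte;
begin
  vRaw := 0;
  vShift := 0;
  repeat
    if (aPos > High(aBuf)) or (vShift > 63) then
      raise Exception.Create('Invalid Avro varint');
    vByte := aBuf[aPos];
    Inc(aPos);
    vRaw := vRaw or (UInt64(vByte and $7F) shl vShift);
    Inc(vShift, 7);
  until (vByte and $80) = 0;
  // Undo zigzag: even raw values are non-negative, odd ones negative.
  Result := Int64(vRaw shr 1) xor (-Int64(vRaw and 1));
end;

// Reads an Avro string: a long byte count followed by UTF-8 bytes.
function ReadAvroString(const aBuf: TBytes; var aPos: Integer): UTF8String;
var
  vLen64: Int64;
  vLen: Integer;
begin
  vLen64 := ReadAvroLong(aBuf, aPos);
  if (vLen64 < 0) or (vLen64 > Length(aBuf) - aPos) then
    raise Exception.Create('Invalid Avro string length');
  vLen := Integer(vLen64);
  SetLength(Result, vLen);
  if vLen > 0 then
    Move(aBuf[aPos], Result[1], vLen);
  Inc(aPos, vLen);
end;

var
  vBuf: TBytes;
  vPos: Integer;
begin
  // Made-up sample: string "CA", long -1, long 64.
  SetLength(vBuf, 6);
  vBuf[0] := $04; vBuf[1] := $43; vBuf[2] := $41;
  vBuf[3] := $01;
  vBuf[4] := $80; vBuf[5] := $01;
  vPos := 0;
  WriteLn(ReadAvroString(vBuf, vPos)); // prints CA
  WriteLn(ReadAvroLong(vBuf, vPos));   // prints -1
  WriteLn(ReadAvroLong(vBuf, vPos));   // prints 64
end.
```

For anything beyond a quick inspection, a full Avro or Arrow library remains the safer choice, since it also handles unions, logical types and nested records.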

Filtering and projecting

You do not have to read whole rows. The read session carries a ReadOptions object that lets you push a projection and a filter down to BigQuery so it sends less data. Add column names to SelectedFields to project only the columns you need, and set RowRestriction to a SQL-style predicate to filter rows server-side before they are streamed.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';
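
When the filter value comes from a variable rather than a literal, it needs quoting before it goes into the predicate. The sketch below is one way to do that, assuming RowRestriction accepts BigQuery's usual double-quoted string literals with backslash escapes. QuoteRestrictionString is a hypothetical helper, not a library function, and it handles only backslashes and double quotes, not control characters such as newlines.

```pascal
program RowRestrictionSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper: wraps a value in double quotes and escapes
// backslashes and embedded double quotes with a backslash.
function QuoteRestrictionString(const aValue: string): string;
begin
  // Escape backslashes first so the quote escapes are not doubled.
  Result := StringReplace(aValue, '\', '\\', [rfReplaceAll]);
  Result := StringReplace(Result, '"', '\"', [rfReplaceAll]);
  Result := '"' + Result + '"';
end;

begin
  WriteLn('state = ' + QuoteRestrictionString('CA'));
  // prints state = "CA"
  WriteLn('name = ' + QuoteRestrictionString('O"Hare'));
  // prints name = "O\"Hare"
end.
```

The resulting string would be assigned to RowRestriction in place of the hard-coded predicate.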

Availability

The BigQuery gRPC client is part of the sgcWebSockets Enterprise edition and runs on Windows, macOS, Linux, iOS and Android. A ready-to-run sample, with the service-account loader, create-session and read-rows flow shown above, is in Demos\21.GRPC\16.BigQuery. The full reference, including the other Google Cloud gRPC clients, is on the gRPC Client product page.

Questions or feedback? Get in touch. You will get a reply from the people who wrote the code.