Delphi で gRPC を使う Google BigQuery

· コンポーネント

Google BigQuery は Google のサーバーレスデータウェアハウスで、非常に大きなテーブルに対する分析クエリを実行するために構築されています。BigQuery からテーブルデータを自分のアプリケーションに取り出したいとき、最速の経路は BigQuery Storage Read API です。これは、REST エンドポイント経由でページ送りする代わりに、行をストレージから直接ストリーミングする gRPC サービスです。sgcWebSockets Enterprise には TsgcGRPCClient の上に配置される型付き BigQuery gRPC クライアントが付属しているため、外部の gRPC ランタイムなしで、Delphi や C++Builder から BigQuery のテーブルを読み取れます。

仕組み

Storage Read API は google.cloud.bigquery.storage.v1.BigQueryRead という名前の gRPC サービスです。gRPC は HTTP/2 上でフレーミングされた Protocol Buffers なので、クライアントは sgcWebSockets の HTTP/2 スタック上で動作します。TsgcHTTP2Client をポート 443 で TLS を使って bigquerystorage.googleapis.com に向け、それを TsgcGRPCClient に割り当てると、gRPC クライアントがメッセージのフレーミング、ヘッダー、タイムアウト、トレーラーを代行します。

テーブルの読み取りは 2 段階の呼び出しパターンです。まず CreateReadSession を呼び出します。これは BigQuery にテーブル上のセッションを開いて 1 つ以上の読み取りストリームを返すよう依頼するユーナリ呼び出しです。次にストリーム上で ReadRows を呼び出します。これはサーバーストリーミング呼び出しで、リクエストを 1 つ送ると、サーバーがストリームの終了まで一連の行バッチをプッシュし返します。sgcGRPC_Google_BigQuery 内の型付き BigQuery メッセージがリクエストの protobuf を構築し、レスポンスを解析するので、手作業で組み立てたバイト列の代わりに Delphi クラスを扱えます。

認証

BigQuery は Google Cloud サービスアカウントで認証されます。デモはサービスアカウントの JSON を読み込み、自己署名 JWT を構築してアクセストークンと交換し、そのトークンをすべての呼び出しで gRPC メタデータとして送信します。自己署名のサービスアカウント JWT はオーディエンスにバインドされるため、bigquerystorage.googleapis.com に受け入れられるには、トークンが BigQuery Storage エンドポイントを対象にしている必要があります。

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

クライアントのセットアップ

gRPC チャネルは HTTP/2 接続です。トランスポートを作成して BigQuery Storage ホストに向け、次に gRPC クライアントを作成してトランスポートをその Client プロパティに割り当てます。コンテンツタイプはバイナリ proto ワイヤ形式で、圧縮はオフのままにします。

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

読み取りセッションの作成

セッションを開くには、TsgcGRPCBigQueryCreateReadSessionRequest に値を入れます。Parent は課金対象のプロジェクト、ReadSession.Table は完全修飾されたテーブルパス、DataFormat は行のワイヤ形式を選択します(1 = Avro、2 = Arrow)。MaxStreamCount は BigQuery が返す並列読み取りストリームの数に上限を設けます。ToBytes でリクエストをシリアライズし、ユーナリの Call として送信します。レスポンスは ReadSession で、LoadFromBytes で読み込んでストリーム名を読み戻します。

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

行のストリーミング

ストリーム名が手に入ったら ReadRows を呼び出します。ReadStream にストリーム名と、省略可能な開始 Offset を設定し、サーバーストリーミングレスポンスのために CallAsync で送信します。サーバーがプッシュする各バッチは OnGRPCStreamMessage を発生させ、OnGRPCStreamEnd はストリームが完了したときに一度だけ発生します。

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Avro と Arrow の行形式

BigQuery はワイヤ上で行を解析済みのフィールドとして返しません。2 つの列指向形式のいずれかでシリアライズされたブロックを返し、セッションを作成するときに形式を選びます。DataFormat := 1 ではレスポンスに AvroRows が含まれ、その SerializedBinaryRows フィールドが Avro エンコードされたブロックを保持し、RowCount がそれが含む行数を教えてくれます。DataFormat := 2 ではレスポンスに ArrowRecordBatch が含まれ、その SerializedRecordBatch フィールドが Apache Arrow のレコードバッチを保持します。ブロックは、お好みの Avro または Arrow ライブラリでデコードします。ストリームとともに返されるセッションスキーマは、列名と型を記述するので、各ブロックに何が含まれているかが分かります。

フィルタリングと射影

行全体を読み取る必要はありません。読み取りセッションは ReadOptions オブジェクトを持ち、射影とフィルターを BigQuery に押し下げて、送信するデータを減らせます。必要な列だけを射影するには SelectedFields に列名を追加し、ストリーミングされる前にサーバー側で行をフィルタリングするには RowRestriction に SQL スタイルの述語を設定します。

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

提供状況

BigQuery gRPC クライアントは sgcWebSockets Enterprise エディションの一部で、Windows、macOS、Linux、iOS、Android で動作します。サービスアカウントローダー、上で示したセッション作成と行読み取りのフローを備えた、すぐに実行できるサンプルは Demos\21.GRPC\16.BigQuery にあります。他の Google Cloud gRPC クライアントも含む完全なリファレンスは gRPC Client 製品ページにあります。

ご質問やご意見はありますか? お問い合わせください。コードを書いた本人から返信いたします。