Google BigQuery は Google のサーバーレスデータウェアハウスで、非常に大きなテーブルに対する分析クエリを実行するために構築されています。BigQuery からテーブルデータを自分のアプリケーションに取り出したいとき、最速の経路は BigQuery Storage Read API です。これは、REST エンドポイント経由でページ送りする代わりに、行をストレージから直接ストリーミングする gRPC サービスです。sgcWebSockets Enterprise には TsgcGRPCClient の上に配置される型付き BigQuery gRPC クライアントが付属しているため、外部の gRPC ランタイムなしで、Delphi や C++Builder から BigQuery のテーブルを読み取れます。
仕組み
Storage Read API は google.cloud.bigquery.storage.v1.BigQueryRead という名前の gRPC サービスです。gRPC は HTTP/2 上でフレーミングされた Protocol Buffers なので、クライアントは sgcWebSockets の HTTP/2 スタック上で動作します。TsgcHTTP2Client をポート 443 で TLS を使って bigquerystorage.googleapis.com に向け、それを TsgcGRPCClient に割り当てると、gRPC クライアントがメッセージのフレーミング、ヘッダー、タイムアウト、トレーラーを代行します。
テーブルの読み取りは 2 段階の呼び出しパターンです。まず CreateReadSession を呼び出します。これは BigQuery にテーブル上のセッションを開いて 1 つ以上の読み取りストリームを返すよう依頼するユーナリ呼び出しです。次にストリーム上で ReadRows を呼び出します。これはサーバーストリーミング呼び出しで、リクエストを 1 つ送ると、サーバーがストリームの終了まで一連の行バッチをプッシュし返します。sgcGRPC_Google_BigQuery 内の型付き BigQuery メッセージがリクエストの protobuf を構築し、レスポンスを解析するので、手作業で組み立てたバイト列の代わりに Delphi クラスを扱えます。
認証
BigQuery は Google Cloud サービスアカウントで認証されます。デモはサービスアカウントの JSON を読み込み、自己署名 JWT を構築してアクセストークンと交換し、そのトークンをすべての呼び出しで gRPC メタデータとして送信します。自己署名のサービスアカウント JWT はオーディエンスにバインドされるため、bigquerystorage.googleapis.com に受け入れられるには、トークンが BigQuery Storage エンドポイントを対象にしている必要があります。
GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
'https://bigquerystorage.googleapis.com/';
// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);
クライアントのセットアップ
gRPC チャネルは HTTP/2 接続です。トランスポートを作成して BigQuery Storage ホストに向け、次に gRPC クライアントを作成してトランスポートをその Client プロパティに割り当てます。コンテンツタイプはバイナリ proto ワイヤ形式で、圧縮はオフのままにします。
uses
sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;
var
HTTP2: TsgcHTTP2Client;
GRPC: TsgcGRPCClient;
begin
HTTP2 := TsgcHTTP2Client.Create(nil);
HTTP2.Host := 'bigquerystorage.googleapis.com';
HTTP2.Port := 443;
HTTP2.TLS := True;
GRPC := TsgcGRPCClient.Create(nil);
GRPC.Client := HTTP2;
GRPC.ChannelOptions.ContentType := grpcProto;
GRPC.ChannelOptions.Compression := grpcNoCompression;
HTTP2.Active := True;
end;
読み取りセッションの作成
セッションを開くには、TsgcGRPCBigQueryCreateReadSessionRequest に値を入れます。Parent は課金対象のプロジェクト、ReadSession.Table は完全修飾されたテーブルパス、DataFormat は行のワイヤ形式を選択します(1 = Avro、2 = Arrow)。MaxStreamCount は BigQuery が返す並列読み取りストリームの数に上限を設けます。ToBytes でリクエストをシリアライズし、ユーナリの Call として送信します。レスポンスは ReadSession で、LoadFromBytes で読み込んでストリーム名を読み戻します。
var
oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
oResponse: TsgcGRPCResponse;
oSession: TsgcGRPCBigQueryReadSession;
begin
oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
try
oRequest.Parent := 'projects/' + ProjectId;
oRequest.ReadSession.Table := 'projects/' + ProjectId +
'/datasets/' + Dataset + '/tables/' + Table;
oRequest.ReadSession.DataFormat := 1; { AVRO }
oRequest.MaxStreamCount := 1;
oResponse := GRPC.Call(
'google.cloud.bigquery.storage.v1.BigQueryRead',
'CreateReadSession', oRequest.ToBytes);
if oResponse.StatusCode = grpcOK then
begin
oSession := TsgcGRPCBigQueryReadSession.Create;
try
oSession.LoadFromBytes(oResponse.Data);
// keep the first read stream name for ReadRows
if oSession.StreamCount > 0 then
FReadStreamName := oSession.Stream(0).Name;
finally
oSession.Free;
end;
end;
finally
oRequest.Free;
end;
end;
行のストリーミング
ストリーム名が手に入ったら ReadRows を呼び出します。ReadStream にストリーム名と、省略可能な開始 Offset を設定し、サーバーストリーミングレスポンスのために CallAsync で送信します。サーバーがプッシュする各バッチは OnGRPCStreamMessage を発生させ、OnGRPCStreamEnd はストリームが完了したときに一度だけ発生します。
var
oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
try
oRequest.ReadStream := FReadStreamName;
oRequest.Offset := 0;
GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
'ReadRows', oRequest.ToBytes);
finally
oRequest.Free;
end;
end;
procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
try
oResponse.LoadFromBytes(aMessage.Data);
if oResponse.AvroRows.RowCount > 0 then
// oResponse.AvroRows.SerializedBinaryRows holds the Avro block
else if oResponse.ArrowRecordBatch.RowCount > 0 then
// oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
finally
oResponse.Free;
end;
end;
Avro と Arrow の行形式
BigQuery はワイヤ上で行を解析済みのフィールドとして返しません。2 つの列指向形式のいずれかでシリアライズされたブロックを返し、セッションを作成するときに形式を選びます。DataFormat := 1 ではレスポンスに AvroRows が含まれ、その SerializedBinaryRows フィールドが Avro エンコードされたブロックを保持し、RowCount がそれが含む行数を教えてくれます。DataFormat := 2 ではレスポンスに ArrowRecordBatch が含まれ、その SerializedRecordBatch フィールドが Apache Arrow のレコードバッチを保持します。ブロックは、お好みの Avro または Arrow ライブラリでデコードします。ストリームとともに返されるセッションスキーマは、列名と型を記述するので、各ブロックに何が含まれているかが分かります。
フィルタリングと射影
行全体を読み取る必要はありません。読み取りセッションは ReadOptions オブジェクトを持ち、射影とフィルターを BigQuery に押し下げて、送信するデータを減らせます。必要な列だけを射影するには SelectedFields に列名を追加し、ストリーミングされる前にサーバー側で行をフィルタリングするには RowRestriction に SQL スタイルの述語を設定します。
oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';
提供状況
BigQuery gRPC クライアントは sgcWebSockets Enterprise エディションの一部で、Windows、macOS、Linux、iOS、Android で動作します。サービスアカウントローダー、上で示したセッション作成と行読み取りのフローを備えた、すぐに実行できるサンプルは Demos\21.GRPC\16.BigQuery にあります。他の Google Cloud gRPC クライアントも含む完全なリファレンスは gRPC Client 製品ページにあります。
ご質問やご意見はありますか? お問い合わせください。コードを書いた本人から返信いたします。
