在 Delphi 中通过 gRPC 使用 Google BigQuery

· 组件

Google BigQuery 是 Google 的无服务器数据仓库,专为对超大型表运行分析查询而构建。当你想把表数据从 BigQuery 拉取到自己的应用程序中时,最快的途径是 BigQuery Storage Read API,这是一个 gRPC 服务,它直接从存储流式传输行,而不是通过 REST 端点分页传输它们。sgcWebSockets Enterprise 提供了一个位于 TsgcGRPCClient 之上的强类型 BigQuery gRPC 客户端,因此你可以从 Delphi 和 C++Builder 读取一张 BigQuery 表,无需任何外部 gRPC 运行时。

工作原理

Storage Read API 是一个名为 google.cloud.bigquery.storage.v1.BigQueryRead 的 gRPC 服务。gRPC 是通过 HTTP/2 封帧的 Protocol Buffers,因此客户端运行在 sgcWebSockets HTTP/2 栈之上。你将一个 TsgcHTTP2Client 指向 bigquerystorage.googleapis.com 端口 443 并启用 TLS,将它赋给一个 TsgcGRPCClient,gRPC 客户端就会替你处理消息封帧、请求头、超时和尾部。

读取一张表是一个两次调用的模式。首先你调用 CreateReadSession,这是一个一元调用,请求 BigQuery 在一张表上打开一个会话并交回一个或多个读取流。然后你在一个流上调用 ReadRows,这是一个服务器流式调用:你发送一个请求,服务器推回一系列行批次,直到流结束。sgcGRPC_Google_BigQuery 中的强类型 BigQuery 消息构建请求 protobuf 并解析响应,因此你处理的是 Delphi 类而不是手工组装的字节。

身份验证

BigQuery 使用一个 Google Cloud 服务账号进行身份验证。该 demo 加载服务账号 JSON,构建一个自签名 JWT 并将其换取为一个访问令牌,然后将该令牌作为 gRPC 元数据随每个调用一起发送。由于自签名的服务账号 JWT 是受众绑定的,因此该令牌必须以 BigQuery Storage 端点为目标,才能被 bigquerystorage.googleapis.com 接受。

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

设置客户端

一个 gRPC 通道就是一个 HTTP/2 连接。创建传输,将它指向 BigQuery Storage 主机,然后创建 gRPC 客户端并将传输赋给它的 Client 属性。内容类型是二进制 proto 传输格式,并关闭压缩。

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

创建读取会话

要打开一个会话,你填充一个 TsgcGRPCBigQueryCreateReadSessionRequestParent 是你的计费项目,ReadSession.Table 是完全限定的表路径,DataFormat 选择行的传输格式(1 = Avro,2 = Arrow)。MaxStreamCount 限制 BigQuery 返回多少个并行读取流。用 ToBytes 序列化请求并作为一元 Call 发送;响应是一个 ReadSession,你用 LoadFromBytes 加载它来读回流名称。

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

流式传输行

有了一个流名称在手,你就调用 ReadRows。将 ReadStream 设置为流名称和一个可选的起始 Offset,然后用 CallAsync 发送它以获得服务器流式响应。服务器推送的每一批都会触发 OnGRPCStreamMessage,而 OnGRPCStreamEnd 在流完成时触发一次。

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Avro 和 Arrow 行格式

BigQuery 不会在传输上以解析过的字段形式返回行。它以两种列式格式之一返回序列化的数据块,而你在创建会话时选择该格式。使用 DataFormat := 1 时,响应携带 AvroRows,它的 SerializedBinaryRows 字段保存一个 Avro 编码的数据块,RowCount 告诉你它包含多少行。使用 DataFormat := 2 时,响应携带一个 ArrowRecordBatch,它的 SerializedRecordBatch 字段保存一个 Apache Arrow 记录批。你用自己选择的 Avro 或 Arrow 库来解码该数据块。会话架构与流一起返回,它描述了列名和类型,以便你知道每个数据块包含什么。

过滤和投影

你不必读取整行。读取会话携带一个 ReadOptions 对象,让你把投影和过滤下推到 BigQuery,以便它发送更少的数据。将列名添加到 SelectedFields 以只投影你需要的列,并将 RowRestriction 设置为一个 SQL 风格的谓词,以在行被流式传输之前在服务器端过滤它们。

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

可用性

BigQuery gRPC 客户端是 sgcWebSockets Enterprise 版本的一部分,可运行于 Windows、macOS、Linux、iOS 和 Android。一个开箱即用的示例,包含上面展示的服务账号加载器、创建会话和读取行流程,位于 Demos\21.GRPC\16.BigQuery。完整参考资料,包括其他 Google Cloud gRPC 客户端,请见gRPC Client 产品页面

有问题或反馈?联系我们。你会收到来自编写这段代码的人的回复。