Google BigQuery sobre gRPC no Delphi

· Componentes

O Google BigQuery é o data warehouse serverless do Google, criado para executar consultas analíticas sobre tabelas muito grandes. Quando você quer extrair dados de tabela do BigQuery para sua própria aplicação, o caminho mais rápido é a BigQuery Storage Read API, um serviço gRPC que faz streaming de linhas diretamente do storage em vez de paginá-las por um endpoint REST. O sgcWebSockets Enterprise inclui um cliente BigQuery gRPC tipado que se apoia sobre o TsgcGRPCClient, para que você possa ler uma tabela do BigQuery a partir do Delphi e do C++Builder sem nenhum runtime gRPC externo.

Como funciona

A Storage Read API é um serviço gRPC chamado google.cloud.bigquery.storage.v1.BigQueryRead. O gRPC são Protocol Buffers encapsulados sobre HTTP/2, então o cliente roda sobre a pilha HTTP/2 do sgcWebSockets. Você aponta um TsgcHTTP2Client para bigquerystorage.googleapis.com na porta 443 com TLS, o atribui a um TsgcGRPCClient, e o cliente gRPC trata o enquadramento de mensagens, os cabeçalhos, os timeouts e os trailers para você.

Ler uma tabela é um padrão de duas chamadas. Primeiro você chama CreateReadSession, uma chamada unária que pede ao BigQuery para abrir uma sessão sobre uma tabela e devolver um ou mais streams de leitura. Depois você chama ReadRows em um stream, uma chamada server-streaming: você envia uma requisição e o servidor empurra de volta uma sequência de lotes de linhas até o stream terminar. As mensagens BigQuery tipadas em sgcGRPC_Google_BigQuery montam os protobufs da requisição e analisam as respostas, então você trabalha com classes Delphi em vez de bytes montados à mão.

Autenticação

O BigQuery é autenticado com uma conta de serviço do Google Cloud. A demo carrega o JSON da conta de serviço, monta um JWT autoassinado e o troca por um token de acesso, então envia esse token como metadado gRPC em cada chamada. Como um JWT autoassinado de conta de serviço é vinculado à audiência, o token precisa ter como alvo o endpoint do BigQuery Storage para que seja aceito por bigquerystorage.googleapis.com.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

Configurando o cliente

Um canal gRPC é uma conexão HTTP/2. Crie o transporte, aponte-o para o host do BigQuery Storage, então crie o cliente gRPC e atribua o transporte à sua propriedade Client. O tipo de conteúdo é o formato proto binário do protocolo e a compressão fica desligada.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Criando uma sessão de leitura

Para abrir uma sessão você preenche um TsgcGRPCBigQueryCreateReadSessionRequest. O Parent é seu projeto de cobrança, a ReadSession.Table é o caminho totalmente qualificado da tabela, e DataFormat seleciona o formato de protocolo das linhas (1 = Avro, 2 = Arrow). O MaxStreamCount limita quantos streams de leitura paralelos o BigQuery retorna. Serialize a requisição com ToBytes e envie-a como um Call unário; a resposta é uma ReadSession que você carrega com LoadFromBytes para ler de volta os nomes dos streams.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

Fazendo streaming das linhas

Com um nome de stream em mãos você chama ReadRows. Defina ReadStream com o nome do stream e um Offset inicial opcional, então envie-o com CallAsync para uma resposta server-streaming. Cada lote que o servidor empurra dispara OnGRPCStreamMessage, e OnGRPCStreamEnd dispara uma vez quando o stream termina.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Formatos de linha Avro e Arrow

O BigQuery não retorna as linhas como campos analisados no protocolo. Ele retorna blocos serializados em um de dois formatos colunares, e você escolhe o formato ao criar a sessão. Com DataFormat := 1 a resposta carrega AvroRows, cujo campo SerializedBinaryRows contém um bloco codificado em Avro e RowCount diz quantas linhas ele contém. Com DataFormat := 2 a resposta carrega um ArrowRecordBatch, cujo campo SerializedRecordBatch contém um record batch Apache Arrow. Você decodifica o bloco com a sua biblioteca Avro ou Arrow preferida. O schema da sessão, retornado junto com os streams, descreve os nomes e tipos das colunas para que você saiba o que cada bloco contém.

Filtrando e projetando

Você não precisa ler linhas inteiras. A sessão de leitura carrega um objeto ReadOptions que permite empurrar uma projeção e um filtro para o BigQuery para que ele envie menos dados. Adicione nomes de coluna a SelectedFields para projetar apenas as colunas de que você precisa, e defina RowRestriction com um predicado no estilo SQL para filtrar linhas no servidor antes que sejam transmitidas.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

Disponibilidade

O cliente BigQuery gRPC faz parte da edição Enterprise do sgcWebSockets e roda em Windows, macOS, Linux, iOS e Android. Um exemplo pronto para executar, com o carregador de conta de serviço e o fluxo de criar sessão e ler linhas mostrado acima, está em Demos\21.GRPC\16.BigQuery. A referência completa, incluindo os outros clientes gRPC do Google Cloud, está na página do produto gRPC Client.

Dúvidas ou comentários? Entre em contato. Você receberá uma resposta das pessoas que escreveram o código.