Delphi에서 gRPC로 Google BigQuery 사용하기

· 컴포넌트

Google BigQuery는 매우 큰 테이블에 대해 분석 쿼리를 실행하도록 만들어진 Google의 서버리스 데이터 웨어하우스입니다. BigQuery에서 테이블 데이터를 직접 애플리케이션으로 가져오려면, 가장 빠른 경로는 BigQuery Storage Read API입니다. 이는 REST 엔드포인트를 통해 페이지로 넘기는 대신 스토리지에서 바로 행을 스트리밍하는 gRPC 서비스입니다. sgcWebSockets Enterprise는 TsgcGRPCClient 위에 위치하는 형식화된 BigQuery gRPC 클라이언트를 함께 제공하므로, 외부 gRPC 런타임 없이 Delphi와 C++Builder에서 BigQuery 테이블을 읽을 수 있습니다.

작동 방식

Storage Read API는 google.cloud.bigquery.storage.v1.BigQueryRead라는 gRPC 서비스입니다. gRPC는 HTTP/2 위에 프레이밍된 Protocol Buffers이므로, 클라이언트는 sgcWebSockets HTTP/2 스택 위에서 실행됩니다. TsgcHTTP2Client를 TLS로 443 포트의 bigquerystorage.googleapis.com을 가리키게 하고 TsgcGRPCClient에 할당하면, gRPC 클라이언트가 메시지 프레이밍, 헤더, 타임아웃, 트레일러를 대신 처리합니다.

테이블을 읽는 것은 두 번의 호출 패턴입니다. 먼저 CreateReadSession을 호출하는데, 이는 BigQuery에 테이블에 대한 세션을 열고 하나 이상의 읽기 스트림을 돌려달라고 요청하는 단항 호출입니다. 그런 다음 스트림에서 ReadRows를 호출하는데, 이는 서버 스트리밍 호출입니다. 하나의 요청을 보내면 서버가 스트림이 끝날 때까지 일련의 행 배치를 다시 밀어 보냅니다. sgcGRPC_Google_BigQuery의 형식화된 BigQuery 메시지가 요청 protobuf를 만들고 응답을 파싱하므로, 직접 조립한 바이트 대신 Delphi 클래스로 작업합니다.

인증

BigQuery는 Google Cloud 서비스 계정으로 인증합니다. 데모는 서비스 계정 JSON을 로드하고, 자체 서명된 JWT를 만들어 액세스 토큰으로 교환한 다음, 그 토큰을 모든 호출에 gRPC 메타데이터로 전송합니다. 자체 서명된 서비스 계정 JWT는 audience에 바인딩되므로, bigquerystorage.googleapis.com이 수락하도록 토큰이 BigQuery Storage 엔드포인트를 대상으로 해야 합니다.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

클라이언트 설정하기

gRPC 채널은 HTTP/2 연결입니다. 전송을 만들고, BigQuery Storage 호스트를 가리키게 한 다음, gRPC 클라이언트를 만들어 전송을 그 Client 속성에 할당합니다. 콘텐츠 타입은 바이너리 proto 와이어 포맷이고 압축은 끕니다.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

읽기 세션 생성하기

세션을 열려면 TsgcGRPCBigQueryCreateReadSessionRequest를 채웁니다. Parent는 청구 프로젝트, ReadSession.Table은 정규화된 테이블 경로이며, DataFormat은 행의 와이어 포맷(1 = Avro, 2 = Arrow)을 선택합니다. MaxStreamCount는 BigQuery가 반환하는 병렬 읽기 스트림 수를 제한합니다. 요청을 ToBytes로 직렬화하여 단항 Call로 보냅니다. 응답은 ReadSession이며, LoadFromBytes로 로드하여 스트림 이름을 다시 읽습니다.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

행 스트리밍하기

스트림 이름을 손에 쥐면 ReadRows를 호출합니다. ReadStream을 스트림 이름으로 설정하고 선택적 시작 Offset을 설정한 다음, 서버 스트리밍 응답을 위해 CallAsync로 보냅니다. 서버가 미는 각 배치는 OnGRPCStreamMessage를 발생시키고, 스트림이 끝나면 OnGRPCStreamEnd가 한 번 발생합니다.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Avro 및 Arrow 행 포맷

BigQuery는 와이어를 통해 행을 파싱된 필드로 반환하지 않습니다. 두 가지 컬럼형 포맷 중 하나로 직렬화된 블록을 반환하며, 세션을 생성할 때 포맷을 선택합니다. DataFormat := 1이면 응답은 AvroRows를 담고 있으며, 그 SerializedBinaryRows 필드는 Avro로 인코딩된 블록을 담고 RowCount는 몇 개의 행이 들어 있는지 알려줍니다. DataFormat := 2이면 응답은 ArrowRecordBatch를 담고 있으며, 그 SerializedRecordBatch 필드는 Apache Arrow 레코드 배치를 담고 있습니다. 블록은 선택한 Avro 또는 Arrow 라이브러리로 디코딩합니다. 스트림과 함께 반환되는 세션 스키마는 컬럼 이름과 타입을 설명하므로 각 블록에 무엇이 들어 있는지 알 수 있습니다.

필터링과 프로젝션

전체 행을 읽을 필요는 없습니다. 읽기 세션은 BigQuery가 더 적은 데이터를 보내도록 프로젝션과 필터를 BigQuery로 푸시할 수 있게 하는 ReadOptions 객체를 담고 있습니다. 필요한 컬럼만 프로젝션하려면 컬럼 이름을 SelectedFields에 추가하고, 스트리밍되기 전에 서버 측에서 행을 필터링하려면 RowRestriction을 SQL 스타일 술어로 설정합니다.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

제공 범위

BigQuery gRPC 클라이언트는 sgcWebSockets Enterprise 에디션의 일부이며 Windows, macOS, Linux, iOS, Android에서 실행됩니다. 서비스 계정 로더, 위에 보인 세션 생성 및 행 읽기 흐름을 갖춘 바로 실행 가능한 샘플은 Demos\21.GRPC\16.BigQuery에 있습니다. 다른 Google Cloud gRPC 클라이언트를 포함한 전체 레퍼런스는 gRPC Client 제품 페이지에 있습니다.

질문이나 의견이 있으신가요? 문의하기. 코드를 작성한 사람들로부터 답변을 받게 됩니다.