Google BigQuery przez gRPC w Delphi

· Komponenty

Google BigQuery to bezserwerowa hurtownia danych firmy Google, zbudowana do uruchamiania zapytań analitycznych na bardzo dużych tabelach. Gdy chcesz pobrać dane tabeli z BigQuery do własnej aplikacji, najszybszą drogą jest BigQuery Storage Read API, usługa gRPC, która strumieniuje wiersze bezpośrednio z magazynu, zamiast stronicować je przez punkt końcowy REST. sgcWebSockets Enterprise dostarcza typowanego klienta gRPC BigQuery, który działa na TsgcGRPCClient, więc możesz odczytać tabelę BigQuery z Delphi i C++Builder bez zewnętrznego środowiska uruchomieniowego gRPC.

Jak to działa

Storage Read API to usługa gRPC o nazwie google.cloud.bigquery.storage.v1.BigQueryRead. gRPC to Protocol Buffers opakowane w HTTP/2, więc klient działa na stosie HTTP/2 sgcWebSockets. Wskazujesz TsgcHTTP2Client na bigquerystorage.googleapis.com na porcie 443 z TLS, przypisujesz go do TsgcGRPCClient, a klient gRPC obsługuje za Ciebie opakowywanie komunikatów, nagłówki, limity czasu i trailery.

Odczytywanie tabeli to wzorzec dwóch wywołań. Najpierw wywołujesz CreateReadSession, wywołanie unarne, które prosi BigQuery o otwarcie sesji na tabeli i zwrócenie jednego lub więcej strumieni odczytu. Następnie wywołujesz ReadRows na strumieniu, wywołanie strumieniowe po stronie serwera: wysyłasz jedno żądanie, a serwer wypycha z powrotem sekwencję pakietów wierszy, dopóki strumień się nie zakończy. Typowane komunikaty BigQuery w sgcGRPC_Google_BigQuery budują protobufy żądań i parsują odpowiedzi, więc pracujesz z klasami Delphi zamiast ręcznie składanych bajtów.

Uwierzytelnianie

BigQuery jest uwierzytelniany za pomocą konta usługi Google Cloud. Demo wczytuje plik JSON konta usługi, buduje samodzielnie podpisany JWT i wymienia go na token dostępu, a następnie wysyła ten token jako metadane gRPC przy każdym wywołaniu. Ponieważ samodzielnie podpisany JWT konta usługi jest powiązany z odbiorcą, token musi być skierowany na punkt końcowy BigQuery Storage, aby został zaakceptowany przez bigquerystorage.googleapis.com.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

Konfigurowanie klienta

Kanał gRPC to połączenie HTTP/2. Utwórz transport, wskaż go na host BigQuery Storage, a następnie utwórz klienta gRPC i przypisz transport do jego właściwości Client. Typem zawartości jest binarny format proto, a kompresja jest wyłączona.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Tworzenie sesji odczytu

Aby otworzyć sesję, wypełniasz TsgcGRPCBigQueryCreateReadSessionRequest. Parent to Twój projekt rozliczeniowy, ReadSession.Table to w pełni kwalifikowana ścieżka tabeli, a DataFormat wybiera format wierszy na łączu (1 = Avro, 2 = Arrow). MaxStreamCount ogranicza, ile równoległych strumieni odczytu zwraca BigQuery. Zserializuj żądanie za pomocą ToBytes i wyślij je jako unarne Call; odpowiedź to ReadSession, którą wczytujesz za pomocą LoadFromBytes, aby odczytać nazwy strumieni.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

Strumieniowanie wierszy

Mając nazwę strumienia, wywołujesz ReadRows. Ustaw ReadStream na nazwę strumienia i opcjonalny początkowy Offset, a następnie wyślij to za pomocą CallAsync dla odpowiedzi strumieniowej po stronie serwera. Każdy pakiet wypchnięty przez serwer wyzwala OnGRPCStreamMessage, a OnGRPCStreamEnd uruchamia się raz, gdy strumień się zakończy.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Formaty wierszy Avro i Arrow

BigQuery nie zwraca wierszy jako sparsowanych pól na łączu. Zwraca zserializowane bloki w jednym z dwóch formatów kolumnowych, a format wybierasz przy tworzeniu sesji. Przy DataFormat := 1 odpowiedź zawiera AvroRows, którego pole SerializedBinaryRows przechowuje blok zakodowany w Avro, a RowCount mówi Ci, ile wierszy zawiera. Przy DataFormat := 2 odpowiedź zawiera ArrowRecordBatch, którego pole SerializedRecordBatch przechowuje pakiet rekordów Apache Arrow. Blok dekodujesz wybraną przez siebie biblioteką Avro lub Arrow. Schemat sesji, zwracany wraz ze strumieniami, opisuje nazwy i typy kolumn, abyś wiedział, co zawiera każdy blok.

Filtrowanie i projekcja

Nie musisz odczytywać całych wierszy. Sesja odczytu zawiera obiekt ReadOptions, który pozwala Ci przekazać projekcję i filtr do BigQuery, aby wysyłał mniej danych. Dodaj nazwy kolumn do SelectedFields, aby wyświetlić tylko potrzebne kolumny, i ustaw RowRestriction na predykat w stylu SQL, aby filtrować wiersze po stronie serwera, zanim zostaną przesłane strumieniowo.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

Dostępność

Klient gRPC BigQuery jest częścią edycji Enterprise sgcWebSockets i działa na Windows, macOS, Linux, iOS oraz Android. Gotowy do uruchomienia przykład, z loaderem konta usługi oraz pokazanym powyżej przepływem tworzenia sesji i odczytu wierszy, znajduje się w Demos\21.GRPC\16.BigQuery. Pełna dokumentacja, w tym pozostałe klienty gRPC Google Cloud, jest na stronie produktu gRPC Client.

Pytania lub uwagi? Skontaktuj się z nami. Otrzymasz odpowiedź od osób, które napisały ten kod.