Google BigQuery über gRPC in Delphi

· Komponenten

Google BigQuery ist das serverlose Data-Warehouse von Google, gebaut, um analytische Abfragen über sehr große Tabellen auszuführen. Wenn Sie Tabellendaten aus BigQuery in Ihre eigene Anwendung ziehen möchten, ist der schnellste Weg die BigQuery Storage Read API, ein gRPC-Dienst, der Zeilen direkt aus dem Speicher streamt, statt sie über einen REST-Endpunkt zu paginieren. sgcWebSockets Enterprise liefert einen typisierten BigQuery-gRPC-Client, der auf TsgcGRPCClient aufsetzt, sodass Sie eine BigQuery-Tabelle aus Delphi und C++Builder lesen können, ohne externe gRPC-Laufzeitumgebung.

Wie es funktioniert

Die Storage Read API ist ein gRPC-Dienst namens google.cloud.bigquery.storage.v1.BigQueryRead. gRPC sind Protocol Buffers, die über HTTP/2 gerahmt werden, daher läuft der Client auf dem HTTP/2-Stack von sgcWebSockets. Sie richten einen TsgcHTTP2Client auf bigquerystorage.googleapis.com an Port 443 mit TLS, weisen ihn einem TsgcGRPCClient zu, und der gRPC-Client übernimmt das Nachrichten-Framing, die Header, die Timeouts und die Trailer für Sie.

Das Lesen einer Tabelle ist ein Muster aus zwei Aufrufen. Zuerst rufen Sie CreateReadSession auf, einen unären Aufruf, der BigQuery bittet, eine Session über eine Tabelle zu öffnen und einen oder mehrere Read-Streams zurückzugeben. Dann rufen Sie ReadRows auf einem Stream auf, einen Server-Streaming-Aufruf: Sie senden einen Request, und der Server schiebt eine Folge von Zeilenstapeln zurück, bis der Stream endet. Die typisierten BigQuery-Nachrichten in sgcGRPC_Google_BigQuery bauen die Request-Protobufs und parsen die Responses, sodass Sie mit Delphi-Klassen statt mit von Hand zusammengestellten Bytes arbeiten.

Authentifizierung

BigQuery wird mit einem Google-Cloud-Dienstkonto authentifiziert. Das Demo lädt das Dienstkonto-JSON, baut ein selbstsigniertes JWT und tauscht es gegen ein Zugriffstoken, sendet dieses Token dann bei jedem Aufruf als gRPC-Metadaten. Da ein selbstsigniertes Dienstkonto-JWT audience-gebunden ist, muss das Token auf den BigQuery-Storage-Endpunkt ausgerichtet sein, damit es von bigquerystorage.googleapis.com akzeptiert wird.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

Den Client einrichten

Ein gRPC-Channel ist eine HTTP/2-Verbindung. Erstellen Sie den Transport, richten Sie ihn auf den BigQuery-Storage-Host, erstellen Sie dann den gRPC-Client und weisen Sie den Transport seiner Client-Eigenschaft zu. Der Content-Type ist das binäre Proto-Wire-Format, und die Komprimierung bleibt aus.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Eine Read-Session erstellen

Um eine Session zu öffnen, füllen Sie einen TsgcGRPCBigQueryCreateReadSessionRequest. Der Parent ist Ihr Abrechnungsprojekt, die ReadSession.Table ist der voll qualifizierte Tabellenpfad, und DataFormat wählt das Wire-Format der Zeilen (1 = Avro, 2 = Arrow). MaxStreamCount begrenzt, wie viele parallele Read-Streams BigQuery zurückgibt. Serialisieren Sie den Request mit ToBytes und senden Sie ihn als unären Call; die Response ist eine ReadSession, die Sie mit LoadFromBytes laden, um die Stream-Namen zurückzulesen.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

Die Zeilen streamen

Mit einem Stream-Namen in der Hand rufen Sie ReadRows auf. Setzen Sie ReadStream auf den Stream-Namen und einen optionalen Start-Offset, senden Sie ihn dann mit CallAsync für eine Server-Streaming-Response. Jeder Stapel, den der Server schiebt, löst OnGRPCStreamMessage aus, und OnGRPCStreamEnd wird einmal ausgelöst, wenn der Stream fertig ist.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Avro- und Arrow-Zeilenformate

BigQuery gibt Zeilen nicht als geparste Felder über die Leitung zurück. Es gibt serialisierte Blöcke in einem von zwei spaltenorientierten Formaten zurück, und Sie wählen das Format, wenn Sie die Session erstellen. Mit DataFormat := 1 trägt die Response AvroRows, deren SerializedBinaryRows-Feld einen Avro-kodierten Block enthält und deren RowCount Ihnen sagt, wie viele Zeilen er enthält. Mit DataFormat := 2 trägt die Response einen ArrowRecordBatch, dessen SerializedRecordBatch-Feld einen Apache-Arrow-Record-Batch enthält. Sie dekodieren den Block mit der Avro- oder Arrow-Bibliothek Ihrer Wahl. Das Session-Schema, das zusammen mit den Streams zurückgegeben wird, beschreibt die Spaltennamen und -typen, sodass Sie wissen, was jeder Block enthält.

Filtern und projizieren

Sie müssen nicht ganze Zeilen lesen. Die Read-Session trägt ein ReadOptions-Objekt, mit dem Sie eine Projektion und einen Filter zu BigQuery hinunterschieben können, sodass es weniger Daten sendet. Fügen Sie Spaltennamen zu SelectedFields hinzu, um nur die Spalten zu projizieren, die Sie benötigen, und setzen Sie RowRestriction auf ein SQL-artiges Prädikat, um Zeilen serverseitig zu filtern, bevor sie gestreamt werden.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

Verfügbarkeit

Der BigQuery-gRPC-Client ist Teil der sgcWebSockets Enterprise Edition und läuft unter Windows, macOS, Linux, iOS und Android. Ein sofort lauffähiges Beispiel, mit dem Dienstkonto-Loader, dem Create-Session- und dem Read-Rows-Ablauf wie oben gezeigt, befindet sich in Demos\21.GRPC\16.BigQuery. Die vollständige Referenz, einschließlich der anderen Google-Cloud-gRPC-Clients, finden Sie auf der gRPC-Client-Produktseite.

Fragen oder Feedback? Nehmen Sie Kontakt auf. Sie erhalten eine Antwort von den Leuten, die den Code geschrieben haben.