Delphi'de gRPC Üzerinden Google BigQuery

· Bileşenler

Google BigQuery, çok büyük tablolar üzerinde analitik sorgular çalıştırmak için oluşturulmuş, Google'ın sunucusuz veri ambarıdır. Tablo verilerini BigQuery'den kendi uygulamanıza çekmek istediğinizde, en hızlı yol, satırları bir REST uç noktası üzerinden sayfalamak yerine doğrudan depolamadan akıtan bir gRPC hizmeti olan BigQuery Storage Read API'sidir. sgcWebSockets Enterprise, TsgcGRPCClient üzerine oturan türlü bir BigQuery gRPC istemcisi içerir, böylece bir BigQuery tablosunu harici bir gRPC çalışma zamanı olmadan Delphi ve C++Builder'dan okuyabilirsiniz.

Nasıl çalışır

Storage Read API, google.cloud.bigquery.storage.v1.BigQueryRead adlı bir gRPC hizmetidir. gRPC, HTTP/2 üzerinde çerçevelenmiş Protocol Buffers'tır, bu nedenle istemci sgcWebSockets HTTP/2 yığını üzerinde çalışır. TLS ile 443 numaralı bağlantı noktasında bir TsgcHTTP2Clientbigquerystorage.googleapis.com adresine yönlendirir, onu bir TsgcGRPCClient'a atarsınız ve gRPC istemcisi mesaj çerçevelemesini, üst bilgileri, zaman aşımlarını ve trailer'ları sizin için işler.

Bir tabloyu okumak iki çağrılı bir desendir. Önce, BigQuery'den bir tablo üzerinde bir oturum açmasını ve bir veya daha fazla okuma akışı döndürmesini isteyen tekli bir çağrı olan CreateReadSession'ı çağırırsınız. Ardından bir akış üzerinde, sunucu akışı çağrısı olan ReadRows'u çağırırsınız: bir istek gönderirsiniz ve sunucu, akış bitene kadar bir dizi satır grubunu geri gönderir. sgcGRPC_Google_BigQuery içindeki türlü BigQuery mesajları, istek protobuf'larını oluşturur ve yanıtları ayrıştırır, böylece elle bir araya getirilmiş baytlar yerine Delphi sınıflarıyla çalışırsınız.

Kimlik Doğrulama

BigQuery, bir Google Cloud hizmet hesabıyla kimlik doğrulaması yapar. Demo, hizmet hesabı JSON'ını yükler, kendinden imzalı bir JWT oluşturur ve onu bir erişim belirteciyle takas eder, ardından bu belirteci her çağrıda gRPC meta verisi olarak gönderir. Kendinden imzalı bir hizmet hesabı JWT'si hedef kitleye bağlı olduğundan, belirtecin bigquerystorage.googleapis.com tarafından kabul edilmesi için BigQuery Storage uç noktasını hedeflemesi gerekir.

GoogleCloud.GoogleCloudOptions.Authentication := gcaJWT;
GoogleCloud.GoogleCloudOptions.JWT.ClientEmail  := ClientEmail;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKeyId := PrivateKeyId;
GoogleCloud.GoogleCloudOptions.JWT.PrivateKey.Text := PrivateKey;
GoogleCloud.GoogleCloudOptions.JWT.ProjectId := ProjectId;
GoogleCloud.GoogleCloudOptions.JWT.API_Endpoint :=
  'https://bigquerystorage.googleapis.com/';

// once the token is acquired, attach it to every gRPC call
GRPC.DefaultMetadata.Add('authorization', 'Bearer ' + Token);

İstemcinin kurulması

Bir gRPC kanalı, bir HTTP/2 bağlantısıdır. Taşımayı oluşturun, onu BigQuery Storage ana bilgisayarına yönlendirin, ardından gRPC istemcisini oluşturun ve taşımayı onun Client özelliğine atayın. İçerik türü ikili proto hat formatıdır ve sıkıştırma kapalı bırakılır.

uses
  sgcHTTP2_Client, sgcGRPC_Client, sgcGRPC_Types, sgcGRPC_Google_BigQuery;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'bigquerystorage.googleapis.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
  GRPC.ChannelOptions.ContentType := grpcProto;
  GRPC.ChannelOptions.Compression := grpcNoCompression;
  HTTP2.Active := True;
end;

Bir okuma oturumu oluşturma

Bir oturum açmak için bir TsgcGRPCBigQueryCreateReadSessionRequest doldurursunuz. Parent, faturalandırma projenizdir, ReadSession.Table tam nitelikli tablo yoludur ve DataFormat, satırların hat formatını seçer (1 = Avro, 2 = Arrow). MaxStreamCount, BigQuery'nin kaç paralel okuma akışı döndüreceğini sınırlar. İsteği ToBytes ile serileştirin ve tekli bir Call olarak gönderin; yanıt, akış adlarını geri okumak için LoadFromBytes ile yüklediğiniz bir ReadSession'dır.

var
  oRequest: TsgcGRPCBigQueryCreateReadSessionRequest;
  oResponse: TsgcGRPCResponse;
  oSession: TsgcGRPCBigQueryReadSession;
begin
  oRequest := TsgcGRPCBigQueryCreateReadSessionRequest.Create;
  try
    oRequest.Parent := 'projects/' + ProjectId;
    oRequest.ReadSession.Table := 'projects/' + ProjectId +
      '/datasets/' + Dataset + '/tables/' + Table;
    oRequest.ReadSession.DataFormat := 1; { AVRO }
    oRequest.MaxStreamCount := 1;

    oResponse := GRPC.Call(
      'google.cloud.bigquery.storage.v1.BigQueryRead',
      'CreateReadSession', oRequest.ToBytes);

    if oResponse.StatusCode = grpcOK then
    begin
      oSession := TsgcGRPCBigQueryReadSession.Create;
      try
        oSession.LoadFromBytes(oResponse.Data);
        // keep the first read stream name for ReadRows
        if oSession.StreamCount > 0 then
          FReadStreamName := oSession.Stream(0).Name;
      finally
        oSession.Free;
      end;
    end;
  finally
    oRequest.Free;
  end;
end;

Satırları akıtma

Elinizde bir akış adı varken ReadRows'u çağırırsınız. ReadStream'i akış adına ve isteğe bağlı bir başlangıç Offset değerine ayarlayın, ardından bir sunucu akışı yanıtı için onu CallAsync ile gönderin. Sunucunun gönderdiği her grup OnGRPCStreamMessage olayını tetikler ve akış bittiğinde OnGRPCStreamEnd bir kez tetiklenir.

var
  oRequest: TsgcGRPCBigQueryReadRowsRequest;
begin
  oRequest := TsgcGRPCBigQueryReadRowsRequest.Create;
  try
    oRequest.ReadStream := FReadStreamName;
    oRequest.Offset := 0;

    GRPC.CallAsync('google.cloud.bigquery.storage.v1.BigQueryRead',
      'ReadRows', oRequest.ToBytes);
  finally
    oRequest.Free;
  end;
end;

procedure TForm1.OnGRPCStreamMessage(Sender: TObject;
  const aMessage: TsgcGRPCStreamMessage; var aCancel: Boolean);
var
  oResponse: TsgcGRPCBigQueryReadRowsResponse;
begin
  oResponse := TsgcGRPCBigQueryReadRowsResponse.Create;
  try
    oResponse.LoadFromBytes(aMessage.Data);
    if oResponse.AvroRows.RowCount > 0 then
      // oResponse.AvroRows.SerializedBinaryRows holds the Avro block
    else if oResponse.ArrowRecordBatch.RowCount > 0 then
      // oResponse.ArrowRecordBatch.SerializedRecordBatch holds the Arrow batch
  finally
    oResponse.Free;
  end;
end;

Avro ve Arrow satır formatları

BigQuery, satırları hat üzerinde ayrıştırılmış alanlar olarak döndürmez. İki sütunlu formattan birinde serileştirilmiş bloklar döndürür ve formatı oturumu oluştururken seçersiniz. DataFormat := 1 ile yanıt, SerializedBinaryRows alanı Avro ile kodlanmış bir blok tutan ve RowCount kaç satır içerdiğini söyleyen AvroRows'u taşır. DataFormat := 2 ile yanıt, SerializedRecordBatch alanı bir Apache Arrow kayıt grubu tutan bir ArrowRecordBatch taşır. Bloğu, seçtiğiniz Avro veya Arrow kütüphanesiyle çözersiniz. Akışların yanı sıra döndürülen oturum şeması, sütun adlarını ve türlerini açıklar, böylece her bloğun ne içerdiğini bilirsiniz.

Filtreleme ve projeksiyon

Tüm satırları okumak zorunda değilsiniz. Okuma oturumu, daha az veri göndermesi için BigQuery'ye bir projeksiyon ve bir filtre göndermenize olanak tanıyan bir ReadOptions nesnesi taşır. Yalnızca ihtiyaç duyduğunuz sütunları projelemek için SelectedFields'e sütun adları ekleyin ve satırlar akıtılmadan önce sunucu tarafında filtrelemek için RowRestriction'ı SQL tarzı bir koşula ayarlayın.

oRequest.ReadSession.ReadOptions.SelectedFields.Add('name');
oRequest.ReadSession.ReadOptions.SelectedFields.Add('state');
oRequest.ReadSession.ReadOptions.RowRestriction := 'state = "CA"';

Kullanılabilirlik

BigQuery gRPC istemcisi, sgcWebSockets Enterprise sürümünün bir parçasıdır ve Windows, macOS, Linux, iOS ve Android üzerinde çalışır. Yukarıda gösterilen hizmet hesabı yükleyicisi, oturum oluşturma ve satır okuma akışını içeren, çalıştırılmaya hazır bir örnek Demos\21.GRPC\16.BigQuery içinde yer alır. Diğer Google Cloud gRPC istemcileri dahil olmak üzere tam referans, gRPC Client ürün sayfasında bulunur.

Sorularınız veya geri bildirimleriniz mi var? Bize ulaşın. Kodu yazan kişilerden bir yanıt alacaksınız.