如何在 Delphi 中使用 gRPC 客户端

· 组件

TsgcGRPCClient 组件将 gRPC 带到了 Delphi 和 C++Builder,无需任何外部运行时。它是 sgcWebSockets Enterprise 版的一部分,可运行于 Windows、macOS、Linux、iOS 和 Android。本指南讲解该客户端如何工作、如何配置,以及如何在 Delphi 中发起四种 gRPC 调用类型中的每一种。

工作原理

gRPC 是封装在 HTTP/2 之上的 Protocol Buffers 消息。每条消息都作为带长度前缀的载荷在一个 HTTP/2 流上发送,请求会携带诸如 content-type: application/grpcgrpc-timeout 这样的标头,最终状态则通过 grpc-statusgrpc-message 尾部字段(trailers)传回。

sgcWebSockets 本身已经提供了完整的 HTTP/2 协议栈,因此 TsgcGRPCClient 构建在 TsgcHTTP2Client 传输之上。你把序列化后的请求字节交给它,它负责完成分帧、标头和超时处理,打开 HTTP/2 流,并将响应和尾部字段解析回一个带类型的 TsgcGRPCResponse。由于该客户端处理的是原始 TBytes,你可以将它与任何你已在使用的 Protocol Buffers 库搭配。

设置客户端

一个 gRPC 通道就是一条 HTTP/2 连接。创建一个 TsgcHTTP2Client,将它指向主机和端口,然后把它赋给 gRPC 组件的 Client 属性。对于 TLS,HTTP/2 客户端使用 OpenSSL 或 Windows SChannel;对于本地明文服务器(h2c),将 TLS 设为 False

uses
  sgcHTTP2, sgcGRPC_Client, sgcGRPC_Classes, sgcGRPC_Types;

var
  HTTP2: TsgcHTTP2Client;
  GRPC: TsgcGRPCClient;
begin
  HTTP2 := TsgcHTTP2Client.Create(nil);
  HTTP2.Host := 'grpc.example.com';
  HTTP2.Port := 443;
  HTTP2.TLS  := True;                 // False for a local h2c server
  HTTP2.TLSOptions.IOHandler := iohOpenSSL;  // or iohSChannel on Windows

  GRPC := TsgcGRPCClient.Create(nil);
  GRPC.Client := HTTP2;
end;

配置通道

通道级别的设置位于 ChannelOptions 中。你可以启用逐消息的 gzip 压缩、选择传输内容类型,并提高消息和 metadata 的大小上限。

// gzip-compress outgoing messages
GRPC.ChannelOptions.Compression := grpcGzip;          // grpcNoCompression, grpcGzip, grpcDeflate, grpcSnappy

// application/grpc+proto (default) or application/grpc+json
GRPC.ChannelOptions.ContentType := grpcProto;         // or grpcJSON

// raise the limits for large messages
GRPC.ChannelOptions.MaxMessageSize  := 16 * 1024 * 1024;
GRPC.ChannelOptions.MaxMetadataSize := 64 * 1024;

metadata 与身份验证

自定义标头以 gRPC metadata 的形式传递。把它们添加到 DefaultMetadata 即可在每次调用时发送,这通常用于放置授权令牌或追踪标头。

GRPC.DefaultMetadata.Add('authorization', 'Bearer eyJ...');
GRPC.DefaultMetadata.Add('x-api-key', 'my-api-key');

截止时间与取消

逐调用的超时会作为标准的 grpc-timeout 标头发送,因此服务器可以在你的客户端放弃的同一时刻放弃。进行中的流式调用可以随时通过 CancelCall 停止,只需传入打开流时返回的流 id。

一元调用

一元调用发送一个请求并获得一个响应。Call 会一直阻塞,直到回复到达,并返回一个 TsgcGRPCResponse,其中包含 StatusCodeStatusMessage、原始 Data 字节(或 DataString)以及 Trailers

var
  oResponse: TsgcGRPCResponse;
begin
  oResponse := GRPC.Call('helloworld.Greeter', 'SayHello', RequestBytes);
  if oResponse.StatusCode = grpcOK then
    Memo1.Text := oResponse.DataString
  else
    ShowMessage('gRPC error ' + oResponse.StatusMessage);
end;

为了保持界面的响应性,请使用 CallAsync。它会立即返回,并通过 OnGRPCResponse 事件传递回复。

GRPC.OnGRPCResponse := GRPCResponse;
GRPC.CallAsync('helloworld.Greeter', 'SayHello', RequestBytes);

procedure TForm1.GRPCResponse(Sender: TObject; const Response: TsgcGRPCResponse);
begin
  if Response.StatusCode = grpcOK then
    Memo1.Text := Response.DataString;
end;

服务端流式

使用服务端流式时,你发送一个请求,服务器返回一个消息流。每条消息都会触发 OnGRPCStreamMessage,而 OnGRPCStreamEnd 会在最终状态到达时触发一次。

GRPC.OnGRPCStreamMessage := GRPCStreamMessage;
GRPC.OnGRPCStreamEnd := GRPCStreamEnd;

GRPC.ServerStreamingCall('chat.Feed', 'Subscribe', RequestBytes);

procedure TForm1.GRPCStreamMessage(Sender: TObject; aStreamId: Integer;
  const aData: TBytes);
begin
  Memo1.Lines.Add(DecodeMessage(aData));
end;

客户端流式

客户端流式正好相反:你推送一个消息流,服务器只回复一次。打开流,发送每条消息,然后关闭流以读取那一条回复。

var
  vStreamId: Integer;
  oResponse: TsgcGRPCResponse;
begin
  vStreamId := GRPC.OpenClientStream('upload.Service', 'Send');
  GRPC.SendStreamMessage(vStreamId, ChunkBytes1);
  GRPC.SendStreamMessage(vStreamId, ChunkBytes2);
  oResponse := GRPC.CloseClientStream(vStreamId);
  if oResponse.StatusCode = grpcOK then
    ShowMessage('Upload accepted');
end;

双向流式

双向流式在单个 HTTP/2 流上运行全双工交换:双方都可随时发送。打开流,按需发送消息,在 OnGRPCStreamMessage 中处理收到的消息,并在结束时关闭。

var
  vStreamId: Integer;
begin
  vStreamId := GRPC.OpenBidiStream('chat.Room', 'Connect');
  GRPC.SendBidiMessage(vStreamId, EncodeMessage('hello'));
  // ... incoming messages arrive on OnGRPCStreamMessage ...
  GRPC.CloseBidiStream(vStreamId);
end;

状态码与错误处理

每个响应都携带一个带类型的 StatusCode,从 grpcOK 一直到标准的 gRPC 集合(grpcNOT_FOUNDgrpcUNAVAILABLEgrpcDEADLINE_EXCEEDEDgrpcUNAUTHENTICATED 等)。对于异步和流式调用,OnGRPCError 报告非 OK 的 gRPC 状态,而 OnGRPCException 报告传输或连接失败。

GRPC.OnGRPCError := GRPCError;

procedure TForm1.GRPCError(Sender: TObject; aStreamId: Integer;
  aStatusCode: TsgcGRPCStatusCode; const aStatusMessage: string);
begin
  Memo1.Lines.Add(Format('gRPC error %d: %s', [Ord(aStatusCode), aStatusMessage]));
end;

自动重试

客户端可以为你重试失败的调用。启用 RetryPolicy,设置尝试次数和退避时间,并列出应触发重试的状态码。这是针对临时性 grpcUNAVAILABLE 失败的典型配置。

GRPC.RetryPolicy.Enabled := True;
GRPC.RetryPolicy.MaxAttempts := 4;
GRPC.RetryPolicy.InitialBackoff := 200;     // ms
GRPC.RetryPolicy.BackoffMultiplier := 2.0;
GRPC.RetryPolicy.RetryableStatusCodes :=
  GRPC.RetryPolicy.RetryableStatusCodes + [grpcUNAVAILABLE, grpcRESOURCE_EXHAUSTED];

对于诸如日志记录或刷新令牌之类的横切逻辑,可以向 Interceptors 链中添加一个条目,它会在同一处包裹每一次调用。

下一步去哪里

在这个通用客户端之上,sgcWebSockets 还为 Google Cloud 的 gRPC 服务(Pub/Sub、Speech-to-Text、Translation、Vision、Natural Language、Cloud Storage、BigQuery 和 Vertex AI)提供了带类型的客户端,因此你可以调用高层方法,而不必手工组装 protobuf。一个可直接运行的示例位于 Demos\21.GRPC\01.GRPC_Client,完整参考资料则在 gRPC 客户端产品页面上。

有疑问或需要帮助上手?联系我们。你会收到编写这些代码的人的回复。