适用于 Delphi 的 Apache Kafka 客户端

· 组件

sgcWebSockets 2026.6.0 起,为 Delphi 和 C++Builder 提供原生 Apache Kafka 客户端。新的 TsgcWSPClient_Kafka 组件通过原生 Kafka 二进制协议(原始 TCP)直接与 Kafka broker 通信,中间无需任何 REST 网关或外部库。

Apache Kafka 是一个分布式事件流平台,用于发布、存储和消费记录流。生产者将消息写入 topic,broker 将其持久化到分区式的仅追加日志中,消费者再将它们读回,并可选地作为协调一致的消费者组的一部分。sgcWebSockets 客户端通过单个组件即可实现生产、消费、topic 管理和 offset 管理。

功能特性

支持的 Kafka 版本

该组件使用在 Apache Kafka 0.11 中引入的 v2 记录批次格式,并已针对 Apache Kafka 3.x 进行测试。该范围内的任何 broker,或使用相同协议版本的 Kafka 兼容 broker 都可正常工作。默认的 broker 端口为 9092

配置

Kafka 客户端是运行于 TsgcWebSocketClient 之上的协议组件。设置 Specifications.RFC6455 := False,使客户端以原始 TCP 套接字方式连接(Kafka 协议不是 WebSocket),将该客户端分配给 TsgcWSPClient_Kafka 组件,并设置 broker 的 HostPort

生产者和消费者的行为通过 KafkaOptions 属性进行配置:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

生产消息

连接成功后,调用 Produce 并传入 topic、值以及可选的键和 partition。每次发布的结果都会通过 OnKafkaProduce 事件报告,包括记录存储所在的 offset。

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

消费消息

设置 GroupIdOffsetReset,订阅一个或多个 topic,并调用 Poll 获取可用的记录。Poll 返回一个 TsgcKafkaMessages 列表,同时也会为每条记录触发 OnKafkaMessage 事件。反复调用 Poll(例如从定时器中)即可持续接收。

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

相同的数据也会通过 OnKafkaMessage 事件传递,这在从后台线程或定时器中轮询时非常方便:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

如果您只想读取某个 topic 而不加入组,请将 GroupId 留空:此时 Subscribe 会分配该 topic 的每个 partition,而 Poll 则从 OffsetReset 所选位置返回记录。

主题与偏移量

客户端无需任何外部工具即可管理 topic 并检查 offset。

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

下载

Kafka 客户端随 sgcWebSockets 2026.6.0 一同发布。完整演示包含在 Demos/02.WebSocket_Protocols/13.Kafka 中,展示了连接、生产、订阅、轮询、topic 管理和 offset 管理。下载 sgcWebSockets 即可试用。

有疑问、反馈或需要帮助上手?联系我们 — 您会收到编写这些代码的人的回复。