从 sgcWebSockets 2026.6.0 起,为 Delphi 和 C++Builder 提供原生 Apache Kafka 客户端。新的 TsgcWSPClient_Kafka 组件通过原生 Kafka 二进制协议(原始 TCP)直接与 Kafka broker 通信,中间无需任何 REST 网关或外部库。
Apache Kafka 是一个分布式事件流平台,用于发布、存储和消费记录流。生产者将消息写入 topic,broker 将其持久化到分区式的仅追加日志中,消费者再将它们读回,并可选地作为协调一致的消费者组的一部分。sgcWebSockets 客户端通过单个组件即可实现生产、消费、topic 管理和 offset 管理。
功能特性
- 原生协议:使用 Kafka 线路协议通过 TCP 与 broker 通信。无需 Confluent REST 代理、无需 Java、无需第三方 DLL。
- 生产者:将记录发布到任意 topic 和 partition,带有键(key)和值(value),并可配置确认机制(无、leader、全部)和请求超时。
- 消费者:既可使用受管理的消费者组读取记录(broker 按组跟踪已提交的 offset),也可无组读取(直接读取 topic 的所有 partition)。
- 消费者组:自动协调器发现、加入/同步、partition 分配以及 offset 提交/获取。
- 偏移量控制:查询最早、最新和已提交的 offset,并选择新组从何处开始(earliest 或 latest)。
- 主题管理:直接从客户端创建和删除 topic、请求集群元数据并列出消费者组。
- 现代记录格式:读写当前 Kafka broker 使用的 v2 记录批次格式。
- 事件驱动:
OnKafkaConnect、OnKafkaMessage、OnKafkaProduce、OnKafkaError和OnKafkaDisconnect事件报告连接上发生的所有情况。
支持的 Kafka 版本
该组件使用在 Apache Kafka 0.11 中引入的 v2 记录批次格式,并已针对 Apache Kafka 3.x 进行测试。该范围内的任何 broker,或使用相同协议版本的 Kafka 兼容 broker 都可正常工作。默认的 broker 端口为 9092。
配置
Kafka 客户端是运行于 TsgcWebSocketClient 之上的协议组件。设置 Specifications.RFC6455 := False,使客户端以原始 TCP 套接字方式连接(Kafka 协议不是 WebSocket),将该客户端分配给 TsgcWSPClient_Kafka 组件,并设置 broker 的 Host 和 Port。
生产者和消费者的行为通过 KafkaOptions 属性进行配置:
- ClientId:向 broker 标识您的应用程序。
- Producer.Acks:
kafkaAcksNone、kafkaAcksLeader或kafkaAcksAll。 - Producer.TimeoutMs:broker 等待所需确认的时长。
- Consumer.GroupId:消费者组名称。留空则在无组的情况下消费。
- Consumer.OffsetReset:
kafkaOffsetEarliest或kafkaOffsetLatest,即全新的组从何处开始读取。
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
生产消息
连接成功后,调用 Produce 并传入 topic、值以及可选的键和 partition。每次发布的结果都会通过 OnKafkaProduce 事件报告,包括记录存储所在的 offset。
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
消费消息
设置 GroupId 和 OffsetReset,订阅一个或多个 topic,并调用 Poll 获取可用的记录。Poll 返回一个 TsgcKafkaMessages 列表,同时也会为每条记录触发 OnKafkaMessage 事件。反复调用 Poll(例如从定时器中)即可持续接收。
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
相同的数据也会通过 OnKafkaMessage 事件传递,这在从后台线程或定时器中轮询时非常方便:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
如果您只想读取某个 topic 而不加入组,请将 GroupId 留空:此时 Subscribe 会分配该 topic 的每个 partition,而 Poll 则从 OffsetReset 所选位置返回记录。
主题与偏移量
客户端无需任何外部工具即可管理 topic 并检查 offset。
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
下载
Kafka 客户端随 sgcWebSockets 2026.6.0 一同发布。完整演示包含在 Demos/02.WebSocket_Protocols/13.Kafka 中,展示了连接、生产、订阅、轮询、topic 管理和 offset 管理。下载 sgcWebSockets 即可试用。
有疑问、反馈或需要帮助上手?联系我们 — 您会收到编写这些代码的人的回复。
