Apache Kafka 客户端

适用于 Delphi、C++Builder 和 .NET 的原生 Apache Kafka 客户端。通过普通 TCP 上的 Kafka 线级协议直接与代理通信,无需 REST 代理或外部库:生产和消费记录、协调消费者组、管理偏移量并管理主题。

Apache Kafka 子协议客户端

一流的 Kafka 实现,可在 Delphi / .NET 运行时支持的所有环境中运行 — 从桌面服务到移动设备,与任何符合标准的 Kafka 代理之间流式收发记录。

组件类

TsgcWSPClient_Kafka

协议

基于 TCP 的 Apache Kafka 线级协议

平台

Windows、macOS、Linux、iOS、Android

版本

Standard / Professional / Enterprise

生产、消费并管理 Kafka

通过原生 Kafka 协议流式处理记录所需的一切,全部以简单的 Delphi / .NET 方法和事件形式暴露。

生产

调用 Produce(topic, value, key, partition) 发布一条记录,可选附带键和分区,或使用 ProduceBytes 发送二进制负载。通过 KafkaOptions.Producer.AckskafkaAcksNonekafkaAcksLeaderkafkaAcksAll)选择投递保证;每个结果都会在 OnKafkaProduce 中返回,附带主题、分区和已存储的偏移量。

消费

Subscribe([topic]) 后调用 Poll(timeoutMs) 拉取可用记录,返回一个 TsgcKafkaMessages 列表,并为每条记录触发 OnKafkaMessage。使用 GetKeyStringGetValueStringTopicPartitionOffset 读取每一条。将 GroupId 留空即可不依赖消费者组直接读取所有分区。

消费者组协调

设置 KafkaOptions.Consumer.GroupId,客户端便会自动执行协调器发现、加入与同步、分区分配以及偏移量提交/获取。调用 CommitSync 提交上一次 Poll 的偏移量,使该组在下一次会话从这些位置之后继续。

偏移量管理

GetEarliestOffsetGetLatestOffsetGetCommittedOffset 按主题和分区返回最旧、待写入和最后已提交的偏移量。CommitOffset(topic, partition, offset) 设置一个明确位置,使消费精确地从您期望的位置继续。

主题管理

CreateTopic(name, partitions, replication)DeleteTopic(name) 管理代理上的主题,而 GetMetadata([topics]) 返回集群和分区布局(传入空数组可获取整个集群)。客户端还可以列出消费者组。

v2 记录批次格式

读写 Apache Kafka 0.11 引入的 v2 记录批次格式,可选启用 kafkaCompressionGzip 压缩。已针对默认代理端口 9092 上的 Apache Kafka 3.x 进行测试。

放下组件,设置几个属性,即可运行

将 TsgcWebSocketClient 与 TsgcWSPClient_Kafka 配对,将 Specifications.RFC6455 设为 false 以使用原始 TCP,配置 KafkaOptions,然后 Produce 和 Poll。

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produce a record to a topic
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consume: subscribe once, then Poll repeatedly (e.g. from a timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// fetch records, commit when a batch is processed
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produce a record to a topic
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consume: subscribe once, then Poll repeatedly
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// fetch records, commit when a batch is processed
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produce a record to a topic
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consume: subscribe once, then Poll repeatedly
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

规范与参考

本组件所实现协议的权威来源。

文档与演示

深度链接到组件参考,获取可直接运行的演示项目,并下载试用版。

在线帮助 — Kafka 该组件的完整属性、方法和事件参考。
演示项目 — Demos\Protocols\Kafka 可直接运行的示例项目。随 sgcWebSockets 包一同提供 — 请从下方下载试用版。
用户手册(PDF) 涵盖库中每个组件的完整手册。

准备好使用 Apache Kafka 进行流式处理了吗?

下载免费试用版,几分钟内即可开始构建事件流式处理解决方案。