Apache Kafka Client

Native Apache Kafka client for Delphi, C++Builder and .NET. It talks to the broker directly over the Kafka wire protocol on plain TCP, with no REST proxy or external library, to produce and consume records, coordinate consumer groups, manage offsets and administer topics.

Apache Kafka subprotocol client

A first-class Kafka implementation that runs wherever the Delphi / .NET runtime runs, from desktop services to mobile devices, and streams records to and from any standard Kafka broker.

Component class: TsgcWSPClient_Kafka
Protocol: Apache Kafka wire protocol over TCP
Platforms: Windows, macOS, Linux, iOS, Android
Edition: Standard / Professional / Enterprise
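
As a rough illustration of what "wire protocol over TCP" involves: every Kafka request is a 4-byte big-endian length followed by a request header and a body. The sketch below is standard C++, not the component's own code, and the function names are hypothetical. It frames a request using header version 1 (API key, API version, correlation id, client id) with an empty body, which is the shape of an ApiVersions v0 request (API key 18).

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Kafka encodes integers in network byte order (big-endian).
static void put_i16(std::vector<uint8_t>& buf, int16_t value) {
    uint16_t u = static_cast<uint16_t>(value);
    buf.push_back(static_cast<uint8_t>((u >> 8) & 0xFF));
    buf.push_back(static_cast<uint8_t>(u & 0xFF));
}

static void put_i32(std::vector<uint8_t>& buf, int32_t value) {
    uint32_t u = static_cast<uint32_t>(value);
    for (int shift = 24; shift >= 0; shift -= 8)
        buf.push_back(static_cast<uint8_t>((u >> shift) & 0xFF));
}

// Hypothetical helper: builds a size-prefixed request with a v1 header
// and an empty body.
std::vector<uint8_t> frame_request(int16_t api_key, int16_t api_version,
                                   int32_t correlation_id,
                                   const std::string& client_id) {
    assert(client_id.size() <= 32767);  // length is sent as an int16

    std::vector<uint8_t> payload;
    put_i16(payload, api_key);
    put_i16(payload, api_version);
    put_i32(payload, correlation_id);
    put_i16(payload, static_cast<int16_t>(client_id.size()));
    payload.insert(payload.end(), client_id.begin(), client_id.end());

    // The length prefix counts the payload only, not its own 4 bytes.
    std::vector<uint8_t> frame;
    put_i32(frame, static_cast<int32_t>(payload.size()));
    frame.insert(frame.end(), payload.begin(), payload.end());
    return frame;
}
```

Calling frame_request(18, 0, 1, "my-delphi-app") yields a 27-byte frame: a 4-byte length of 23, then the 10 fixed header bytes and the 13-byte client id. The component handles this framing internally; the sketch only shows why no REST proxy is needed.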

Produce, consume and administer Kafka

Everything you need to stream records over the native Kafka protocol, exposed through plain Delphi / .NET methods and events.

Produce

Call Produce(topic, value, key, partition) to publish a record with an optional key and partition, or ProduceBytes for a binary payload. Pick the delivery guarantee through KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader or kafkaAcksAll); each result arrives in OnKafkaProduce with the topic, partition and stored offset.

Consume

Call Subscribe([topic]), then Poll(timeoutMs) to fetch the available records. Poll returns a TsgcKafkaMessages list and raises OnKafkaMessage once per record. Read each record with GetKeyString, GetValueString, Topic, Partition and Offset. Leave GroupId empty to read all partitions directly without a group.

Consumer group coordination

Set KafkaOptions.Consumer.GroupId and the client performs coordinator discovery, join and sync, partition assignment and offset commit/fetch automatically. Call CommitSync to commit the offsets of the last Poll so the group resumes after them on the next session.
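
To make "resumes after them" concrete: by the usual Kafka convention, the offset committed for a partition is the offset of the next record to read, that is, the highest processed offset plus one. The sketch below is standard C++ with hypothetical names (Record, offsets_to_commit); it assumes CommitSync follows that convention and is not the component's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Topic, Partition and Offset fields
// that each consumed record exposes.
struct Record {
    std::string topic;
    int32_t partition;
    int64_t offset;
};

using TopicPartition = std::pair<std::string, int32_t>;

// For every partition seen in the batch, return the highest offset + 1,
// i.e. the position a consumer would resume from after a commit.
std::map<TopicPartition, int64_t> offsets_to_commit(
        const std::vector<Record>& batch) {
    std::map<TopicPartition, int64_t> next;
    for (const Record& r : batch) {
        assert(r.offset >= 0);  // broker-assigned offsets are non-negative
        TopicPartition tp(r.topic, r.partition);
        auto it = next.find(tp);
        if (it == next.end() || r.offset + 1 > it->second)
            next[tp] = r.offset + 1;
    }
    return next;
}
```

For a batch holding offsets 5 and 6 of partition 0 and offset 2 of partition 1, the result is 7 for partition 0 and 3 for partition 1.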

Offset management

GetEarliestOffset, GetLatestOffset and GetCommittedOffset return the oldest, next-to-write and last-committed offsets per topic and partition. CommitOffset(topic, partition, offset) sets an explicit position so consumption resumes exactly where you want it.
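
The three offsets relate in a simple way, sketched below in standard C++. The function names and the "negative means no committed offset" convention are assumptions made for this illustration, not the component's API. The idea: a consumer resumes at the committed offset when it lies within the earliest..latest range, otherwise it falls back to the configured reset policy, and its lag is the distance from its position to the next-to-write offset.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Mirrors the earliest/latest choice of an offset reset policy.
enum class OffsetReset { Earliest, Latest };

// Position a consumer would start from. committed < 0 means "nothing
// committed yet" (assumed convention for this sketch).
int64_t resume_position(int64_t earliest, int64_t latest,
                        int64_t committed, OffsetReset reset) {
    assert(earliest <= latest);
    if (committed < earliest || committed > latest)
        return reset == OffsetReset::Earliest ? earliest : latest;
    return committed;
}

// Records still to be read: next-to-write offset minus the position,
// with the position clamped into the range the broker still retains.
int64_t consumer_lag(int64_t earliest, int64_t latest, int64_t committed) {
    assert(earliest <= latest);
    int64_t position = std::min(std::max(committed, earliest), latest);
    return latest - position;
}
```

With earliest 10, latest 50 and committed 30, the consumer resumes at 30 with a lag of 20; with no committed offset and an earliest reset policy it resumes at 10 with a lag of 40.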

Topic administration

CreateTopic(name, partitions, replication) and DeleteTopic(name) manage topics on the broker, while GetMetadata([topics]) returns cluster and partition layout (pass an empty array for the whole cluster). The client can also list consumer groups.

v2 record batch format

Reads and writes the v2 record batch format introduced in Apache Kafka 0.11, with optional kafkaCompressionGzip compression. Tested against Apache Kafka 3.x on the default broker port 9092.
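
One detail that sets the v2 format apart is that per-record fields such as key length, value length and offset delta are stored as zigzag-encoded variable-length integers rather than fixed-width ones. The sketch below shows that encoding for 32-bit values in standard C++; the function names are hypothetical and this is not the component's source.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// ZigZag maps signed to unsigned so small magnitudes stay small:
// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
inline uint32_t zigzag32(int32_t v) {
    return (static_cast<uint32_t>(v) << 1) ^ (v < 0 ? 0xFFFFFFFFu : 0u);
}

inline int32_t unzigzag32(uint32_t u) {
    return static_cast<int32_t>((u >> 1) ^ (0u - (u & 1u)));
}

// Emit 7 bits per byte, low bits first; the high bit marks "more follows".
void write_varint(std::vector<uint8_t>& out, int32_t value) {
    uint32_t u = zigzag32(value);
    while (u >= 0x80) {
        out.push_back(static_cast<uint8_t>((u & 0x7F) | 0x80));
        u >>= 7;
    }
    out.push_back(static_cast<uint8_t>(u));
}

// Decode one varint starting at pos and advance pos past it.
int32_t read_varint(const std::vector<uint8_t>& in, std::size_t& pos) {
    uint32_t u = 0;
    int shift = 0;
    while (true) {
        assert(pos < in.size() && shift <= 28);  // at most 5 bytes
        uint8_t b = in[pos++];
        u |= static_cast<uint32_t>(b & 0x7F) << shift;
        if ((b & 0x80) == 0) break;
        shift += 7;
    }
    return unzigzag32(u);
}
```

A length of 300 encodes to the two bytes 0xD8 0x04, and a null key (length -1) to the single byte 0x01, which is why small records stay compact in this format.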

Drop the component, set a few properties, go

Pair a TsgcWebSocketClient with TsgcWSPClient_Kafka, set Specifications.RFC6455 to false for plain TCP, configure KafkaOptions, then Produce and Poll.

Delphi

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produce a record to a topic
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consume: subscribe once, then Poll repeatedly (e.g. from a timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// fetch records, commit when a batch is processed
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;

C++Builder

// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produce a record to a topic
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consume: subscribe once, then Poll repeatedly
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// fetch records, commit when a batch is processed
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}

.NET (C#)

using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produce a record to a topic
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consume: subscribe once, then Poll repeatedly
Kafka.Subscribe(new string[] { "my-topic" });

// fetch records, commit when a batch is processed
TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Documentation & Demos

Deep-link to the component reference, grab the ready-to-run demo project, and download the trial.

Online Help (Kafka): Full property, method and event reference for this component.
Demo Project (Demos\Protocols\Kafka): Ready-to-run example project. Ships inside the sgcWebSockets package, trial included.
User Manual (PDF): Comprehensive manual covering every component in the library.
