Apache Kafka Client for Delphi

· Components

From sgcWebSockets 2026.6.0 a native Apache Kafka client is available for Delphi and C++Builder. The new TsgcWSPClient_Kafka component talks to Kafka brokers directly over the native Kafka binary protocol (raw TCP), with no REST gateway or external library in between.

Apache Kafka is a distributed event-streaming platform used to publish, store and consume streams of records. Producers write messages to topics, brokers persist them in partitioned, append-only logs, and consumers read them back, optionally as part of a coordinated consumer group. The sgcWebSockets client gives you producing, consuming, topic administration and offset management from a single component.

Features

Supported Kafka versions

The component uses the v2 record batch format that was introduced in Apache Kafka 0.11, and it has been tested against Apache Kafka 3.x. Any broker in that range, or a Kafka-compatible broker that speaks the same protocol versions, will work. The default broker port is 9092.

Configuration

The Kafka client is a protocol component that runs on top of a TsgcWebSocketClient. Set Specifications.RFC6455 := False so the client connects as a raw TCP socket (the Kafka protocol is not WebSocket), assign the client to the TsgcWSPClient_Kafka component, and set the broker Host and Port.

The behaviour of the producer and the consumer is configured through the KafkaOptions property:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Producing messages

Once connected, call Produce with the topic, the value and an optional key and partition. The result of every publish is reported through the OnKafkaProduce event, including the offset the record was stored at.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Consuming messages

Set a GroupId and an OffsetReset, subscribe to one or more topics and call Poll to fetch the records that are available. Poll returns a TsgcKafkaMessages list and also raises the OnKafkaMessage event for each record. Call Poll repeatedly (for example from a timer) to receive continuously.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

The same data is delivered through the OnKafkaMessage event, which is convenient when you poll from a background thread or a timer:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

If you only want to read a topic without joining a group, leave GroupId empty: Subscribe then assigns every partition of the topic and Poll returns the records from the position selected by OffsetReset.

Topics and offsets

The client can manage topics and inspect offsets without any external tooling.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Download

The Kafka client ships with sgcWebSockets 2026.6.0. A full demo is included in Demos/02.WebSocket_Protocols/13.Kafka, showing connect, produce, subscribe, poll, topic administration and offset management. Download sgcWebSockets to try it.

Questions, feedback or help getting started? Get in touch — you will get a reply from the people who wrote the code.