Apache Kafka-client voor Delphi

· Componenten

Vanaf sgcWebSockets 2026.6.0 is er een native Apache Kafka-client beschikbaar voor Delphi en C++Builder. Het nieuwe TsgcWSPClient_Kafka-component communiceert rechtstreeks met Kafka-brokers via het native Kafka binaire protocol (raw TCP), zonder REST-gateway of externe bibliotheek ertussen.

Apache Kafka is een gedistribueerd event-streamingplatform dat wordt gebruikt om stromen van records te publiceren, op te slaan en te consumeren. Producers schrijven berichten naar topics, brokers bewaren ze in gepartitioneerde, append-only logs, en consumers lezen ze terug, eventueel als onderdeel van een gecoördineerde consumer group. De sgcWebSockets-client geeft je produceren, consumeren, topicbeheer en offsetbeheer vanuit één component.

Functies

Ondersteunde Kafka-versies

Het component gebruikt het v2 record batch-formaat dat werd geïntroduceerd in Apache Kafka 0.11, en het is getest tegen Apache Kafka 3.x. Elke broker in dat bereik, of een Kafka-compatibele broker die dezelfde protocolversies spreekt, werkt. De standaard brokerpoort is 9092.

Configuratie

De Kafka-client is een protocolcomponent dat bovenop een TsgcWebSocketClient draait. Stel Specifications.RFC6455 := False in zodat de client als een raw TCP-socket verbindt (het Kafka-protocol is geen WebSocket), wijs de client toe aan het TsgcWSPClient_Kafka-component, en stel de broker-Host en -Port in.

Het gedrag van de producer en de consumer wordt geconfigureerd via de KafkaOptions-property:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Berichten produceren

Roep, eenmaal verbonden, Produce aan met de topic, de value en een optionele key en partition. Het resultaat van elke publicatie wordt gerapporteerd via het OnKafkaProduce-event, inclusief de offset waarop het record is opgeslagen.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Berichten consumeren

Stel een GroupId en een OffsetReset in, abonneer je op een of meer topics en roep Poll aan om de beschikbare records op te halen. Poll retourneert een TsgcKafkaMessages-lijst en triggert ook het OnKafkaMessage-event voor elk record. Roep Poll herhaaldelijk aan (bijvoorbeeld vanuit een timer) om continu te ontvangen.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Dezelfde data wordt geleverd via het OnKafkaMessage-event, wat handig is wanneer je pollt vanuit een achtergrondthread of een timer:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Als je alleen een topic wilt lezen zonder lid te worden van een group, laat dan GroupId leeg: Subscribe wijst dan elke partition van de topic toe en Poll retourneert de records vanaf de positie die door OffsetReset is geselecteerd.

Topics en offsets

De client kan topics beheren en offsets inspecteren zonder enige externe tooling.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Download

De Kafka-client wordt geleverd met sgcWebSockets 2026.6.0. Een volledige demo is inbegrepen in Demos/02.WebSocket_Protocols/13.Kafka, die connect, produce, subscribe, poll, topicbeheer en offsetbeheer toont. Download sgcWebSockets om het te proberen.

Vragen, feedback of hulp bij het opstarten? Neem contact op — je krijgt een antwoord van de mensen die de code hebben geschreven.