Vanaf sgcWebSockets 2026.6.0 is er een native Apache Kafka-client beschikbaar voor Delphi en C++Builder. Het nieuwe TsgcWSPClient_Kafka-component communiceert rechtstreeks met Kafka-brokers via het native Kafka binaire protocol (raw TCP), zonder REST-gateway of externe bibliotheek ertussen.
Apache Kafka is een gedistribueerd event-streamingplatform dat wordt gebruikt om stromen van records te publiceren, op te slaan en te consumeren. Producers schrijven berichten naar topics, brokers bewaren ze in gepartitioneerde, append-only logs, en consumers lezen ze terug, eventueel als onderdeel van een gecoördineerde consumer group. De sgcWebSockets-client geeft je produceren, consumeren, topicbeheer en offsetbeheer vanuit één component.
Functies
- Native protocol: communiceert met de broker via het Kafka-wireprotocol over TCP. Geen Confluent REST-proxy, geen Java, geen DLL's van derden.
- Producer: publiceer records naar elke topic en partition, met een key en value, en configureerbare acknowledgements (geen, leader, all) en request timeout.
- Consumer: lees records ofwel met een beheerde consumer group (de broker houdt de committed offsets per group bij) ofwel zonder group (lees alle partitions van een topic rechtstreeks).
- Consumer groups: automatische coordinator discovery, join/sync, partitiontoewijzing en offset commit/fetch.
- Offsetbeheer: vraag de earliest, latest en committed offsets op, en kies waar een nieuwe group begint (earliest of latest).
- Topicbeheer: maak en verwijder topics, vraag clustermetadata op en lijst consumer groups rechtstreeks vanuit de client.
- Modern recordformaat: leest en schrijft het v2 record batch-formaat dat door huidige Kafka-brokers wordt gebruikt.
- Event-gestuurd: de events
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErrorenOnKafkaDisconnectrapporteren alles wat er op de verbinding gebeurt.
Ondersteunde Kafka-versies
Het component gebruikt het v2 record batch-formaat dat werd geïntroduceerd in Apache Kafka 0.11, en het is getest tegen Apache Kafka 3.x. Elke broker in dat bereik, of een Kafka-compatibele broker die dezelfde protocolversies spreekt, werkt. De standaard brokerpoort is 9092.
Configuratie
De Kafka-client is een protocolcomponent dat bovenop een TsgcWebSocketClient draait. Stel Specifications.RFC6455 := False in zodat de client als een raw TCP-socket verbindt (het Kafka-protocol is geen WebSocket), wijs de client toe aan het TsgcWSPClient_Kafka-component, en stel de broker-Host en -Port in.
Het gedrag van de producer en de consumer wordt geconfigureerd via de KafkaOptions-property:
- ClientId: identificeert je applicatie bij de broker.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderofkafkaAcksAll. - Producer.TimeoutMs: hoe lang de broker wacht op de vereiste acknowledgements.
- Consumer.GroupId: de naam van de consumer group. Laat dit leeg om zonder group te consumeren.
- Consumer.OffsetReset:
kafkaOffsetEarliestofkafkaOffsetLatest, waar een gloednieuwe group begint te lezen.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Berichten produceren
Roep, eenmaal verbonden, Produce aan met de topic, de value en een optionele key en partition. Het resultaat van elke publicatie wordt gerapporteerd via het OnKafkaProduce-event, inclusief de offset waarop het record is opgeslagen.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Berichten consumeren
Stel een GroupId en een OffsetReset in, abonneer je op een of meer topics en roep Poll aan om de beschikbare records op te halen. Poll retourneert een TsgcKafkaMessages-lijst en triggert ook het OnKafkaMessage-event voor elk record. Roep Poll herhaaldelijk aan (bijvoorbeeld vanuit een timer) om continu te ontvangen.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Dezelfde data wordt geleverd via het OnKafkaMessage-event, wat handig is wanneer je pollt vanuit een achtergrondthread of een timer:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Als je alleen een topic wilt lezen zonder lid te worden van een group, laat dan GroupId leeg: Subscribe wijst dan elke partition van de topic toe en Poll retourneert de records vanaf de positie die door OffsetReset is geselecteerd.
Topics en offsets
De client kan topics beheren en offsets inspecteren zonder enige externe tooling.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Download
De Kafka-client wordt geleverd met sgcWebSockets 2026.6.0. Een volledige demo is inbegrepen in Demos/02.WebSocket_Protocols/13.Kafka, die connect, produce, subscribe, poll, topicbeheer en offsetbeheer toont. Download sgcWebSockets om het te proberen.
Vragen, feedback of hulp bij het opstarten? Neem contact op — je krijgt een antwoord van de mensen die de code hebben geschreven.
