Ab sgcWebSockets 2026.6.0 ist ein nativer Apache Kafka Client für Delphi und C++Builder verfügbar. Die neue Komponente TsgcWSPClient_Kafka kommuniziert direkt mit Kafka-Brokern über das native binäre Kafka-Protokoll (raw TCP), ohne ein REST-Gateway oder eine externe Bibliothek dazwischen.
Apache Kafka ist eine verteilte Event-Streaming-Plattform, die zum Veröffentlichen, Speichern und Konsumieren von Datensatzströmen verwendet wird. Producer schreiben Nachrichten in Topics, Broker speichern sie persistent in partitionierten, nur-anhängenden Logs, und Consumer lesen sie wieder zurück, optional als Teil einer koordinierten Consumer Group. Der sgcWebSockets Client bietet Ihnen Produzieren, Konsumieren, Topic-Verwaltung und Offset-Verwaltung aus einer einzigen Komponente.
Funktionen
- Natives Protokoll: kommuniziert mit dem Broker über das Kafka-Wire-Protokoll über TCP. Kein Confluent REST-Proxy, kein Java, keine Drittanbieter-DLLs.
- Producer: Datensätze in beliebige Topics und Partitionen veröffentlichen, mit Key und Value, sowie konfigurierbaren Acknowledgements (none, leader, all) und Anforderungs-Timeout.
- Consumer: Datensätze entweder mit einer verwalteten Consumer Group lesen (der Broker verfolgt die committeten Offsets pro Gruppe) oder ohne Gruppe (alle Partitionen eines Topics direkt lesen).
- Consumer Groups: automatische Koordinator-Erkennung, Join/Sync, Partitionszuweisung und Offset-Commit/Fetch.
- Offset-Steuerung: Abfrage der frühesten, neuesten und committeten Offsets sowie Auswahl, wo eine neue Gruppe beginnt (earliest oder latest).
- Topic-Verwaltung: Topics erstellen und löschen, Cluster-Metadaten anfordern und Consumer Groups direkt vom Client aus auflisten.
- Modernes Datensatzformat: liest und schreibt das v2-Record-Batch-Format, das von aktuellen Kafka-Brokern verwendet wird.
- Ereignisgesteuert: die Ereignisse
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErrorundOnKafkaDisconnectmelden alles, was auf der Verbindung geschieht.
Unterstützte Kafka-Versionen
Die Komponente verwendet das v2-Record-Batch-Format, das in Apache Kafka 0.11 eingeführt wurde, und wurde gegen Apache Kafka 3.x getestet. Jeder Broker in diesem Bereich oder ein Kafka-kompatibler Broker, der dieselben Protokollversionen spricht, funktioniert. Der Standard-Broker-Port ist 9092.
Konfiguration
Der Kafka Client ist eine Protokollkomponente, die auf einem TsgcWebSocketClient aufsetzt. Setzen Sie Specifications.RFC6455 := False, damit der Client sich als raw TCP-Socket verbindet (das Kafka-Protokoll ist kein WebSocket), weisen Sie den Client der Komponente TsgcWSPClient_Kafka zu und setzen Sie den Broker-Host und -Port.
Das Verhalten von Producer und Consumer wird über die Eigenschaft KafkaOptions konfiguriert:
- ClientId: identifiziert Ihre Anwendung gegenüber dem Broker.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderoderkafkaAcksAll. - Producer.TimeoutMs: wie lange der Broker auf die erforderlichen Acknowledgements wartet.
- Consumer.GroupId: der Name der Consumer Group. Leer lassen, um ohne Gruppe zu konsumieren.
- Consumer.OffsetReset:
kafkaOffsetEarliestoderkafkaOffsetLatest, wo eine brandneue Gruppe mit dem Lesen beginnt.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Nachrichten produzieren
Nach dem Verbindungsaufbau rufen Sie Produce mit dem Topic, dem Value und einem optionalen Key und einer Partition auf. Das Ergebnis jeder Veröffentlichung wird über das Ereignis OnKafkaProduce gemeldet, einschließlich des Offsets, an dem der Datensatz gespeichert wurde.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Nachrichten konsumieren
Setzen Sie eine GroupId und ein OffsetReset, abonnieren Sie ein oder mehrere Topics und rufen Sie Poll auf, um die verfügbaren Datensätze abzurufen. Poll gibt eine TsgcKafkaMessages-Liste zurück und löst außerdem das Ereignis OnKafkaMessage für jeden Datensatz aus. Rufen Sie Poll wiederholt auf (zum Beispiel von einem Timer), um kontinuierlich zu empfangen.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Dieselben Daten werden über das Ereignis OnKafkaMessage geliefert, was praktisch ist, wenn Sie aus einem Hintergrund-Thread oder einem Timer pollen:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Wenn Sie ein Topic nur lesen möchten, ohne einer Gruppe beizutreten, lassen Sie GroupId leer: Subscribe weist dann jede Partition des Topics zu und Poll gibt die Datensätze ab der durch OffsetReset ausgewählten Position zurück.
Topics und Offsets
Der Client kann Topics verwalten und Offsets ohne jegliche externe Tools inspizieren.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Download
Der Kafka Client wird mit sgcWebSockets 2026.6.0 ausgeliefert. Eine vollständige Demo ist in Demos/02.WebSocket_Protocols/13.Kafka enthalten und zeigt Connect, Produce, Subscribe, Poll, Topic-Verwaltung und Offset-Verwaltung. sgcWebSockets herunterladen, um es auszuprobieren.
Fragen, Feedback oder Hilfe beim Einstieg? Kontaktieren Sie uns — Sie erhalten eine Antwort von den Leuten, die den Code geschrieben haben.
