Apache Kafka Client für Delphi

· Komponenten

Ab sgcWebSockets 2026.6.0 ist ein nativer Apache Kafka Client für Delphi und C++Builder verfügbar. Die neue Komponente TsgcWSPClient_Kafka kommuniziert direkt mit Kafka-Brokern über das native binäre Kafka-Protokoll (raw TCP), ohne ein REST-Gateway oder eine externe Bibliothek dazwischen.

Apache Kafka ist eine verteilte Event-Streaming-Plattform, die zum Veröffentlichen, Speichern und Konsumieren von Datensatzströmen verwendet wird. Producer schreiben Nachrichten in Topics, Broker speichern sie persistent in partitionierten, nur-anhängenden Logs, und Consumer lesen sie wieder zurück, optional als Teil einer koordinierten Consumer Group. Der sgcWebSockets Client bietet Ihnen Produzieren, Konsumieren, Topic-Verwaltung und Offset-Verwaltung aus einer einzigen Komponente.

Funktionen

Unterstützte Kafka-Versionen

Die Komponente verwendet das v2-Record-Batch-Format, das in Apache Kafka 0.11 eingeführt wurde, und wurde gegen Apache Kafka 3.x getestet. Jeder Broker in diesem Bereich oder ein Kafka-kompatibler Broker, der dieselben Protokollversionen spricht, funktioniert. Der Standard-Broker-Port ist 9092.

Konfiguration

Der Kafka Client ist eine Protokollkomponente, die auf einem TsgcWebSocketClient aufsetzt. Setzen Sie Specifications.RFC6455 := False, damit der Client sich als raw TCP-Socket verbindet (das Kafka-Protokoll ist kein WebSocket), weisen Sie den Client der Komponente TsgcWSPClient_Kafka zu und setzen Sie den Broker-Host und -Port.

Das Verhalten von Producer und Consumer wird über die Eigenschaft KafkaOptions konfiguriert:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Nachrichten produzieren

Nach dem Verbindungsaufbau rufen Sie Produce mit dem Topic, dem Value und einem optionalen Key und einer Partition auf. Das Ergebnis jeder Veröffentlichung wird über das Ereignis OnKafkaProduce gemeldet, einschließlich des Offsets, an dem der Datensatz gespeichert wurde.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Nachrichten konsumieren

Setzen Sie eine GroupId und ein OffsetReset, abonnieren Sie ein oder mehrere Topics und rufen Sie Poll auf, um die verfügbaren Datensätze abzurufen. Poll gibt eine TsgcKafkaMessages-Liste zurück und löst außerdem das Ereignis OnKafkaMessage für jeden Datensatz aus. Rufen Sie Poll wiederholt auf (zum Beispiel von einem Timer), um kontinuierlich zu empfangen.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Dieselben Daten werden über das Ereignis OnKafkaMessage geliefert, was praktisch ist, wenn Sie aus einem Hintergrund-Thread oder einem Timer pollen:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Wenn Sie ein Topic nur lesen möchten, ohne einer Gruppe beizutreten, lassen Sie GroupId leer: Subscribe weist dann jede Partition des Topics zu und Poll gibt die Datensätze ab der durch OffsetReset ausgewählten Position zurück.

Topics und Offsets

Der Client kann Topics verwalten und Offsets ohne jegliche externe Tools inspizieren.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Download

Der Kafka Client wird mit sgcWebSockets 2026.6.0 ausgeliefert. Eine vollständige Demo ist in Demos/02.WebSocket_Protocols/13.Kafka enthalten und zeigt Connect, Produce, Subscribe, Poll, Topic-Verwaltung und Offset-Verwaltung. sgcWebSockets herunterladen, um es auszuprobieren.

Fragen, Feedback oder Hilfe beim Einstieg? Kontaktieren Sie uns — Sie erhalten eine Antwort von den Leuten, die den Code geschrieben haben.