Klient Apache Kafka dla Delphi

· Komponenty

Od sgcWebSockets 2026.6.0 dostępny jest natywny klient Apache Kafka dla Delphi i C++Builder. Nowy komponent TsgcWSPClient_Kafka komunikuje się z brokerami Kafka bezpośrednio za pomocą natywnego binarnego protokołu Kafka (surowy TCP), bez żadnej bramy REST ani zewnętrznej biblioteki pośredniczącej.

Apache Kafka to rozproszona platforma strumieniowania zdarzeń używana do publikowania, przechowywania i konsumowania strumieni rekordów. Producenci zapisują wiadomości do tematów, brokery utrwalają je w partycjonowanych, tylko dopisywanych logach, a konsumenci odczytują je z powrotem, opcjonalnie jako część skoordynowanej grupy konsumentów. Klient sgcWebSockets daje Ci produkowanie, konsumowanie, administrację tematami i zarządzanie offsetami z jednego komponentu.

Funkcje

Obsługiwane wersje Kafka

Komponent używa formatu wsadowego rekordów v2, który został wprowadzony w Apache Kafka 0.11, i został przetestowany z Apache Kafka 3.x. Każdy broker w tym zakresie lub broker kompatybilny z Kafka mówiący tymi samymi wersjami protokołu będzie działał. Domyślny port brokera to 9092.

Konfiguracja

Klient Kafka to komponent protokołu, który działa na bazie TsgcWebSocketClient. Ustaw Specifications.RFC6455 := False, aby klient łączył się jako surowe gniazdo TCP (protokół Kafka nie jest WebSocket), przypisz klienta do komponentu TsgcWSPClient_Kafka i ustaw Host oraz Port brokera.

Zachowanie producenta i konsumenta konfiguruje się za pomocą właściwości KafkaOptions:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Produkowanie wiadomości

Po połączeniu wywołaj Produce z tematem, wartością oraz opcjonalnym kluczem i partycją. Wynik każdej publikacji jest raportowany za pomocą zdarzenia OnKafkaProduce, łącznie z offsetem, pod którym rekord został zapisany.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Konsumowanie wiadomości

Ustaw GroupId i OffsetReset, zasubskrybuj jeden lub więcej tematów i wywołaj Poll, aby pobrać dostępne rekordy. Poll zwraca listę TsgcKafkaMessages i również wyzwala zdarzenie OnKafkaMessage dla każdego rekordu. Wywołuj Poll wielokrotnie (na przykład z timera), aby odbierać w sposób ciągły.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Te same dane są dostarczane za pomocą zdarzenia OnKafkaMessage, co jest wygodne, gdy odpytujesz z wątku w tle lub timera:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Jeśli chcesz tylko odczytać temat bez dołączania do grupy, pozostaw GroupId puste: Subscribe przypisuje wtedy każdą partycję tematu, a Poll zwraca rekordy z pozycji wybranej przez OffsetReset.

Tematy i offsety

Klient może zarządzać tematami i przeglądać offsety bez żadnych zewnętrznych narzędzi.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Pobierz

Klient Kafka jest dostarczany z sgcWebSockets 2026.6.0. Pełne demo jest zawarte w Demos/02.WebSocket_Protocols/13.Kafka, pokazujące łączenie, produkowanie, subskrypcję, odpytywanie, administrację tematami i zarządzanie offsetami. Pobierz sgcWebSockets, aby je wypróbować.

Pytania, opinie lub pomoc na początek? Skontaktuj się z nami — otrzymasz odpowiedź od ludzi, którzy napisali ten kod.