Od sgcWebSockets 2026.6.0 dostępny jest natywny klient Apache Kafka dla Delphi i C++Builder. Nowy komponent TsgcWSPClient_Kafka komunikuje się z brokerami Kafka bezpośrednio za pomocą natywnego binarnego protokołu Kafka (surowy TCP), bez żadnej bramy REST ani zewnętrznej biblioteki pośredniczącej.
Apache Kafka to rozproszona platforma strumieniowania zdarzeń używana do publikowania, przechowywania i konsumowania strumieni rekordów. Producenci zapisują wiadomości do tematów, brokery utrwalają je w partycjonowanych, tylko dopisywanych logach, a konsumenci odczytują je z powrotem, opcjonalnie jako część skoordynowanej grupy konsumentów. Klient sgcWebSockets daje Ci produkowanie, konsumowanie, administrację tematami i zarządzanie offsetami z jednego komponentu.
Funkcje
- Natywny protokół: komunikuje się z brokerem przy użyciu protokołu Kafka wire przez TCP. Bez proxy Confluent REST, bez Javy, bez bibliotek DLL firm trzecich.
- Producent: publikuj rekordy do dowolnego tematu i partycji, z kluczem i wartością, oraz konfigurowalnymi potwierdzeniami (żadne, lider, wszystkie) i limitem czasu żądania.
- Konsument: odczytuj rekordy albo z zarządzaną grupą konsumentów (broker śledzi zatwierdzone offsety dla każdej grupy), albo bez grupy (odczytuj bezpośrednio wszystkie partycje tematu).
- Grupy konsumentów: automatyczne wykrywanie koordynatora, join/sync, przypisywanie partycji oraz zatwierdzanie/pobieranie offsetów.
- Kontrola offsetów: odpytuj o najwcześniejszy, najnowszy i zatwierdzony offset oraz wybierz, gdzie zaczyna nowa grupa (najwcześniej lub najnowiej).
- Administracja tematami: twórz i usuwaj tematy, żądaj metadanych klastra i wyświetlaj listę grup konsumentów bezpośrednio z klienta.
- Nowoczesny format rekordów: odczytuje i zapisuje format wsadowy rekordów v2 używany przez współczesne brokery Kafka.
- Sterowany zdarzeniami: zdarzenia
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErroriOnKafkaDisconnectraportują wszystko, co dzieje się na połączeniu.
Obsługiwane wersje Kafka
Komponent używa formatu wsadowego rekordów v2, który został wprowadzony w Apache Kafka 0.11, i został przetestowany z Apache Kafka 3.x. Każdy broker w tym zakresie lub broker kompatybilny z Kafka mówiący tymi samymi wersjami protokołu będzie działał. Domyślny port brokera to 9092.
Konfiguracja
Klient Kafka to komponent protokołu, który działa na bazie TsgcWebSocketClient. Ustaw Specifications.RFC6455 := False, aby klient łączył się jako surowe gniazdo TCP (protokół Kafka nie jest WebSocket), przypisz klienta do komponentu TsgcWSPClient_Kafka i ustaw Host oraz Port brokera.
Zachowanie producenta i konsumenta konfiguruje się za pomocą właściwości KafkaOptions:
- ClientId: identyfikuje Twoją aplikację wobec brokera.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderlubkafkaAcksAll. - Producer.TimeoutMs: jak długo broker czeka na wymagane potwierdzenia.
- Consumer.GroupId: nazwa grupy konsumentów. Pozostaw puste, aby konsumować bez grupy.
- Consumer.OffsetReset:
kafkaOffsetEarliestlubkafkaOffsetLatest, gdzie zupełnie nowa grupa zaczyna odczyt.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Produkowanie wiadomości
Po połączeniu wywołaj Produce z tematem, wartością oraz opcjonalnym kluczem i partycją. Wynik każdej publikacji jest raportowany za pomocą zdarzenia OnKafkaProduce, łącznie z offsetem, pod którym rekord został zapisany.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Konsumowanie wiadomości
Ustaw GroupId i OffsetReset, zasubskrybuj jeden lub więcej tematów i wywołaj Poll, aby pobrać dostępne rekordy. Poll zwraca listę TsgcKafkaMessages i również wyzwala zdarzenie OnKafkaMessage dla każdego rekordu. Wywołuj Poll wielokrotnie (na przykład z timera), aby odbierać w sposób ciągły.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Te same dane są dostarczane za pomocą zdarzenia OnKafkaMessage, co jest wygodne, gdy odpytujesz z wątku w tle lub timera:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Jeśli chcesz tylko odczytać temat bez dołączania do grupy, pozostaw GroupId puste: Subscribe przypisuje wtedy każdą partycję tematu, a Poll zwraca rekordy z pozycji wybranej przez OffsetReset.
Tematy i offsety
Klient może zarządzać tematami i przeglądać offsety bez żadnych zewnętrznych narzędzi.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Pobierz
Klient Kafka jest dostarczany z sgcWebSockets 2026.6.0. Pełne demo jest zawarte w Demos/02.WebSocket_Protocols/13.Kafka, pokazujące łączenie, produkowanie, subskrypcję, odpytywanie, administrację tematami i zarządzanie offsetami. Pobierz sgcWebSockets, aby je wypróbować.
Pytania, opinie lub pomoc na początek? Skontaktuj się z nami — otrzymasz odpowiedź od ludzi, którzy napisali ten kod.
