Da sgcWebSockets 2026.6.0 è disponibile un client Apache Kafka nativo per Delphi e C++Builder. Il nuovo componente TsgcWSPClient_Kafka dialoga direttamente con i broker Kafka tramite il protocollo binario nativo di Kafka (TCP grezzo), senza alcun gateway REST o libreria esterna nel mezzo.
Apache Kafka è una piattaforma distribuita di event-streaming usata per pubblicare, memorizzare e consumare flussi di record. I producer scrivono messaggi sui topic, i broker li rendono persistenti in log partizionati e append-only, e i consumer li rileggono, eventualmente come parte di un consumer group coordinato. Il client sgcWebSockets ti offre produzione, consumo, amministrazione dei topic e gestione degli offset da un singolo componente.
Caratteristiche
- Protocollo nativo: comunica con il broker usando il protocollo wire di Kafka su TCP. Nessun proxy REST di Confluent, nessun Java, nessuna DLL di terze parti.
- Producer: pubblica record su qualsiasi topic e partition, con una chiave e un valore, e con acknowledgement configurabili (none, leader, all) e timeout della richiesta.
- Consumer: legge i record sia con un consumer group gestito (il broker tiene traccia degli offset committati per ciascun gruppo) sia senza gruppo (legge direttamente tutte le partition di un topic).
- Consumer group: individuazione automatica del coordinator, join/sync, assegnazione delle partition e commit/fetch degli offset.
- Controllo degli offset: interroga gli offset più vecchi, più recenti e committati, e scegli da dove parte un nuovo gruppo (earliest o latest).
- Amministrazione dei topic: crea ed elimina topic, richiedi i metadati del cluster ed elenca i consumer group direttamente dal client.
- Formato record moderno: legge e scrive il formato di record batch v2 usato dai broker Kafka attuali.
- Guidato dagli eventi: gli eventi
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErroreOnKafkaDisconnectriportano tutto ciò che accade sulla connessione.
Versioni di Kafka supportate
Il componente usa il formato di record batch v2 introdotto in Apache Kafka 0.11, ed è stato testato con Apache Kafka 3.x. Qualsiasi broker in quell'intervallo, o un broker Kafka-compatibile che parli le stesse versioni di protocollo, funzionerà. La porta predefinita del broker è 9092.
Configurazione
Il client Kafka è un componente di protocollo che gira sopra un TsgcWebSocketClient. Imposta Specifications.RFC6455 := False in modo che il client si connetta come socket TCP grezzo (il protocollo Kafka non è WebSocket), assegna il client al componente TsgcWSPClient_Kafka e imposta Host e Port del broker.
Il comportamento del producer e del consumer si configura tramite la proprietà KafkaOptions:
- ClientId: identifica la tua applicazione presso il broker.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderokafkaAcksAll. - Producer.TimeoutMs: per quanto tempo il broker attende gli acknowledgement richiesti.
- Consumer.GroupId: il nome del consumer group. Lascialo vuoto per consumare senza un gruppo.
- Consumer.OffsetReset:
kafkaOffsetEarliestokafkaOffsetLatest, da dove un gruppo nuovo di zecca inizia a leggere.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Produzione di messaggi
Una volta connesso, chiama Produce con il topic, il valore e una chiave e una partition opzionali. Il risultato di ogni pubblicazione viene riportato tramite l'evento OnKafkaProduce, incluso l'offset in cui il record è stato memorizzato.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Consumo di messaggi
Imposta un GroupId e un OffsetReset, iscriviti a uno o più topic e chiama Poll per recuperare i record disponibili. Poll restituisce un elenco TsgcKafkaMessages e solleva anche l'evento OnKafkaMessage per ciascun record. Chiama Poll ripetutamente (per esempio da un timer) per ricevere in modo continuo.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Gli stessi dati vengono recapitati tramite l'evento OnKafkaMessage, comodo quando esegui il poll da un thread in background o da un timer:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Se vuoi soltanto leggere un topic senza unirti a un gruppo, lascia GroupId vuoto: Subscribe assegna allora ogni partition del topic e Poll restituisce i record dalla posizione selezionata da OffsetReset.
Topic e offset
Il client può gestire i topic e ispezionare gli offset senza alcuno strumento esterno.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Download
Il client Kafka è incluso in sgcWebSockets 2026.6.0. Una demo completa è inclusa in Demos/02.WebSocket_Protocols/13.Kafka, che mostra connessione, produzione, sottoscrizione, poll, amministrazione dei topic e gestione degli offset. Scarica sgcWebSockets per provarlo.
Domande, feedback o aiuto per iniziare? Contattaci — riceverai una risposta dalle persone che hanno scritto il codice.
