Client Apache Kafka per Delphi

· Componenti

Da sgcWebSockets 2026.6.0 è disponibile un client Apache Kafka nativo per Delphi e C++Builder. Il nuovo componente TsgcWSPClient_Kafka dialoga direttamente con i broker Kafka tramite il protocollo binario nativo di Kafka (TCP grezzo), senza alcun gateway REST o libreria esterna nel mezzo.

Apache Kafka è una piattaforma distribuita di event-streaming usata per pubblicare, memorizzare e consumare flussi di record. I producer scrivono messaggi sui topic, i broker li rendono persistenti in log partizionati e append-only, e i consumer li rileggono, eventualmente come parte di un consumer group coordinato. Il client sgcWebSockets ti offre produzione, consumo, amministrazione dei topic e gestione degli offset da un singolo componente.

Caratteristiche

Versioni di Kafka supportate

Il componente usa il formato di record batch v2 introdotto in Apache Kafka 0.11, ed è stato testato con Apache Kafka 3.x. Qualsiasi broker in quell'intervallo, o un broker Kafka-compatibile che parli le stesse versioni di protocollo, funzionerà. La porta predefinita del broker è 9092.

Configurazione

Il client Kafka è un componente di protocollo che gira sopra un TsgcWebSocketClient. Imposta Specifications.RFC6455 := False in modo che il client si connetta come socket TCP grezzo (il protocollo Kafka non è WebSocket), assegna il client al componente TsgcWSPClient_Kafka e imposta Host e Port del broker.

Il comportamento del producer e del consumer si configura tramite la proprietà KafkaOptions:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Produzione di messaggi

Una volta connesso, chiama Produce con il topic, il valore e una chiave e una partition opzionali. Il risultato di ogni pubblicazione viene riportato tramite l'evento OnKafkaProduce, incluso l'offset in cui il record è stato memorizzato.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Consumo di messaggi

Imposta un GroupId e un OffsetReset, iscriviti a uno o più topic e chiama Poll per recuperare i record disponibili. Poll restituisce un elenco TsgcKafkaMessages e solleva anche l'evento OnKafkaMessage per ciascun record. Chiama Poll ripetutamente (per esempio da un timer) per ricevere in modo continuo.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Gli stessi dati vengono recapitati tramite l'evento OnKafkaMessage, comodo quando esegui il poll da un thread in background o da un timer:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Se vuoi soltanto leggere un topic senza unirti a un gruppo, lascia GroupId vuoto: Subscribe assegna allora ogni partition del topic e Poll restituisce i record dalla posizione selezionata da OffsetReset.

Topic e offset

Il client può gestire i topic e ispezionare gli offset senza alcuno strumento esterno.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Download

Il client Kafka è incluso in sgcWebSockets 2026.6.0. Una demo completa è inclusa in Demos/02.WebSocket_Protocols/13.Kafka, che mostra connessione, produzione, sottoscrizione, poll, amministrazione dei topic e gestione degli offset. Scarica sgcWebSockets per provarlo.

Domande, feedback o aiuto per iniziare? Contattaci — riceverai una risposta dalle persone che hanno scritto il codice.