Client Apache Kafka pour Delphi

· Composants

Depuis sgcWebSockets 2026.6.0, un client Apache Kafka natif est disponible pour Delphi et C++Builder. Le nouveau composant TsgcWSPClient_Kafka dialogue directement avec les brokers Kafka via le protocole binaire natif de Kafka (TCP brut), sans passerelle REST ni bibliothèque externe intermédiaire.

Apache Kafka est une plateforme distribuée de diffusion d'événements utilisée pour publier, stocker et consommer des flux d'enregistrements. Les producteurs écrivent des messages dans des topics, les brokers les conservent dans des journaux partitionnés et en mode ajout uniquement, et les consommateurs les relisent, éventuellement au sein d'un groupe de consommateurs coordonné. Le client sgcWebSockets vous offre la production, la consommation, l'administration des topics et la gestion des offsets depuis un seul composant.

Fonctionnalités

Versions de Kafka prises en charge

Le composant utilise le format de lot d'enregistrements v2 introduit dans Apache Kafka 0.11, et il a été testé avec Apache Kafka 3.x. Tout broker dans cette plage, ou un broker compatible Kafka parlant les mêmes versions de protocole, fonctionnera. Le port de broker par défaut est 9092.

Configuration

Le client Kafka est un composant de protocole qui s'exécute au-dessus d'un TsgcWebSocketClient. Définissez Specifications.RFC6455 := False afin que le client se connecte comme un socket TCP brut (le protocole Kafka n'est pas WebSocket), affectez le client au composant TsgcWSPClient_Kafka, et définissez le Host et le Port du broker.

Le comportement du producteur et du consommateur se configure via la propriété KafkaOptions :

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Production de messages

Une fois connecté, appelez Produce avec le topic, la valeur et, en option, une clé et une partition. Le résultat de chaque publication est signalé via l'événement OnKafkaProduce, y compris l'offset auquel l'enregistrement a été stocké.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Consommation de messages

Définissez un GroupId et un OffsetReset, abonnez-vous à un ou plusieurs topics et appelez Poll pour récupérer les enregistrements disponibles. Poll renvoie une liste TsgcKafkaMessages et déclenche également l'événement OnKafkaMessage pour chaque enregistrement. Appelez Poll à plusieurs reprises (par exemple depuis un timer) pour recevoir en continu.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Les mêmes données sont fournies via l'événement OnKafkaMessage, ce qui est pratique lorsque vous interrogez depuis un thread en arrière-plan ou un timer :

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Si vous souhaitez seulement lire un topic sans rejoindre un groupe, laissez GroupId vide : Subscribe attribue alors chaque partition du topic et Poll renvoie les enregistrements depuis la position sélectionnée par OffsetReset.

Topics et offsets

Le client peut gérer les topics et inspecter les offsets sans aucun outil externe.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Téléchargement

Le client Kafka est livré avec sgcWebSockets 2026.6.0. Une démo complète est incluse dans Demos/02.WebSocket_Protocols/13.Kafka, montrant la connexion, la production, l'abonnement, le polling, l'administration des topics et la gestion des offsets. Téléchargez sgcWebSockets pour l'essayer.

Des questions, des retours ou besoin d'aide pour démarrer ? Contactez-nous — vous recevrez une réponse des personnes qui ont écrit le code.