Depuis sgcWebSockets 2026.6.0, un client Apache Kafka natif est disponible pour Delphi et C++Builder. Le nouveau composant TsgcWSPClient_Kafka dialogue directement avec les brokers Kafka via le protocole binaire natif de Kafka (TCP brut), sans passerelle REST ni bibliothèque externe intermédiaire.
Apache Kafka est une plateforme distribuée de diffusion d'événements utilisée pour publier, stocker et consommer des flux d'enregistrements. Les producteurs écrivent des messages dans des topics, les brokers les conservent dans des journaux partitionnés et en mode ajout uniquement, et les consommateurs les relisent, éventuellement au sein d'un groupe de consommateurs coordonné. Le client sgcWebSockets vous offre la production, la consommation, l'administration des topics et la gestion des offsets depuis un seul composant.
Fonctionnalités
- Protocole natif : communique avec le broker en utilisant le protocole de transmission Kafka sur TCP. Aucun proxy REST Confluent, aucun Java, aucune DLL tierce.
- Producteur : publie des enregistrements dans n'importe quel topic et partition, avec une clé et une valeur, et des accusés de réception configurables (aucun, leader, tous) et un délai d'expiration de requête.
- Consommateur : lit les enregistrements soit avec un groupe de consommateurs géré (le broker suit les offsets validés par groupe), soit sans groupe (lecture directe de toutes les partitions d'un topic).
- Groupes de consommateurs : découverte automatique du coordinateur, jonction/synchronisation, attribution des partitions et validation/récupération des offsets.
- Contrôle des offsets : interrogez les offsets les plus anciens, les plus récents et validés, et choisissez où démarre un nouveau groupe (le plus ancien ou le plus récent).
- Administration des topics : créez et supprimez des topics, demandez les métadonnées du cluster et listez les groupes de consommateurs directement depuis le client.
- Format d'enregistrement moderne : lit et écrit le format de lot d'enregistrements v2 utilisé par les brokers Kafka actuels.
- Piloté par événements : les événements
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErroretOnKafkaDisconnectsignalent tout ce qui se passe sur la connexion.
Versions de Kafka prises en charge
Le composant utilise le format de lot d'enregistrements v2 introduit dans Apache Kafka 0.11, et il a été testé avec Apache Kafka 3.x. Tout broker dans cette plage, ou un broker compatible Kafka parlant les mêmes versions de protocole, fonctionnera. Le port de broker par défaut est 9092.
Configuration
Le client Kafka est un composant de protocole qui s'exécute au-dessus d'un TsgcWebSocketClient. Définissez Specifications.RFC6455 := False afin que le client se connecte comme un socket TCP brut (le protocole Kafka n'est pas WebSocket), affectez le client au composant TsgcWSPClient_Kafka, et définissez le Host et le Port du broker.
Le comportement du producteur et du consommateur se configure via la propriété KafkaOptions :
- ClientId : identifie votre application auprès du broker.
- Producer.Acks :
kafkaAcksNone,kafkaAcksLeaderoukafkaAcksAll. - Producer.TimeoutMs : durée pendant laquelle le broker attend les accusés de réception requis.
- Consumer.GroupId : le nom du groupe de consommateurs. Laissez-le vide pour consommer sans groupe.
- Consumer.OffsetReset :
kafkaOffsetEarliestoukafkaOffsetLatest, où un tout nouveau groupe commence à lire.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Production de messages
Une fois connecté, appelez Produce avec le topic, la valeur et, en option, une clé et une partition. Le résultat de chaque publication est signalé via l'événement OnKafkaProduce, y compris l'offset auquel l'enregistrement a été stocké.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Consommation de messages
Définissez un GroupId et un OffsetReset, abonnez-vous à un ou plusieurs topics et appelez Poll pour récupérer les enregistrements disponibles. Poll renvoie une liste TsgcKafkaMessages et déclenche également l'événement OnKafkaMessage pour chaque enregistrement. Appelez Poll à plusieurs reprises (par exemple depuis un timer) pour recevoir en continu.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Les mêmes données sont fournies via l'événement OnKafkaMessage, ce qui est pratique lorsque vous interrogez depuis un thread en arrière-plan ou un timer :
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Si vous souhaitez seulement lire un topic sans rejoindre un groupe, laissez GroupId vide : Subscribe attribue alors chaque partition du topic et Poll renvoie les enregistrements depuis la position sélectionnée par OffsetReset.
Topics et offsets
Le client peut gérer les topics et inspecter les offsets sans aucun outil externe.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Téléchargement
Le client Kafka est livré avec sgcWebSockets 2026.6.0. Une démo complète est incluse dans Demos/02.WebSocket_Protocols/13.Kafka, montrant la connexion, la production, l'abonnement, le polling, l'administration des topics et la gestion des offsets. Téléchargez sgcWebSockets pour l'essayer.
Des questions, des retours ou besoin d'aide pour démarrer ? Contactez-nous — vous recevrez une réponse des personnes qui ont écrit le code.
