Desde sgcWebSockets 2026.6.0 hay disponible un cliente nativo de Apache Kafka para Delphi y C++Builder. El nuevo componente TsgcWSPClient_Kafka se comunica directamente con los brokers de Kafka sobre el protocolo binario nativo de Kafka (TCP en bruto), sin ninguna pasarela REST ni biblioteca externa de por medio.
Apache Kafka es una plataforma distribuida de streaming de eventos que se utiliza para publicar, almacenar y consumir flujos de registros. Los productores escriben mensajes en topics, los brokers los persisten en logs particionados de solo anexado (append-only), y los consumidores los vuelven a leer, opcionalmente como parte de un grupo de consumidores coordinado. El cliente de sgcWebSockets te ofrece producción, consumo, administración de topics y gestión de offsets desde un único componente.
Características
- Protocolo nativo: se comunica con el broker usando el protocolo de cable de Kafka sobre TCP. Sin proxy REST de Confluent, sin Java, sin DLL de terceros.
- Productor: publica registros en cualquier topic y partition, con una clave y un valor, y confirmaciones (acknowledgements) configurables (ninguna, líder, todas) y tiempo de espera de la petición.
- Consumidor: lee registros bien con un grupo de consumidores gestionado (el broker registra los offsets confirmados por grupo) o bien sin grupo (lee directamente todas las particiones de un topic).
- Grupos de consumidores: descubrimiento automático del coordinador, join/sync, asignación de particiones y confirmación/obtención de offsets.
- Control de offsets: consulta los offsets más antiguo, más reciente y confirmado, y elige dónde empieza un grupo nuevo (más antiguo o más reciente).
- Administración de topics: crea y elimina topics, solicita los metadatos del clúster y lista los grupos de consumidores directamente desde el cliente.
- Formato de registro moderno: lee y escribe el formato de lote de registros v2 que usan los brokers de Kafka actuales.
- Orientado a eventos: los eventos
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErroryOnKafkaDisconnectinforman de todo lo que ocurre en la conexión.
Versiones de Kafka soportadas
El componente usa el formato de lote de registros v2 que se introdujo en Apache Kafka 0.11, y ha sido probado con Apache Kafka 3.x. Cualquier broker dentro de ese rango, o un broker compatible con Kafka que hable las mismas versiones del protocolo, funcionará. El puerto por defecto del broker es 9092.
Configuración
El cliente de Kafka es un componente de protocolo que se ejecuta sobre un TsgcWebSocketClient. Establece Specifications.RFC6455 := False para que el cliente se conecte como un socket TCP en bruto (el protocolo de Kafka no es WebSocket), asigna el cliente al componente TsgcWSPClient_Kafka y define el Host y el Port del broker.
El comportamiento del productor y del consumidor se configura a través de la propiedad KafkaOptions:
- ClientId: identifica tu aplicación ante el broker.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderokafkaAcksAll. - Producer.TimeoutMs: cuánto tiempo espera el broker las confirmaciones requeridas.
- Consumer.GroupId: el nombre del grupo de consumidores. Déjalo vacío para consumir sin grupo.
- Consumer.OffsetReset:
kafkaOffsetEarliestokafkaOffsetLatest, dónde empieza a leer un grupo totalmente nuevo.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Producir mensajes
Una vez conectado, llama a Produce con el topic, el valor y, opcionalmente, una clave y una partition. El resultado de cada publicación se informa a través del evento OnKafkaProduce, incluyendo el offset en el que se almacenó el registro.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Consumir mensajes
Define un GroupId y un OffsetReset, suscríbete a uno o varios topics y llama a Poll para obtener los registros que estén disponibles. Poll devuelve una lista TsgcKafkaMessages y también dispara el evento OnKafkaMessage por cada registro. Llama a Poll repetidamente (por ejemplo, desde un temporizador) para recibir de forma continua.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Los mismos datos se entregan a través del evento OnKafkaMessage, lo cual es cómodo cuando haces el poll desde un hilo en segundo plano o un temporizador:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Si solo quieres leer un topic sin unirte a un grupo, deja GroupId vacío: Subscribe asigna entonces todas las particiones del topic y Poll devuelve los registros desde la posición seleccionada por OffsetReset.
Topics y offsets
El cliente puede gestionar topics e inspeccionar offsets sin ninguna herramienta externa.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Descarga
El cliente de Kafka se incluye con sgcWebSockets 2026.6.0. Se incluye una demo completa en Demos/02.WebSocket_Protocols/13.Kafka, que muestra conexión, producción, suscripción, poll, administración de topics y gestión de offsets. Descarga sgcWebSockets para probarlo.
¿Preguntas, comentarios o ayuda para empezar? Ponte en contacto — recibirás respuesta de las personas que escribieron el código.
