Cliente Apache Kafka para Delphi

· Componentes

Desde sgcWebSockets 2026.6.0 hay disponible un cliente nativo de Apache Kafka para Delphi y C++Builder. El nuevo componente TsgcWSPClient_Kafka se comunica directamente con los brokers de Kafka sobre el protocolo binario nativo de Kafka (TCP en bruto), sin ninguna pasarela REST ni biblioteca externa de por medio.

Apache Kafka es una plataforma distribuida de streaming de eventos que se utiliza para publicar, almacenar y consumir flujos de registros. Los productores escriben mensajes en topics, los brokers los persisten en logs particionados de solo anexado (append-only), y los consumidores los vuelven a leer, opcionalmente como parte de un grupo de consumidores coordinado. El cliente de sgcWebSockets te ofrece producción, consumo, administración de topics y gestión de offsets desde un único componente.

Características

Versiones de Kafka soportadas

El componente usa el formato de lote de registros v2 que se introdujo en Apache Kafka 0.11, y ha sido probado con Apache Kafka 3.x. Cualquier broker dentro de ese rango, o un broker compatible con Kafka que hable las mismas versiones del protocolo, funcionará. El puerto por defecto del broker es 9092.

Configuración

El cliente de Kafka es un componente de protocolo que se ejecuta sobre un TsgcWebSocketClient. Establece Specifications.RFC6455 := False para que el cliente se conecte como un socket TCP en bruto (el protocolo de Kafka no es WebSocket), asigna el cliente al componente TsgcWSPClient_Kafka y define el Host y el Port del broker.

El comportamiento del productor y del consumidor se configura a través de la propiedad KafkaOptions:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Producir mensajes

Una vez conectado, llama a Produce con el topic, el valor y, opcionalmente, una clave y una partition. El resultado de cada publicación se informa a través del evento OnKafkaProduce, incluyendo el offset en el que se almacenó el registro.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Consumir mensajes

Define un GroupId y un OffsetReset, suscríbete a uno o varios topics y llama a Poll para obtener los registros que estén disponibles. Poll devuelve una lista TsgcKafkaMessages y también dispara el evento OnKafkaMessage por cada registro. Llama a Poll repetidamente (por ejemplo, desde un temporizador) para recibir de forma continua.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Los mismos datos se entregan a través del evento OnKafkaMessage, lo cual es cómodo cuando haces el poll desde un hilo en segundo plano o un temporizador:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Si solo quieres leer un topic sin unirte a un grupo, deja GroupId vacío: Subscribe asigna entonces todas las particiones del topic y Poll devuelve los registros desde la posición seleccionada por OffsetReset.

Topics y offsets

El cliente puede gestionar topics e inspeccionar offsets sin ninguna herramienta externa.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Descarga

El cliente de Kafka se incluye con sgcWebSockets 2026.6.0. Se incluye una demo completa en Demos/02.WebSocket_Protocols/13.Kafka, que muestra conexión, producción, suscripción, poll, administración de topics y gestión de offsets. Descarga sgcWebSockets para probarlo.

¿Preguntas, comentarios o ayuda para empezar? Ponte en contacto — recibirás respuesta de las personas que escribieron el código.