Cliente Apache Kafka

Cliente Apache Kafka nativo para Delphi, C++Builder y .NET. Dialogue con el broker directamente sobre el protocolo de cable de Kafka en TCP puro, sin proxy REST ni librería externa: produzca y consuma registros, coordine grupos de consumidores, gestione offsets y administre topics.

Cliente del subprotocolo Apache Kafka

Una implementación de Kafka de primer nivel que se ejecuta allí donde se ejecute el runtime de Delphi / .NET — desde servicios de escritorio hasta dispositivos móviles, transmitiendo registros hacia y desde cualquier broker Kafka estándar.

Clase del componente

TsgcWSPClient_Kafka

Protocolo

Protocolo de cable Apache Kafka sobre TCP

Plataformas

Windows, macOS, Linux, iOS, Android

Edición

Standard / Professional / Enterprise

Produzca, consuma y administre Kafka

Todo lo necesario para transmitir registros sobre el protocolo nativo de Kafka, expuesto a través de métodos y eventos Delphi / .NET sencillos.

Producir

Llame a Produce(topic, value, key, partition) para publicar un registro con clave y partición opcionales, o ProduceBytes para un payload binario. Elija la garantía de entrega mediante KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader o kafkaAcksAll); cada resultado llega en OnKafkaProduce con el topic, la partición y el offset almacenado.

Consumir

Subscribe([topic]) y luego Poll(timeoutMs) obtiene los registros disponibles, devolviendo una lista TsgcKafkaMessages y disparando OnKafkaMessage por registro. Lea cada uno con GetKeyString, GetValueString, Topic, Partition y Offset. Deje GroupId vacío para leer todas las particiones directamente sin grupo.

Coordinación de grupos de consumidores

Establezca KafkaOptions.Consumer.GroupId y el cliente realiza el descubrimiento del coordinador, el join y el sync, la asignación de particiones y el commit/fetch de offsets de forma automática. Llame a CommitSync para confirmar los offsets del último Poll de modo que el grupo reanude a continuación en la siguiente sesión.

Gestión de offsets

GetEarliestOffset, GetLatestOffset y GetCommittedOffset devuelven el offset más antiguo, el siguiente a escribir y el último confirmado por topic y partición. CommitOffset(topic, partition, offset) fija una posición explícita para que el consumo se reanude exactamente donde usted quiera.

Administración de topics

CreateTopic(name, partitions, replication) y DeleteTopic(name) gestionan los topics en el broker, mientras que GetMetadata([topics]) devuelve la disposición del clúster y las particiones (pase un array vacío para todo el clúster). El cliente también puede listar grupos de consumidores.

Formato de batch de registros v2

Lee y escribe el formato de batch de registros v2 introducido en Apache Kafka 0.11, con compresión kafkaCompressionGzip opcional. Probado contra Apache Kafka 3.x en el puerto de broker por defecto 9092.

Coloque el componente, configure unas pocas propiedades y listo

Empareje un TsgcWebSocketClient con TsgcWSPClient_Kafka, establezca Specifications.RFC6455 a false para TCP puro, configure KafkaOptions, luego Produce y Poll.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // producir un registro en un topic
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consumir: suscribirse una vez, luego Poll repetidamente (p. ej. desde un timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// obtener registros, confirmar cuando se procesa un batch
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// producir un registro en un topic
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consumir: suscribirse una vez, luego Poll repetidamente
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// obtener registros, confirmar cuando se procesa un batch
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// producir un registro en un topic
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consumir: suscribirse una vez, luego Poll repetidamente
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Especificaciones y referencias

Fuentes autorizadas de los protocolos que implementa este componente.

Documentación y demos

Acceda directamente a la referencia del componente, obtenga el proyecto de demo listo para ejecutar y descargue la versión de prueba.

Ayuda en línea — Kafka Referencia completa de propiedades, métodos y eventos de este componente.
Proyecto de demo — Demos\Protocols\Kafka Proyecto de ejemplo listo para ejecutar. Se incluye dentro del paquete sgcWebSockets — descargue la versión de prueba más abajo.
Manual de usuario (PDF) Manual exhaustivo que cubre todos los componentes de la biblioteca.

¿Listo para transmitir con Apache Kafka?

Descargue la versión de prueba gratuita y empiece a construir soluciones de event-streaming en minutos.