Cliente Apache Kafka para Delphi

· Componentes

A partir do sgcWebSockets 2026.6.0 um cliente Apache Kafka nativo está disponível para Delphi e C++Builder. O novo componente TsgcWSPClient_Kafka se comunica com os brokers Kafka diretamente sobre o protocolo binário nativo do Kafka (TCP puro), sem nenhum gateway REST ou biblioteca externa intermediária.

O Apache Kafka é uma plataforma distribuída de streaming de eventos usada para publicar, armazenar e consumir fluxos de registros. Os produtores escrevem mensagens em topics, os brokers as persistem em logs particionados e append-only, e os consumidores as leem de volta, opcionalmente como parte de um grupo de consumidores coordenado. O cliente sgcWebSockets oferece produção, consumo, administração de topics e gerenciamento de offsets a partir de um único componente.

Recursos

Versões do Kafka suportadas

O componente usa o formato de batch de registro v2 que foi introduzido no Apache Kafka 0.11, e foi testado com o Apache Kafka 3.x. Qualquer broker nesse intervalo, ou um broker compatível com Kafka que fale as mesmas versões de protocolo, funcionará. A porta padrão do broker é 9092.

Configuração

O cliente Kafka é um componente de protocolo que roda sobre um TsgcWebSocketClient. Defina Specifications.RFC6455 := False para que o cliente se conecte como um socket TCP puro (o protocolo Kafka não é WebSocket), atribua o cliente ao componente TsgcWSPClient_Kafka, e defina o Host e a Port do broker.

O comportamento do produtor e do consumidor é configurado através da propriedade KafkaOptions:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Produzindo mensagens

Uma vez conectado, chame Produce com o topic, o valor e uma chave e partition opcionais. O resultado de cada publicação é reportado através do evento OnKafkaProduce, incluindo o offset em que o registro foi armazenado.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Consumindo mensagens

Defina um GroupId e um OffsetReset, inscreva-se em um ou mais topics e chame Poll para buscar os registros disponíveis. Poll retorna uma lista TsgcKafkaMessages e também dispara o evento OnKafkaMessage para cada registro. Chame Poll repetidamente (por exemplo, a partir de um timer) para receber continuamente.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Os mesmos dados são entregues através do evento OnKafkaMessage, o que é conveniente quando você faz o poll a partir de uma thread em segundo plano ou de um timer:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Se você quiser apenas ler um topic sem entrar em um grupo, deixe o GroupId vazio: Subscribe então atribui todas as partitions do topic e Poll retorna os registros a partir da posição selecionada por OffsetReset.

Topics e offsets

O cliente pode gerenciar topics e inspecionar offsets sem nenhuma ferramenta externa.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

Download

O cliente Kafka acompanha o sgcWebSockets 2026.6.0. Uma demo completa está incluída em Demos/02.WebSocket_Protocols/13.Kafka, mostrando conexão, produção, inscrição, poll, administração de topics e gerenciamento de offsets. Baixe o sgcWebSockets para experimentá-lo.

Dúvidas, comentários ou ajuda para começar? Entre em contato — você receberá uma resposta das pessoas que escreveram o código.