A partir do sgcWebSockets 2026.6.0 um cliente Apache Kafka nativo está disponível para Delphi e C++Builder. O novo componente TsgcWSPClient_Kafka se comunica com os brokers Kafka diretamente sobre o protocolo binário nativo do Kafka (TCP puro), sem nenhum gateway REST ou biblioteca externa intermediária.
O Apache Kafka é uma plataforma distribuída de streaming de eventos usada para publicar, armazenar e consumir fluxos de registros. Os produtores escrevem mensagens em topics, os brokers as persistem em logs particionados e append-only, e os consumidores as leem de volta, opcionalmente como parte de um grupo de consumidores coordenado. O cliente sgcWebSockets oferece produção, consumo, administração de topics e gerenciamento de offsets a partir de um único componente.
Recursos
- Protocolo nativo: comunica-se com o broker usando o protocolo de fio do Kafka sobre TCP. Sem proxy REST do Confluent, sem Java, sem DLLs de terceiros.
- Produtor: publica registros em qualquer topic e partition, com uma chave e um valor, e confirmações configuráveis (nenhuma, leader, all) e timeout de requisição.
- Consumidor: lê registros com um grupo de consumidores gerenciado (o broker rastreia os offsets confirmados por grupo) ou sem grupo (lê todas as partitions de um topic diretamente).
- Grupos de consumidores: descoberta automática do coordenador, join/sync, atribuição de partitions e commit/fetch de offsets.
- Controle de offset: consulte os offsets mais antigo, mais recente e confirmado, e escolha onde um novo grupo começa (earliest ou latest).
- Administração de topics: crie e exclua topics, solicite metadados do cluster e liste grupos de consumidores diretamente do cliente.
- Formato de registro moderno: lê e escreve o formato de batch de registro v2 usado pelos brokers Kafka atuais.
- Orientado a eventos: os eventos
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErroreOnKafkaDisconnectreportam tudo o que acontece na conexão.
Versões do Kafka suportadas
O componente usa o formato de batch de registro v2 que foi introduzido no Apache Kafka 0.11, e foi testado com o Apache Kafka 3.x. Qualquer broker nesse intervalo, ou um broker compatível com Kafka que fale as mesmas versões de protocolo, funcionará. A porta padrão do broker é 9092.
Configuração
O cliente Kafka é um componente de protocolo que roda sobre um TsgcWebSocketClient. Defina Specifications.RFC6455 := False para que o cliente se conecte como um socket TCP puro (o protocolo Kafka não é WebSocket), atribua o cliente ao componente TsgcWSPClient_Kafka, e defina o Host e a Port do broker.
O comportamento do produtor e do consumidor é configurado através da propriedade KafkaOptions:
- ClientId: identifica sua aplicação para o broker.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderoukafkaAcksAll. - Producer.TimeoutMs: por quanto tempo o broker aguarda as confirmações necessárias.
- Consumer.GroupId: o nome do grupo de consumidores. Deixe-o vazio para consumir sem um grupo.
- Consumer.OffsetReset:
kafkaOffsetEarliestoukafkaOffsetLatest, de onde um grupo totalmente novo começa a ler.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Produzindo mensagens
Uma vez conectado, chame Produce com o topic, o valor e uma chave e partition opcionais. O resultado de cada publicação é reportado através do evento OnKafkaProduce, incluindo o offset em que o registro foi armazenado.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Consumindo mensagens
Defina um GroupId e um OffsetReset, inscreva-se em um ou mais topics e chame Poll para buscar os registros disponíveis. Poll retorna uma lista TsgcKafkaMessages e também dispara o evento OnKafkaMessage para cada registro. Chame Poll repetidamente (por exemplo, a partir de um timer) para receber continuamente.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Os mesmos dados são entregues através do evento OnKafkaMessage, o que é conveniente quando você faz o poll a partir de uma thread em segundo plano ou de um timer:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Se você quiser apenas ler um topic sem entrar em um grupo, deixe o GroupId vazio: Subscribe então atribui todas as partitions do topic e Poll retorna os registros a partir da posição selecionada por OffsetReset.
Topics e offsets
O cliente pode gerenciar topics e inspecionar offsets sem nenhuma ferramenta externa.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
Download
O cliente Kafka acompanha o sgcWebSockets 2026.6.0. Uma demo completa está incluída em Demos/02.WebSocket_Protocols/13.Kafka, mostrando conexão, produção, inscrição, poll, administração de topics e gerenciamento de offsets. Baixe o sgcWebSockets para experimentá-lo.
Dúvidas, comentários ou ajuda para começar? Entre em contato — você receberá uma resposta das pessoas que escreveram o código.
