Cliente Apache Kafka

Cliente Apache Kafka nativo para Delphi, C++Builder e .NET. Converse diretamente com o broker pelo protocolo de fio do Kafka sobre TCP puro, sem proxy REST nem biblioteca externa: produza e consuma registros, coordene grupos de consumidores, gerencie offsets e administre tópicos.

Cliente do subprotocolo Apache Kafka

Uma implementação Kafka de primeira classe que roda em qualquer lugar onde o runtime Delphi / .NET roda — de serviços desktop a dispositivos móveis, transmitindo registros de e para qualquer broker Kafka padrão.

Classe do componente

TsgcWSPClient_Kafka

Protocolo

Protocolo de fio do Apache Kafka sobre TCP

Plataformas

Windows, macOS, Linux, iOS, Android

Edição

Standard / Professional / Enterprise

Produza, consuma e administre o Kafka

Tudo o que você precisa para transmitir registros pelo protocolo nativo do Kafka, exposto por meio de métodos e eventos Delphi / .NET comuns.

Produzir

Chame Produce(topic, value, key, partition) para publicar um registro com chave e partição opcionais, ou ProduceBytes para um payload binário. Escolha a garantia de entrega por meio de KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader ou kafkaAcksAll); cada resultado chega em OnKafkaProduce com o tópico, a partição e o offset armazenado.

Consumir

Subscribe([topic]) e então Poll(timeoutMs) busca os registros disponíveis, retornando uma lista TsgcKafkaMessages e disparando OnKafkaMessage por registro. Leia cada um com GetKeyString, GetValueString, Topic, Partition e Offset. Deixe GroupId vazio para ler todas as partições diretamente sem um grupo.

Coordenação de grupos de consumidores

Defina KafkaOptions.Consumer.GroupId e o cliente executa a descoberta do coordenador, join e sync, atribuição de partições e commit/fetch de offsets automaticamente. Chame CommitSync para confirmar os offsets do último Poll para que o grupo retome a partir deles na próxima sessão.

Gerenciamento de offsets

GetEarliestOffset, GetLatestOffset e GetCommittedOffset retornam o offset mais antigo, o próximo a ser escrito e o último confirmado por tópico e partição. CommitOffset(topic, partition, offset) define uma posição explícita para que o consumo retome exatamente onde você quiser.

Administração de tópicos

CreateTopic(name, partitions, replication) e DeleteTopic(name) gerenciam tópicos no broker, enquanto GetMetadata([topics]) retorna o layout do cluster e das partições (passe um array vazio para o cluster inteiro). O cliente também pode listar grupos de consumidores.

Formato de batch de registros v2

Lê e escreve o formato de batch de registros v2 introduzido no Apache Kafka 0.11, com compressão kafkaCompressionGzip opcional. Testado contra o Apache Kafka 3.x na porta padrão do broker 9092.

Solte o componente, configure algumas propriedades, pronto

Combine um TsgcWebSocketClient com TsgcWSPClient_Kafka, defina Specifications.RFC6455 como false para TCP puro, configure KafkaOptions e então Produce e Poll.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produz um registro em um tópico
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consome: faça Subscribe uma vez, depois Poll repetidamente (ex.: a partir de um timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// busca registros, confirma quando um lote é processado
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produz um registro em um tópico
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consome: faça Subscribe uma vez, depois Poll repetidamente
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// busca registros, confirma quando um lote é processado
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produz um registro em um tópico
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consome: faça Subscribe uma vez, depois Poll repetidamente
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Especificações & referências

Fontes autoritativas para os protocolos que este componente implementa.

Documentação & Demos

Acesse direto a referência do componente, baixe o projeto de demonstração pronto para rodar e faça o download da versão de avaliação.

Ajuda online — Kafka Referência completa de propriedades, métodos e eventos deste componente.
Projeto de demonstração — Demos\Protocols\Kafka Projeto de exemplo pronto para rodar. Acompanha o pacote sgcWebSockets — baixe a versão de avaliação abaixo.
Manual do usuário (PDF) Manual abrangente cobrindo todos os componentes da biblioteca.

Pronto para transmitir com o Apache Kafka?

Baixe a versão de avaliação gratuita e comece a criar soluções de streaming de eventos em minutos.