Delphi için Apache Kafka İstemcisi

· Bileşenler

sgcWebSockets 2026.6.0'dan itibaren Delphi ve C++Builder için yerel bir Apache Kafka istemcisi mevcuttur. Yeni TsgcWSPClient_Kafka bileşeni, aradaki bir REST ağ geçidi veya harici kütüphane olmadan Kafka aracılarıyla doğrudan yerel Kafka ikili protokolü (ham TCP) üzerinden iletişim kurar.

Apache Kafka, kayıt akışlarını yayımlamak, saklamak ve tüketmek için kullanılan dağıtık bir olay-akışı platformudur. Üreticiler konulara mesaj yazar, aracılar bunları bölümlenmiş, yalnızca-ekleme günlüklerinde kalıcı kılar ve tüketiciler bunları, isteğe bağlı olarak koordineli bir tüketici grubunun parçası olarak geri okur. sgcWebSockets istemcisi, tek bir bileşenden üretme, tüketme, konu yönetimi ve ofset yönetimi sunar.

Özellikler

Desteklenen Kafka sürümleri

Bileşen, Apache Kafka 0.11'de tanıtılan v2 kayıt yığını biçimini kullanır ve Apache Kafka 3.x'e karşı test edilmiştir. Bu aralıktaki herhangi bir aracı veya aynı protokol sürümlerini konuşan Kafka uyumlu bir aracı çalışır. Varsayılan aracı bağlantı noktası 9092'dir.

Yapılandırma

Kafka istemcisi, bir TsgcWebSocketClient üzerinde çalışan bir protokol bileşenidir. İstemcinin ham bir TCP soketi olarak bağlanması için Specifications.RFC6455 := False ayarlayın (Kafka protokolü WebSocket değildir), istemciyi TsgcWSPClient_Kafka bileşenine atayın ve aracı Host ve Port'unu ayarlayın.

Üreticinin ve tüketicinin davranışı KafkaOptions özelliği aracılığıyla yapılandırılır:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

Mesaj üretme

Bağlandıktan sonra, konu, değer ve isteğe bağlı bir anahtar ve bölümle Produce çağırın. Her yayımlamanın sonucu, kaydın saklandığı ofset dahil olmak üzere OnKafkaProduce olayı aracılığıyla raporlanır.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

Mesaj tüketme

Bir GroupId ve bir OffsetReset ayarlayın, bir veya daha fazla konuya abone olun ve mevcut kayıtları getirmek için Poll çağırın. Poll, bir TsgcKafkaMessages listesi döndürür ve ayrıca her kayıt için OnKafkaMessage olayını tetikler. Sürekli almak için Poll'u tekrar tekrar (örneğin bir zamanlayıcıdan) çağırın.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

Aynı veri, bir arka plan iş parçacığından veya bir zamanlayıcıdan yoklama yaptığınızda kullanışlı olan OnKafkaMessage olayı aracılığıyla teslim edilir:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

Bir gruba katılmadan yalnızca bir konuyu okumak istiyorsanız, GroupId'yi boş bırakın: Subscribe ardından konunun her bölümünü atar ve Poll, OffsetReset tarafından seçilen konumdan kayıtları döndürür.

Konular ve ofsetler

İstemci, herhangi bir harici araç olmadan konuları yönetebilir ve ofsetleri inceleyebilir.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

İndirme

Kafka istemcisi sgcWebSockets 2026.6.0 ile gelir. Bağlanma, üretme, abone olma, yoklama, konu yönetimi ve ofset yönetimini gösteren tam bir demo Demos/02.WebSocket_Protocols/13.Kafka içinde bulunur. Denemek için sgcWebSockets'i indirin.

Sorular, geri bildirim veya başlarken yardım mı? Bize ulaşın — kodu yazan kişilerden bir yanıt alacaksınız.