sgcWebSockets 2026.6.0'dan itibaren Delphi ve C++Builder için yerel bir Apache Kafka istemcisi mevcuttur. Yeni TsgcWSPClient_Kafka bileşeni, aradaki bir REST ağ geçidi veya harici kütüphane olmadan Kafka aracılarıyla doğrudan yerel Kafka ikili protokolü (ham TCP) üzerinden iletişim kurar.
Apache Kafka, kayıt akışlarını yayımlamak, saklamak ve tüketmek için kullanılan dağıtık bir olay-akışı platformudur. Üreticiler konulara mesaj yazar, aracılar bunları bölümlenmiş, yalnızca-ekleme günlüklerinde kalıcı kılar ve tüketiciler bunları, isteğe bağlı olarak koordineli bir tüketici grubunun parçası olarak geri okur. sgcWebSockets istemcisi, tek bir bileşenden üretme, tüketme, konu yönetimi ve ofset yönetimi sunar.
Özellikler
- Yerel protokol: aracıyla TCP üzerinden Kafka wire protokolünü kullanarak iletişim kurar. Confluent REST proxy yok, Java yok, üçüncü taraf DLL yok.
- Üretici (Producer): bir anahtar ve değerle, yapılandırılabilir onaylar (none, leader, all) ve istek zaman aşımıyla herhangi bir konuya ve bölüme kayıt yayımlayın.
- Tüketici (Consumer): kayıtları ya yönetilen bir tüketici grubuyla (aracı, grup başına işlenmiş ofsetleri izler) ya da grupsuz (bir konunun tüm bölümlerini doğrudan okuyun) okuyun.
- Tüketici grupları: otomatik koordinatör keşfi, katılma/senkronizasyon, bölüm atama ve ofset işleme/getirme.
- Ofset denetimi: en erken, en son ve işlenmiş ofsetleri sorgulayın ve yeni bir grubun nereden başlayacağını (earliest veya latest) seçin.
- Konu yönetimi: doğrudan istemciden konu oluşturup silin, küme meta verisi isteyin ve tüketici gruplarını listeleyin.
- Modern kayıt biçimi: mevcut Kafka aracıları tarafından kullanılan v2 kayıt yığını (record batch) biçimini okur ve yazar.
- Olay güdümlü:
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaErrorveOnKafkaDisconnectolayları, bağlantıda olan her şeyi raporlar.
Desteklenen Kafka sürümleri
Bileşen, Apache Kafka 0.11'de tanıtılan v2 kayıt yığını biçimini kullanır ve Apache Kafka 3.x'e karşı test edilmiştir. Bu aralıktaki herhangi bir aracı veya aynı protokol sürümlerini konuşan Kafka uyumlu bir aracı çalışır. Varsayılan aracı bağlantı noktası 9092'dir.
Yapılandırma
Kafka istemcisi, bir TsgcWebSocketClient üzerinde çalışan bir protokol bileşenidir. İstemcinin ham bir TCP soketi olarak bağlanması için Specifications.RFC6455 := False ayarlayın (Kafka protokolü WebSocket değildir), istemciyi TsgcWSPClient_Kafka bileşenine atayın ve aracı Host ve Port'unu ayarlayın.
Üreticinin ve tüketicinin davranışı KafkaOptions özelliği aracılığıyla yapılandırılır:
- ClientId: uygulamanızı aracıya tanımlar.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeaderveyakafkaAcksAll. - Producer.TimeoutMs: aracının gerekli onaylar için ne kadar beklediği.
- Consumer.GroupId: tüketici grubu adı. Bir grup olmadan tüketmek için boş bırakın.
- Consumer.OffsetReset:
kafkaOffsetEarliestveyakafkaOffsetLatest, yepyeni bir grubun okumaya nereden başladığı.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
Mesaj üretme
Bağlandıktan sonra, konu, değer ve isteğe bağlı bir anahtar ve bölümle Produce çağırın. Her yayımlamanın sonucu, kaydın saklandığı ofset dahil olmak üzere OnKafkaProduce olayı aracılığıyla raporlanır.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
Mesaj tüketme
Bir GroupId ve bir OffsetReset ayarlayın, bir veya daha fazla konuya abone olun ve mevcut kayıtları getirmek için Poll çağırın. Poll, bir TsgcKafkaMessages listesi döndürür ve ayrıca her kayıt için OnKafkaMessage olayını tetikler. Sürekli almak için Poll'u tekrar tekrar (örneğin bir zamanlayıcıdan) çağırın.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
Aynı veri, bir arka plan iş parçacığından veya bir zamanlayıcıdan yoklama yaptığınızda kullanışlı olan OnKafkaMessage olayı aracılığıyla teslim edilir:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
Bir gruba katılmadan yalnızca bir konuyu okumak istiyorsanız, GroupId'yi boş bırakın: Subscribe ardından konunun her bölümünü atar ve Poll, OffsetReset tarafından seçilen konumdan kayıtları döndürür.
Konular ve ofsetler
İstemci, herhangi bir harici araç olmadan konuları yönetebilir ve ofsetleri inceleyebilir.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
İndirme
Kafka istemcisi sgcWebSockets 2026.6.0 ile gelir. Bağlanma, üretme, abone olma, yoklama, konu yönetimi ve ofset yönetimini gösteren tam bir demo Demos/02.WebSocket_Protocols/13.Kafka içinde bulunur. Denemek için sgcWebSockets'i indirin.
Sorular, geri bildirim veya başlarken yardım mı? Bize ulaşın — kodu yazan kişilerden bir yanıt alacaksınız.
