sgcWebSockets 2026.6.0より、DelphiおよびC++Builder向けにネイティブなApache Kafkaクライアントが利用可能になりました。新しいTsgcWSPClient_Kafkaコンポーネントは、間にRESTゲートウェイや外部ライブラリを介することなく、ネイティブKafkaバイナリプロトコル(生のTCP)経由でKafkaブローカーと直接通信します。
Apache Kafkaは、レコードのストリームを発行・保存・消費するために使用される分散イベントストリーミングプラットフォームです。プロデューサーはメッセージをトピックに書き込み、ブローカーはそれらをパーティション分割された追記専用ログに永続化し、コンシューマーは(任意で、協調するコンシューマーグループの一部として)それらを読み戻します。sgcWebSocketsクライアントは、メッセージの送信・消費、トピック管理、オフセット管理を単一のコンポーネントから提供します。
機能
- ネイティブプロトコル:TCP経由のKafkaワイヤープロトコルを使用してブローカーと通信します。Confluent RESTプロキシ、Java、サードパーティ製DLLは不要です。
- プロデューサー:キーと値、設定可能な確認応答(none、leader、all)およびリクエストタイムアウトを指定して、任意のトピックおよびパーティションにレコードを発行します。
- コンシューマー:管理されたコンシューマーグループ(ブローカーがグループごとにコミット済みのオフセットを追跡)を使用するか、グループなし(トピックのすべてのパーティションを直接読み取る)でレコードを読み取ります。
- コンシューマーグループ:コーディネーターの自動検出、join/sync、パーティションの割り当て、オフセットのコミット/フェッチを行います。
- オフセット制御:最も古い、最新、コミット済みのオフセットを照会し、新しいグループの開始位置(最も古い、または最新)を選択します。
- トピック管理:トピックの作成と削除、クラスターメタデータの要求、コンシューマーグループの一覧表示をクライアントから直接行えます。
- 最新のレコード形式:現在のKafkaブローカーで使用されているv2レコードバッチ形式の読み書きに対応しています。
- イベント駆動:
OnKafkaConnect、OnKafkaMessage、OnKafkaProduce、OnKafkaError、OnKafkaDisconnectの各イベントが、接続上で発生するすべての事象を報告します。
対応Kafkaバージョン
本コンポーネントはApache Kafka 0.11で導入されたv2レコードバッチ形式を使用しており、Apache Kafka 3.xに対してテスト済みです。その範囲内の任意のブローカー、または同じプロトコルバージョンを話すKafka互換ブローカーで動作します。デフォルトのブローカーポートは9092です。
構成
Kafkaクライアントは、TsgcWebSocketClientの上で動作するプロトコルコンポーネントです。クライアントが生のTCPソケットとして接続するようにSpecifications.RFC6455 := Falseを設定し(KafkaプロトコルはWebSocketではありません)、そのクライアントをTsgcWSPClient_Kafkaコンポーネントに割り当て、ブローカーのHostとPortを設定します。
プロデューサーとコンシューマーの動作は、KafkaOptionsプロパティを通じて構成します:
- ClientId:アプリケーションをブローカーに識別させます。
- Producer.Acks:
kafkaAcksNone、kafkaAcksLeader、またはkafkaAcksAll。 - Producer.TimeoutMs:ブローカーが必要な確認応答を待機する時間。
- Consumer.GroupId:コンシューマーグループ名。グループなしで消費する場合は空のままにします。
- Consumer.OffsetReset:
kafkaOffsetEarliestまたはkafkaOffsetLatest。新規グループが読み取りを開始する位置を指定します。
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
メッセージの送信
接続後、トピック、値、および任意のキーとパーティションを指定してProduceを呼び出します。すべての発行の結果は、レコードが保存されたオフセットを含めてOnKafkaProduceイベントを通じて報告されます。
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
メッセージの消費
GroupIdとOffsetResetを設定し、1つ以上のトピックを購読してからPollを呼び出すと、利用可能なレコードを取得できます。PollはTsgcKafkaMessagesリストを返すとともに、各レコードに対してOnKafkaMessageイベントを発生させます。継続的に受信するには、(例えばタイマーから)Pollを繰り返し呼び出します。
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
同じデータはOnKafkaMessageイベントを通じても配信されます。これは、バックグラウンドスレッドやタイマーからポーリングする際に便利です:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
グループに参加せずにトピックを読み取りたいだけの場合は、GroupIdを空のままにします。するとSubscribeはトピックのすべてのパーティションを割り当て、PollはOffsetResetで選択された位置からレコードを返します。
トピックとオフセット
クライアントは、外部ツールを一切使わずにトピックを管理し、オフセットを調べることができます。
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
ダウンロード
KafkaクライアントはsgcWebSockets 2026.6.0に付属しています。接続、送信、購読、ポーリング、トピック管理、オフセット管理を示す完全なデモがDemos/02.WebSocket_Protocols/13.Kafkaに含まれています。お試しになるにはsgcWebSocketsをダウンロードしてください。
ご質問、ご意見、または導入のサポートが必要ですか?お問い合わせください—コードを書いた本人から返信いたします。
