Delphi向けApache Kafkaクライアント

· コンポーネント

sgcWebSockets 2026.6.0より、DelphiおよびC++Builder向けにネイティブなApache Kafkaクライアントが利用可能になりました。新しいTsgcWSPClient_Kafkaコンポーネントは、間にRESTゲートウェイや外部ライブラリを介することなく、ネイティブKafkaバイナリプロトコル(生のTCP)経由でKafkaブローカーと直接通信します。

Apache Kafkaは、レコードのストリームを発行・保存・消費するために使用される分散イベントストリーミングプラットフォームです。プロデューサーはメッセージをトピックに書き込み、ブローカーはそれらをパーティション分割された追記専用ログに永続化し、コンシューマーは(任意で、協調するコンシューマーグループの一部として)それらを読み戻します。sgcWebSocketsクライアントは、メッセージの送信・消費、トピック管理、オフセット管理を単一のコンポーネントから提供します。

機能

対応Kafkaバージョン

本コンポーネントはApache Kafka 0.11で導入されたv2レコードバッチ形式を使用しており、Apache Kafka 3.xに対してテスト済みです。その範囲内の任意のブローカー、または同じプロトコルバージョンを話すKafka互換ブローカーで動作します。デフォルトのブローカーポートは9092です。

構成

Kafkaクライアントは、TsgcWebSocketClientの上で動作するプロトコルコンポーネントです。クライアントが生のTCPソケットとして接続するようにSpecifications.RFC6455 := Falseを設定し(KafkaプロトコルはWebSocketではありません)、そのクライアントをTsgcWSPClient_Kafkaコンポーネントに割り当て、ブローカーのHostPortを設定します。

プロデューサーとコンシューマーの動作は、KafkaOptionsプロパティを通じて構成します:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

メッセージの送信

接続後、トピック、値、および任意のキーとパーティションを指定してProduceを呼び出します。すべての発行の結果は、レコードが保存されたオフセットを含めてOnKafkaProduceイベントを通じて報告されます。

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

メッセージの消費

GroupIdOffsetResetを設定し、1つ以上のトピックを購読してからPollを呼び出すと、利用可能なレコードを取得できます。PollTsgcKafkaMessagesリストを返すとともに、各レコードに対してOnKafkaMessageイベントを発生させます。継続的に受信するには、(例えばタイマーから)Pollを繰り返し呼び出します。

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

同じデータはOnKafkaMessageイベントを通じても配信されます。これは、バックグラウンドスレッドやタイマーからポーリングする際に便利です:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

グループに参加せずにトピックを読み取りたいだけの場合は、GroupIdを空のままにします。するとSubscribeはトピックのすべてのパーティションを割り当て、PollOffsetResetで選択された位置からレコードを返します。

トピックとオフセット

クライアントは、外部ツールを一切使わずにトピックを管理し、オフセットを調べることができます。

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

ダウンロード

KafkaクライアントはsgcWebSockets 2026.6.0に付属しています。接続、送信、購読、ポーリング、トピック管理、オフセット管理を示す完全なデモがDemos/02.WebSocket_Protocols/13.Kafkaに含まれています。お試しになるにはsgcWebSocketsをダウンロードしてください。

ご質問、ご意見、または導入のサポートが必要ですか?お問い合わせください—コードを書いた本人から返信いたします。