Apache Kafka クライアント

Delphi、C++Builder、.NET 向けのネイティブ Apache Kafka クライアントです。REST プロキシや外部ライブラリを介さず、プレーンな TCP 上の Kafka ワイヤプロトコルでブローカーと直接通信します。レコードのプロデュースとコンシューム、コンシューマーグループの調整、オフセットの管理、トピックの管理を行えます。

Apache Kafka サブプロトコルクライアント

Delphi / .NET ランタイムが動作するあらゆる環境 — デスクトップサービスからモバイルデバイスまで — で実行できる第一級の Kafka 実装であり、標準的なあらゆる Kafka ブローカーとの間でレコードをストリーミングします。

コンポーネントクラス

TsgcWSPClient_Kafka

プロトコル

TCP 上の Apache Kafka ワイヤプロトコル

プラットフォーム

Windows, macOS, Linux, iOS, Android

エディション

Standard / Professional / Enterprise

Kafka のプロデュース、コンシューム、管理

ネイティブな Kafka プロトコル上でレコードをストリーミングするために必要なすべてを、プレーンな Delphi / .NET のメソッドとイベントとして公開しています。

プロデュース

Produce(topic, value, key, partition) を呼び出すと、オプションのキーとパーティションを指定してレコードをパブリッシュできます。バイナリペイロードには ProduceBytes を使用します。配信保証は KafkaOptions.Producer.AckskafkaAcksNonekafkaAcksLeaderkafkaAcksAll)で選択でき、各結果はトピック、パーティション、保存されたオフセットとともに OnKafkaProduce に届きます。

コンシューム

Subscribe([topic]) を呼び出してから Poll(timeoutMs) で利用可能なレコードを取得します。TsgcKafkaMessages リストが返され、レコードごとに OnKafkaMessage が発生します。各レコードは GetKeyStringGetValueStringTopicPartitionOffset で読み取れます。GroupId を空のままにすると、グループなしですべてのパーティションを直接読み取れます。

コンシューマーグループの調整

KafkaOptions.Consumer.GroupId を設定すると、クライアントはコーディネーターの探索、参加と同期、パーティション割り当て、オフセットのコミット/フェッチを自動的に行います。CommitSync を呼び出すと直前の Poll のオフセットがコミットされ、次のセッションではその続きからグループが再開します。

オフセット管理

GetEarliestOffsetGetLatestOffsetGetCommittedOffset は、トピックとパーティションごとに最古、次に書き込まれる、最後にコミットされたオフセットを返します。CommitOffset(topic, partition, offset) で明示的な位置を設定すれば、コンシュームを任意の位置から正確に再開できます。

トピック管理

CreateTopic(name, partitions, replication)DeleteTopic(name) でブローカー上のトピックを管理し、GetMetadata([topics]) はクラスターとパーティションのレイアウトを返します(空の配列を渡すとクラスター全体)。クライアントはコンシューマーグループの一覧表示も行えます。

v2 レコードバッチ形式

Apache Kafka 0.11 で導入された v2 レコードバッチ形式の読み書きに対応し、オプションで kafkaCompressionGzip 圧縮を利用できます。デフォルトのブローカーポート 9092 上で Apache Kafka 3.x に対してテスト済みです。

コンポーネントを配置し、いくつかのプロパティを設定するだけ

TsgcWebSocketClient と TsgcWSPClient_Kafka を組み合わせ、プレーン TCP のために Specifications.RFC6455 を false に設定し、KafkaOptions を構成したうえで Produce と Poll を呼び出します。

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // トピックにレコードをプロデュース
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // コンシューム: 一度 Subscribe したら、繰り返し Poll する(例: タイマーから)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// レコードをフェッチし、バッチ処理が終わったらコミット
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// トピックにレコードをプロデュース
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// コンシューム: 一度 Subscribe したら、繰り返し Poll する
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// レコードをフェッチし、バッチ処理が終わったらコミット
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// トピックにレコードをプロデュース
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// コンシューム: 一度 Subscribe したら、繰り返し Poll する
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

仕様 & 参考資料

このコンポーネントが実装するプロトコルの一次情報源です。

ドキュメント & デモ

コンポーネントリファレンスへのディープリンク、すぐに実行できるデモプロジェクト、そして体験版をダウンロードできます。

オンラインヘルプ — Kafka このコンポーネントのプロパティ、メソッド、イベントの完全なリファレンスです。
デモプロジェクト — Demos\Protocols\Kafka すぐに実行できるサンプルプロジェクトです。sgcWebSockets パッケージに同梱されています — 下記から体験版をダウンロードしてください。
ユーザーマニュアル (PDF) ライブラリ内のすべてのコンポーネントを網羅した包括的なマニュアルです。

Apache Kafka でストリーミングを始める準備はできましたか?

無料体験版をダウンロードして、イベントストリーミングソリューションの構築を数分で始めましょう。