AMQP エクスチェンジとキュー(1 / 3)

· コンポーネント

sgcWebSockets 2022.1 から AMQP 0.9.1 プロトコルがサポートされています。Advanced Message Queuing Protocol(AMQP)はメッセージ指向ミドルウェア向けのオープンスタンダードなアプリケーション層プロトコルです。AMQP の特徴的な機能は、メッセージ指向、キューイング、ルーティング(ポイント・ツー・ポイントおよびパブリッシュ・サブスクライブを含む)、信頼性、セキュリティです。

AMQP はバイナリのアプリケーション層プロトコルで、多種多様なメッセージングアプリケーションと通信パターンを効率的にサポートするよう設計されています。フロー制御付きのメッセージ指向通信を提供し、最大 1 回(各メッセージは 1 回または送られない)、少なくとも 1 回(各メッセージは必ず配信されるが複数回となる可能性がある)、ちょうど 1 回(メッセージは必ず到着し、かつ 1 回のみ)のメッセージ配信保証や、SASL や TLS に基づく認証・暗号化を提供します。基盤として Transmission Control Protocol(TCP)のような信頼性のあるトランスポート層プロトコルを想定しています。

チャネル

AMQP はマルチチャネルプロトコルです。チャネルは重い TCP/IP 接続を複数の軽量な接続に多重化する方法を提供します。これによりポート使用が予測可能になるため、プロトコルがより「ファイアウォールフレンドリー」になります。また、トラフィックシェーピングやその他のネットワーク QoS 機能を容易に利用できます。

各チャネルは独自のスレッドで実行されるため、新しいメッセージを受信するたびに、クライアントはまずチャネルを特定し、メッセージをチャネルスレッドが処理するキューにキューイングします。

チャネルのライフサイクルは次のとおりです:

1. クライアントが新しいチャネルを開きます(Open)。

2. サーバーが新しいチャネルが準備できたことを確認します(Open-Ok)。

3. クライアントとサーバーが必要に応じてチャネルを使用します。

4. 一方のピア(クライアントまたはサーバー)がチャネルを閉じます(Close)。

5. もう一方のピアがチャネルクローズをハンドシェイクします(Close-Ok)。


新しいチャネルを作成するには OpenChannel メソッドを呼び出し、チャネル名を引数として渡します。チャネルが開かれたことの確認としてサーバーから送られる OnAMQPChannelOpen イベントが発生します。

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

エクスチェンジ

エクスチェンジクラスはアプリケーションがサーバー上のエクスチェンジを管理できるようにします。このクラスによってアプリケーションは(設定インターフェースに依存するのではなく)独自のワイヤリングをスクリプト化できます。注意: ほとんどのアプリケーションにはこのレベルの高度な機能は不要であり、レガシーミドルウェアはこのセマンティクスをサポートできない可能性があります。

エクスチェンジのライフサイクルは次のとおりです。

1. クライアントはサーバーにエクスチェンジが存在することを確認するよう要求します(Declare)。クライアントはこれを「存在しない場合はエクスチェンジを作成する」または「存在しない場合は警告するが作成しない」のいずれかに絞り込めます。

2. クライアントはエクスチェンジにメッセージをパブリッシュします。

3. クライアントはエクスチェンジを削除することを選択できます(Delete)。

DeclareExchange メソッドは新しいエクスチェンジを作成するか、エクスチェンジが既に存在することを確認します。メソッドには以下の引数があります。


新しいエクスチェンジを宣言するには DeclareExchange メソッドを呼び出し、チャネル名、エクスチェンジ名、エクスチェンジタイプを引数として渡します。OnAMQPExchangeDeclare イベントはサーバーからエクスチェンジが宣言されたことの確認として発生します。

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

キュー

キュークラスはアプリケーションがサーバー上のメッセージキューを管理できるようにします。これはメッセージを消費するほぼすべてのアプリケーションにおける基本的なステップで、少なくとも期待されるメッセージキューが実際に存在することを確認するために必要です。

永続メッセージキューのライフサイクルは比較的シンプルです。

1. クライアントはメッセージキューが存在することを確認します("passive" 引数を指定した Declare)。

2. サーバーはメッセージキューが存在することを確認します(Declare-Ok)。

3. クライアントはメッセージキューからメッセージを読み取ります。

一時メッセージキューのライフサイクルはより興味深いものです。

1. クライアントはメッセージキューを作成します(Declare。多くの場合メッセージキュー名は指定せず、サーバーが名前を割り当てます)。サーバーは確認します(Declare-Ok)。

2. クライアントはメッセージキュー上のコンシューマーを開始します。コンシューマーの正確な機能は Basic クラスによって定義されます。

3. クライアントはコンシューマーを明示的に、またはチャネルや接続をクローズすることでキャンセルします。

4. メッセージキューから最後のコンシューマーがなくなり、適切なタイムアウト後、サーバーはメッセージキューを削除します。

AMQP はトピックサブスクリプションの配信メカニズムをメッセージキューとして実装しています。これにより、サブスクリプションを協調するサブスクライバーアプリケーションのプール間で負荷分散できる興味深い構造が可能になります。

サブスクリプションのライフサイクルには追加のバインドステージがあります。

1. クライアントはメッセージキューを作成し(Declare)、サーバーが確認します(Declare-Ok)。

2. クライアントはメッセージキューをトピックエクスチェンジにバインドし(Bind)、サーバーが確認します(Bind-Ok)。

3. クライアントは前の例と同様にメッセージキューを使用します。

DeclareQueue メソッドは新しいキューを作成するか、キューが既に存在することを確認します。メソッドには以下の引数があります。


新しいキューを宣言するには DeclareQueue メソッドを呼び出し、チャネル名とキュー名を引数として渡します。OnAMQPQueueDeclare イベントはサーバーからキューが宣言されたことの確認として発生します。

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;