sgcWebSockets 2022.1 から AMQP 0.9.1 プロトコルがサポートされています。Advanced Message Queuing Protocol(AMQP)はメッセージ指向ミドルウェア向けのオープンスタンダードなアプリケーション層プロトコルです。AMQP の特徴的な機能は、メッセージ指向、キューイング、ルーティング(ポイント・ツー・ポイントおよびパブリッシュ・サブスクライブを含む)、信頼性、セキュリティです。
AMQP はバイナリのアプリケーション層プロトコルで、多種多様なメッセージングアプリケーションと通信パターンを効率的にサポートするよう設計されています。フロー制御付きのメッセージ指向通信を提供し、最大 1 回(各メッセージは 1 回または送られない)、少なくとも 1 回(各メッセージは必ず配信されるが複数回となる可能性がある)、ちょうど 1 回(メッセージは必ず到着し、かつ 1 回のみ)のメッセージ配信保証や、SASL や TLS に基づく認証・暗号化を提供します。基盤として Transmission Control Protocol(TCP)のような信頼性のあるトランスポート層プロトコルを想定しています。
チャネル
AMQP はマルチチャネルプロトコルです。チャネルは重い TCP/IP 接続を複数の軽量な接続に多重化する方法を提供します。これによりポート使用が予測可能になるため、プロトコルがより「ファイアウォールフレンドリー」になります。また、トラフィックシェーピングやその他のネットワーク QoS 機能を容易に利用できます。
各チャネルは独自のスレッドで実行されるため、新しいメッセージを受信するたびに、クライアントはまずチャネルを特定し、メッセージをチャネルスレッドが処理するキューにキューイングします。
チャネルのライフサイクルは次のとおりです:
1. クライアントが新しいチャネルを開きます(Open)。
2. サーバーが新しいチャネルが準備できたことを確認します(Open-Ok)。
3. クライアントとサーバーが必要に応じてチャネルを使用します。
4. 一方のピア(クライアントまたはサーバー)がチャネルを閉じます(Close)。
5. もう一方のピアがチャネルクローズをハンドシェイクします(Close-Ok)。
新しいチャネルを作成するには OpenChannel メソッドを呼び出し、チャネル名を引数として渡します。チャネルが開かれたことの確認としてサーバーから送られる OnAMQPChannelOpen イベントが発生します。
AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_channel_open: ' + aChannel);
end;
エクスチェンジ
エクスチェンジクラスはアプリケーションがサーバー上のエクスチェンジを管理できるようにします。このクラスによってアプリケーションは(設定インターフェースに依存するのではなく)独自のワイヤリングをスクリプト化できます。注意: ほとんどのアプリケーションにはこのレベルの高度な機能は不要であり、レガシーミドルウェアはこのセマンティクスをサポートできない可能性があります。
エクスチェンジのライフサイクルは次のとおりです。
1. クライアントはサーバーにエクスチェンジが存在することを確認するよう要求します(Declare)。クライアントはこれを「存在しない場合はエクスチェンジを作成する」または「存在しない場合は警告するが作成しない」のいずれかに絞り込めます。
2. クライアントはエクスチェンジにメッセージをパブリッシュします。
3. クライアントはエクスチェンジを削除することを選択できます(Delete)。
DeclareExchange メソッドは新しいエクスチェンジを作成するか、エクスチェンジが既に存在することを確認します。メソッドには以下の引数があります。
- ChannelName: チャネルの名前(このメソッドを呼び出す前にオープンされている必要があります)。
- ExchangeName: エクスチェンジの名前。255 文字以内で、"amq." で始まってはいけません(passive パラメーターが true の場合を除く)。
- ExchangeType: エクスチェンジのタイプ。すべての AMQP サーバーは "direct" と "fanout" エクスチェンジをサポートしています。サポートされているエクスチェンジタイプはサーバーのドキュメントを参照してください。
- Passive: true の場合、サーバーはエクスチェンジが既に宣言されていることのみ確認します。false の場合、エクスチェンジが存在しなければ新しいエクスチェンジが作成されます。
- Durable: true の場合、エクスチェンジはサーバー起動時に再作成されます。false の場合、サーバー停止時に削除されます。
- AutoDelete: true の場合、すべてのキューがアンバインドされるとエクスチェンジが削除されます。
- Internal: 常に false です。
- NoWait: true の場合、サーバーはクライアントに確認応答を送信しません。
新しいエクスチェンジを宣言するには DeclareExchange メソッドを呼び出し、チャネル名、エクスチェンジ名、エクスチェンジタイプを引数として渡します。OnAMQPExchangeDeclare イベントはサーバーからエクスチェンジが宣言されたことの確認として発生します。
AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end;
キュー
キュークラスはアプリケーションがサーバー上のメッセージキューを管理できるようにします。これはメッセージを消費するほぼすべてのアプリケーションにおける基本的なステップで、少なくとも期待されるメッセージキューが実際に存在することを確認するために必要です。
永続メッセージキューのライフサイクルは比較的シンプルです。
1. クライアントはメッセージキューが存在することを確認します("passive" 引数を指定した Declare)。
2. サーバーはメッセージキューが存在することを確認します(Declare-Ok)。
3. クライアントはメッセージキューからメッセージを読み取ります。
一時メッセージキューのライフサイクルはより興味深いものです。
1. クライアントはメッセージキューを作成します(Declare。多くの場合メッセージキュー名は指定せず、サーバーが名前を割り当てます)。サーバーは確認します(Declare-Ok)。
2. クライアントはメッセージキュー上のコンシューマーを開始します。コンシューマーの正確な機能は Basic クラスによって定義されます。
3. クライアントはコンシューマーを明示的に、またはチャネルや接続をクローズすることでキャンセルします。
4. メッセージキューから最後のコンシューマーがなくなり、適切なタイムアウト後、サーバーはメッセージキューを削除します。
AMQP はトピックサブスクリプションの配信メカニズムをメッセージキューとして実装しています。これにより、サブスクリプションを協調するサブスクライバーアプリケーションのプール間で負荷分散できる興味深い構造が可能になります。
サブスクリプションのライフサイクルには追加のバインドステージがあります。
1. クライアントはメッセージキューを作成し(Declare)、サーバーが確認します(Declare-Ok)。
2. クライアントはメッセージキューをトピックエクスチェンジにバインドし(Bind)、サーバーが確認します(Bind-Ok)。
3. クライアントは前の例と同様にメッセージキューを使用します。
DeclareQueue メソッドは新しいキューを作成するか、キューが既に存在することを確認します。メソッドには以下の引数があります。
- ChannelName: チャネルの名前(このメソッドを呼び出す前にオープンされている必要があります)。
- QueueName: キューの名前。255 文字以内で、"amq." で始まってはいけません(passive パラメーターが true の場合を除く)。
- Passive: true の場合、サーバーはキューが既に宣言されていることのみ確認します。false の場合、キューが存在しなければ新しいキューが作成されます。
- Durable: true の場合、キューはサーバー起動時に再作成されます。false の場合、サーバー停止時に削除されます。
- Exclusive: true の場合、キューは現在の接続のみがアクセスできます。
- AutoDelete: true の場合、すべてのコンシューマーがキューを使用しなくなるとキューが削除されます。
- NoWait: true の場合、サーバーはクライアントに確認応答を送信しません。
新しいキューを宣言するには DeclareQueue メソッドを呼び出し、チャネル名とキュー名を引数として渡します。OnAMQPQueueDeclare イベントはサーバーからキューが宣言されたことの確認として発生します。
AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;
