AMQP 1 Delphi クライアントの更新

· 機能

sgcWebSockets の AMQP 1.0 プロトコル実装は、以下の仕様に対する包括的なレビューを実施しました: OASIS AMQP 1.0 仕様。 本記事では 8 つのソースファイルにわたって適用された 30 件の修正(重大バグ、メモリリーク、 仕様準拠、ステートマシンの正確性、ハートビート処理、スレッドセーフティの改善)を文書化します。

目次

  1. 概要
  2. 重要なバグ修正
  3. メモリリークの修正
  4. 仕様準拠の修正
  5. ステートマシンと接続の修正
  6. ハートビートとアイドルタイムアウトの修正
  7. スレッドセーフティの修正
  8. 変更されたファイル

1. 概要

AMQP 1.0 実装において、6 つのソースファイル2 つのインターフェースファイルにわたって合計 30 件の修正が適用されました。修正は以下のカテゴリーに分類されます:

カテゴリー 件数 重大度
重大バグ修正 6 Critical
メモリリーク修正 4 Critical
仕様準拠 10 High
ステートマシン・接続 5 High
ハートビート・アイドルタイムアウト 3 High
スレッドセーフティ 2 High

2. 重要なバグ修正

これらの修正は、AMQP 1.0 通信中に即座のランタイム障害・プロトコルの破損・誤った動作を引き起こすバグに対処します。

2.1 例外に raise キーワードがない

ファイル:sgcAMQP1_Classes.pas
無効なフレームタイプが検出された際に例外オブジェクトが生成されましたが、raise されませんでした。例外は無言で破棄され、破損したフレームが検出されずに処理されていました。

修正前(不具合あり)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

修正後(正常)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 でデータが欠落しサイズが誤っている

ファイル:sgcAMQP1_Classes.pas
Map32 エンコーディングパスで実際のマップデータの WriteBytes 呼び出しが欠落しており、サイズフィールドのオフセットが誤っていました。Map32 は 4 バイトのカウントフィールドを使用します(Map8 は 1 バイト)。そのためサイズには 4 バイト余分に含める必要があります。

修正前(不具合あり)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

修正後(正常)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 ContainsError のロジックが反転している

ファイル:sgcAMQP1_Frames.pas
TsgcAMQP1FrameRejectedTsgcAMQP1DescribedListError の両方の ContainsError メソッドは、エラーがないときに True、エラーがあるときに False を返していました。これにより、エラー情報が無言で破棄され、実際のエラーがシリアライズされるべき場合に null バイトが書き込まれていました。DoWriteDoWriteError のブランチも修正後のロジックに合わせて入れ替えられました。

修正前(不具合あり)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

修正後(正常)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 SASL PLAIN の null バイト区切り文字

ファイル:sgcAMQP1_Frames.pas
SASL PLAIN メカニズムは null バイト($00)区切り文字を使用した \0username\0password 形式を必要とします。実装ではバイト配列をゼロで初期化していなかったため、区切り位置にゴミデータが含まれていました。標準準拠のすべての AMQP 1.0 ブローカーに対して認証が失敗していました。

修正後(正常)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 TsgcAMQP1Message で inherited Create がない

ファイル:sgcAMQP1_Message.pas
TsgcAMQP1Message のパラメーター付きコンストラクターが inherited Create を呼び出していなかったため、基底クラスが初期化されませんでした。ショートカットコンストラクター使用時にアクセス違反または状態の破損が発生していました。

修正後(正常)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 AmqpValue.DoRead でセミコロンが欠落

ファイル:sgcAMQP1_Frames.pas
TsgcAMQP1FrameAmqpValue.DoRead でセミコロンが欠落しており、コンパイルが妨げられていました。


3. メモリリークの修正

これらの修正は、特に多くのメッセージ交換を伴う長時間の AMQP 1.0 接続中に時間の経過とともにメモリが蓄積される原因となるオブジェクトライフタイム管理の問題に対処します。

Fix File 説明
3.1 sgcAMQP1_Frames.pas Source ディスクリプター読み取り時に FDefaultOutcome が再割り当て前に解放されていませんでした。新しいデフォルトアウトカムを受信するたびに、以前のオブジェクトがリークしていました。
3.2 sgcAMQP1_Session.pas デストラクターの sgcFree(FCreditConsumed) 呼び出しが重複しており、二重解放の可能性がありました。重複行を削除しました。
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries がセッションのデストラクターから欠落していました。セッション破棄時に配信追跡リストが解放されませんでした。
3.4 sgcAMQP1_Message.pas FFreeMessageOnDestroy が有効な場合、SetMessageSetMessageAndFreeOnDestroy は以前のメッセージを解放していませんでした。メッセージを繰り返し割り当てるとメモリがリークしていました。

Fix 3.1 – FDefaultOutcome Freed Before Reassignment

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Fix 3.4 – SetMessage Frees Old Message

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. 仕様準拠の修正

これらの修正は、AMQP 1.0 TransportTypesMessaging 仕様からの逸脱を修正します。

4.1 Begin フレームのフィールド 7 が誤ったプロパティに読み込まれる

ファイル:sgcAMQP1_Frames.pas
begin パフォーマティブのフィールドインデックス 7 が Properties ではなく DesiredCapabilities に読み込まれていました。仕様によると begin フィールドは:remote-channel(0)、next-outgoing-id(1)、incoming-window(2)、outgoing-window(3)、handle-max(4)、offered-capabilities(5)、desired-capabilities(6)、properties(7) です。

4.2 Source と Target の DoWrite でフィールドが欠落

ファイル:sgcAMQP1_Frames.pas
Source ディスクリプターの DoWrite メソッドが default-outcomeoutcomescapabilities フィールドをシリアライズしていませんでした。Target ディスクリプターも capabilities が欠落していました。これによりブローカーがネゴシエート済みの値ではなくデフォルト値を使用し、誤った配信状態処理につながる可能性がありました。

4.3 AmqpSequence が誤ったプロパティに読み込まれる

ファイル:sgcAMQP1_Message.pas
メッセージボディの読み取り時、amqp-sequence データが ApplicationData.AMQPSequence ではなく ApplicationData.AMQPValue に読み込まれていました。これにより amqp-sequence エンコーディングを使用するメッセージのメッセージボディが破損していました。

4.4 TransactionalState の Outcome が書き込まれない

ファイル:sgcAMQP1_Frames.pas
transactional-state 配信状態がトランザクション内の配信決済時に必要な outcome フィールドをシリアライズしていませんでした。

4.5 Disposition の Last フィールドがゼロと未設定を区別できない

ファイル:sgcAMQP1_Frames.passgcAMQP1_Frames.intfsgcAMQP1_Session.pas
disposition パフォーマティブにはオプションの last フィールド(delivery-id)があります。Cardinal 型のため値 0 は有効であり、「未設定」のセンティネルとして使用できません。フィールドが明示的に設定されたかどうかを適切に追跡するために、新しい FLastAssigned ブール値フラグと SetLast セッターが追加されました。

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence に Value プロパティと Read/Write がない

ファイル:sgcAMQP1_Frames.passgcAMQP1_Frames.intf
TsgcAMQP1FrameAmqpSequence クラスには Value プロパティがなく、DoRead/DoWrite メソッドも空でした。amqp-sequence ボディタイプは完全に機能していませんでした。

4.7 Error の Info フィールドが Map ではなく String として読まれる

ファイル:sgcAMQP1_Frames.pas
AMQP 1.0 仕様では error 型の info フィールドを map と定義しています。ReadMap ではなく ReadString で読み取られていたため、ブローカーが構造化エラー情報を送信した際に解析エラーが発生していました。

4.8 Capabilities と Locales が Symbol ではなく String として書き込まれる

ファイル:sgcAMQP1_Frames.pas
AMQP 1.0 仕様では offered-capabilitiesdesired-capabilitiesoutgoing-localesincoming-localessymbol の配列として定義しています。openbeginattach パフォーマティブで WriteSymbol ではなく WriteString で書き込まれていました。標準準拠のブローカーはフィールド型が誤っているとしてこれらのフレームを拒否していました。

4.9 DefaultOutcome ハンドラーに Accepted ディスクリプターがない

ファイル:sgcAMQP1_Frames.pas
Source ディスクリプターの default-outcome リーダーは releasedrejected のみを処理していました。最も一般的なアウトカムである accepted は処理されていませんでした。ブローカーがデフォルトアウトカムとして accepted を送信した場合、無言で無視されていました。


5. ステートマシンと接続の修正

これらの修正は AMQP 1.0 接続ステートマシンとフレーム処理ロジックに対処します。

修正 ファイル 説明
5.1 sgcAMQP1.pas amqp1csOpenReceived の状態遷移に、他のすべての状態が持つ else DoRaiseInvalidState がありませんでした。無効な遷移がエラーを発生させずに無言で無視されていました。
5.2 sgcAMQP1.pas フレームサイズ検証エラーメッセージに RemoteMaxFrameSize が表示されていましたが、チェックされていたのはローカル制限であるため LocalMaxFrameSize を表示すべきでした。
5.3 sgcAMQP1.pas FLastTimeReadNow ではなく 0(Delphi エポック:1899-12-30)で初期化されていました。これにより起動時に即座に誤ったアイドルタイムアウト検出が発生していました。
5.4 sgcAMQP1.pas ReadTBytes オーバーロードが TMemoryStream オーバーロードと異なり FLastTimeRead := Now を更新していませんでした。これにより一貫性のないハートビート追跡が発生していました。
5.5 sgcAMQP1.pas ヘッダー受信の状態遷移は常にトリガーされるべきですが条件付きでした。AMQP 1.0 仕様に従い、ステートマシンはすべての有効なヘッダー交換で遷移しなければなりません。

Fix 5.1 – OpenReceived Missing Error Branch

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. ハートビートとアイドルタイムアウトの修正

AMQP 1.0 仕様では、ピアが open パフォーマティブで idle-timeout をアドバタイズした場合、もう一方のピアはアドバタイズされた間隔の半分でハートビートフレームを送信しなければならないと要求しています。これらの修正によりハートビートメカニズムが実際に機能するようになります。

修正 ファイル 説明
6.1 sgcAMQP1_Client.pas HeartBeat が一度も有効化されませんでした。アイドルタイムアウトチェックの両方のブランチが HeartBeat.Enabled := False を設定していました。IdleTimeout > 0 の場合に True に変更しました。
6.2 sgcAMQP1_Client.pas Disconnect がハートビートを無効化したり、十分に早い段階で FConnected := False を設定していませんでした。切断処理中にハートビートが発火することを防ぐために順序を変更しました。
6.3 sgcAMQP1.pas FLastTimeRead が TBytes Read オーバーロードで更新されていませんでした(ステートマシンセクションにも記載)。

Fix 6.1 – HeartBeat Enabled

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Fix 6.2 – Disconnect Reordered

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. スレッドセーフティの修正

これらの修正は、共有データ構造への並行アクセスにおける競合状態に対処します。

7.1 TsgcAMQP1Deliveries.First() の境界チェックとロック

ファイル:sgcAMQP1_Message.pas
First() メソッドはリストが空かどうかを確認せず、スレッドセーフロックを取得せずに Items[0] にアクセスしていました。マルチスレッド環境では、カウントチェックとアクセスの間に別のスレッドがすべてのアイテムを削除し、インデックス範囲外例外を引き起こす可能性がありました。

修正後(正常)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 SetMessage の安全なオブジェクト置換

ファイル:sgcAMQP1_Message.pas
SetMessage メソッドは、解放前に新しいメッセージが現在のものと異なることを確認するようになり、同じメッセージオブジェクトを割り当てる際の解放後使用を防ぎます。


8. 変更されたファイル

ファイル 修正件数 カテゴリー
Source\sgcAMQP1_Classes.pas 2 重大バグ
Source\sgcAMQP1_Frames.pas 16 重大バグ・メモリリーク・仕様準拠
Interfaces\sgcAMQP1_Frames.intf 2 仕様準拠(Disposition LastAssigned・AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 重大バグ・メモリリーク・スレッドセーフティ
Source\sgcAMQP1_Session.pas 3 メモリリーク・仕様準拠
Source\sgcAMQP1.pas 5 ステートマシン・接続・ハートビート
Source\sgcAMQP1_Client.pas 3 ハートビート・切断の安全性

合計:30 件の修正が 8 つのファイルにわたって適用され、OASIS 仕様に対する AMQP 1.0 実装のプロトコル正確性・メモリ安全性・信頼性が向上しました。