sgcWebSockets の AMQP 1.0 プロトコル実装は、以下の仕様に対する包括的なレビューを実施しました: OASIS AMQP 1.0 仕様。 本記事では 8 つのソースファイルにわたって適用された 30 件の修正(重大バグ、メモリリーク、 仕様準拠、ステートマシンの正確性、ハートビート処理、スレッドセーフティの改善)を文書化します。
目次
1. 概要
AMQP 1.0 実装において、6 つのソースファイルと2 つのインターフェースファイルにわたって合計 30 件の修正が適用されました。修正は以下のカテゴリーに分類されます:
| カテゴリー | 件数 | 重大度 |
|---|---|---|
| 重大バグ修正 | 6 | Critical |
| メモリリーク修正 | 4 | Critical |
| 仕様準拠 | 10 | High |
| ステートマシン・接続 | 5 | High |
| ハートビート・アイドルタイムアウト | 3 | High |
| スレッドセーフティ | 2 | High |
2. 重要なバグ修正
これらの修正は、AMQP 1.0 通信中に即座のランタイム障害・プロトコルの破損・誤った動作を引き起こすバグに対処します。
2.1 例外に raise キーワードがない
ファイル:sgcAMQP1_Classes.pas
無効なフレームタイプが検出された際に例外オブジェクトが生成されましたが、raise されませんでした。例外は無言で破棄され、破損したフレームが検出されずに処理されていました。
修正前(不具合あり)
TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);
修正後(正常)
raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);
2.2 WriteMap Map32 でデータが欠落しサイズが誤っている
ファイル:sgcAMQP1_Classes.pas
Map32 エンコーディングパスで実際のマップデータの WriteBytes 呼び出しが欠落しており、サイズフィールドのオフセットが誤っていました。Map32 は 4 バイトのカウントフィールドを使用します(Map8 は 1 バイト)。そのためサイズには 4 バイト余分に含める必要があります。
修正前(不具合あり)
else
begin
WriteUByte(Ord(amqp1DataMap32));
_WriteUInt32(vSize + 1);
_WriteUInt32(oJSON.Count * 2);
// Missing: WriteBytes(oArray.Bytes);
end;
修正後(正常)
else
begin
WriteUByte(Ord(amqp1DataMap32));
_WriteUInt32(vSize + 4);
_WriteUInt32(oJSON.Count * 2);
WriteBytes(oArray.Bytes);
end;
2.3 ContainsError のロジックが反転している
ファイル:sgcAMQP1_Frames.pas
TsgcAMQP1FrameRejected と TsgcAMQP1DescribedListError の両方の ContainsError メソッドは、エラーがないときに True、エラーがあるときに False を返していました。これにより、エラー情報が無言で破棄され、実際のエラーがシリアライズされるべき場合に null バイトが書き込まれていました。DoWrite と DoWriteError のブランチも修正後のロジックに合わせて入れ替えられました。
修正前(不具合あり)
function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
if not Assigned(FError) then
Result := True // Wrong: True when no error
else
Result := (Error.Condition = '') and (Error.Description = '') and
(Error.Info = ''); // Wrong: True when all empty
end;
修正後(正常)
function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
if not Assigned(FError) then
Result := False // Correct: False when no error
else
Result := (Error.Condition <> '') or (Error.Description <> '') or
(Error.Info <> ''); // Correct: True when any field set
end;
2.4 SASL PLAIN の null バイト区切り文字
ファイル:sgcAMQP1_Frames.pas
SASL PLAIN メカニズムは null バイト($00)区切り文字を使用した \0username\0password 形式を必要とします。実装ではバイト配列をゼロで初期化していなかったため、区切り位置にゴミデータが含まれていました。標準準拠のすべての AMQP 1.0 ブローカーに対して認証が失敗していました。
修正後(正常)
SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0); // Zero-fill for null separators
2.5 TsgcAMQP1Message で inherited Create がない
ファイル:sgcAMQP1_Message.pas
TsgcAMQP1Message のパラメーター付きコンストラクターが inherited Create を呼び出していなかったため、基底クラスが初期化されませんでした。ショートカットコンストラクター使用時にアクセス違反または状態の破損が発生していました。
修正後(正常)
constructor TsgcAMQP1Message.Create(const aValue: string);
begin
inherited Create;
ApplicationData.ValueType := amqp1adtAmqpValue;
ApplicationData.AMQPValue.Value := aValue;
end;
2.6 AmqpValue.DoRead でセミコロンが欠落
ファイル:sgcAMQP1_Frames.pas
TsgcAMQP1FrameAmqpValue.DoRead でセミコロンが欠落しており、コンパイルが妨げられていました。
3. メモリリークの修正
これらの修正は、特に多くのメッセージ交換を伴う長時間の AMQP 1.0 接続中に時間の経過とともにメモリが蓄積される原因となるオブジェクトライフタイム管理の問題に対処します。
| Fix | File | 説明 |
|---|---|---|
| 3.1 | sgcAMQP1_Frames.pas |
Source ディスクリプター読み取り時に FDefaultOutcome が再割り当て前に解放されていませんでした。新しいデフォルトアウトカムを受信するたびに、以前のオブジェクトがリークしていました。 |
| 3.2 | sgcAMQP1_Session.pas |
デストラクターの sgcFree(FCreditConsumed) 呼び出しが重複しており、二重解放の可能性がありました。重複行を削除しました。 |
| 3.3 | sgcAMQP1_Session.pas |
FOutgoingDeliveries がセッションのデストラクターから欠落していました。セッション破棄時に配信追跡リストが解放されませんでした。 |
| 3.4 | sgcAMQP1_Message.pas |
FFreeMessageOnDestroy が有効な場合、SetMessage と SetMessageAndFreeOnDestroy は以前のメッセージを解放していませんでした。メッセージを繰り返し割り当てるとメモリがリークしていました。 |
Fix 3.1 – FDefaultOutcome Freed Before Reassignment
sgcFree(FDefaultOutcome); // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
FDefaultOutcome := TsgcAMQP1FrameRejected.Create
Fix 3.4 – SetMessage Frees Old Message
procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
sgcFree(F_Message);
FFreeMessageOnDestroy := False;
F_Message := aMessage;
end;
4. 仕様準拠の修正
これらの修正は、AMQP 1.0 Transport・Types・Messaging 仕様からの逸脱を修正します。
4.1 Begin フレームのフィールド 7 が誤ったプロパティに読み込まれる
ファイル:sgcAMQP1_Frames.pas
begin パフォーマティブのフィールドインデックス 7 が Properties ではなく DesiredCapabilities に読み込まれていました。仕様によると begin フィールドは:remote-channel(0)、next-outgoing-id(1)、incoming-window(2)、outgoing-window(3)、handle-max(4)、offered-capabilities(5)、desired-capabilities(6)、properties(7) です。
4.2 Source と Target の DoWrite でフィールドが欠落
ファイル:sgcAMQP1_Frames.pas
Source ディスクリプターの DoWrite メソッドが default-outcome・outcomes・capabilities フィールドをシリアライズしていませんでした。Target ディスクリプターも capabilities が欠落していました。これによりブローカーがネゴシエート済みの値ではなくデフォルト値を使用し、誤った配信状態処理につながる可能性がありました。
4.3 AmqpSequence が誤ったプロパティに読み込まれる
ファイル:sgcAMQP1_Message.pas
メッセージボディの読み取り時、amqp-sequence データが ApplicationData.AMQPSequence ではなく ApplicationData.AMQPValue に読み込まれていました。これにより amqp-sequence エンコーディングを使用するメッセージのメッセージボディが破損していました。
4.4 TransactionalState の Outcome が書き込まれない
ファイル:sgcAMQP1_Frames.pas
transactional-state 配信状態がトランザクション内の配信決済時に必要な outcome フィールドをシリアライズしていませんでした。
4.5 Disposition の Last フィールドがゼロと未設定を区別できない
ファイル:sgcAMQP1_Frames.pas、sgcAMQP1_Frames.intf、sgcAMQP1_Session.pas
disposition パフォーマティブにはオプションの last フィールド(delivery-id)があります。Cardinal 型のため値 0 は有効であり、「未設定」のセンティネルとして使用できません。フィールドが明示的に設定されたかどうかを適切に追跡するために、新しい FLastAssigned ブール値フラグと SetLast セッターが追加されました。
procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
FLast := Value;
FLastAssigned := True;
end;
4.6 AmqpSequence に Value プロパティと Read/Write がない
ファイル:sgcAMQP1_Frames.pas、sgcAMQP1_Frames.intf
TsgcAMQP1FrameAmqpSequence クラスには Value プロパティがなく、DoRead/DoWrite メソッドも空でした。amqp-sequence ボディタイプは完全に機能していませんでした。
4.7 Error の Info フィールドが Map ではなく String として読まれる
ファイル:sgcAMQP1_Frames.pas
AMQP 1.0 仕様では error 型の info フィールドを map と定義しています。ReadMap ではなく ReadString で読み取られていたため、ブローカーが構造化エラー情報を送信した際に解析エラーが発生していました。
4.8 Capabilities と Locales が Symbol ではなく String として書き込まれる
ファイル:sgcAMQP1_Frames.pas
AMQP 1.0 仕様では offered-capabilities・desired-capabilities・outgoing-locales・incoming-locales を symbol の配列として定義しています。open・begin・attach パフォーマティブで WriteSymbol ではなく WriteString で書き込まれていました。標準準拠のブローカーはフィールド型が誤っているとしてこれらのフレームを拒否していました。
4.9 DefaultOutcome ハンドラーに Accepted ディスクリプターがない
ファイル:sgcAMQP1_Frames.pas
Source ディスクリプターの default-outcome リーダーは released と rejected のみを処理していました。最も一般的なアウトカムである accepted は処理されていませんでした。ブローカーがデフォルトアウトカムとして accepted を送信した場合、無言で無視されていました。
5. ステートマシンと接続の修正
これらの修正は AMQP 1.0 接続ステートマシンとフレーム処理ロジックに対処します。
| 修正 | ファイル | 説明 |
|---|---|---|
| 5.1 | sgcAMQP1.pas |
amqp1csOpenReceived の状態遷移に、他のすべての状態が持つ else DoRaiseInvalidState がありませんでした。無効な遷移がエラーを発生させずに無言で無視されていました。 |
| 5.2 | sgcAMQP1.pas |
フレームサイズ検証エラーメッセージに RemoteMaxFrameSize が表示されていましたが、チェックされていたのはローカル制限であるため LocalMaxFrameSize を表示すべきでした。 |
| 5.3 | sgcAMQP1.pas |
FLastTimeRead が Now ではなく 0(Delphi エポック:1899-12-30)で初期化されていました。これにより起動時に即座に誤ったアイドルタイムアウト検出が発生していました。 |
| 5.4 | sgcAMQP1.pas |
Read の TBytes オーバーロードが TMemoryStream オーバーロードと異なり FLastTimeRead := Now を更新していませんでした。これにより一貫性のないハートビート追跡が発生していました。 |
| 5.5 | sgcAMQP1.pas |
ヘッダー受信の状態遷移は常にトリガーされるべきですが条件付きでした。AMQP 1.0 仕様に従い、ステートマシンはすべての有効なヘッダー交換で遷移しなければなりません。 |
Fix 5.1 – OpenReceived Missing Error Branch
amqp1csOpenReceived:
begin
if aState = amqp1csOpenSent then
FConnectionState := amqp1csOpened
else
DoRaiseInvalidState; // Added: was missing
end;
6. ハートビートとアイドルタイムアウトの修正
AMQP 1.0 仕様では、ピアが open パフォーマティブで idle-timeout をアドバタイズした場合、もう一方のピアはアドバタイズされた間隔の半分でハートビートフレームを送信しなければならないと要求しています。これらの修正によりハートビートメカニズムが実際に機能するようになります。
| 修正 | ファイル | 説明 |
|---|---|---|
| 6.1 | sgcAMQP1_Client.pas |
HeartBeat が一度も有効化されませんでした。アイドルタイムアウトチェックの両方のブランチが HeartBeat.Enabled := False を設定していました。IdleTimeout > 0 の場合に True に変更しました。 |
| 6.2 | sgcAMQP1_Client.pas |
Disconnect がハートビートを無効化したり、十分に早い段階で FConnected := False を設定していませんでした。切断処理中にハートビートが発火することを防ぐために順序を変更しました。 |
| 6.3 | sgcAMQP1.pas |
FLastTimeRead が TBytes Read オーバーロードで更新されていませんでした(ステートマシンセクションにも記載)。 |
Fix 6.1 – HeartBeat Enabled
if oOpen.IdleTimeout > 0 then
begin
HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
HeartBeat.Enabled := True; // Was: False (heartbeat never started)
end
else
HeartBeat.Enabled := False;
Fix 6.2 – Disconnect Reordered
procedure TsgcAMQP1_Client.Disconnect;
begin
FConnected := False; // Moved first: prevents heartbeat race
DoStopIdleTimeout;
HeartBeat.Enabled := False; // Added: stop heartbeat during teardown
Clear;
DoConnectionState(amqp1csEnd);
end;
7. スレッドセーフティの修正
これらの修正は、共有データ構造への並行アクセスにおける競合状態に対処します。
7.1 TsgcAMQP1Deliveries.First() の境界チェックとロック
ファイル:sgcAMQP1_Message.pas
First() メソッドはリストが空かどうかを確認せず、スレッドセーフロックを取得せずに Items[0] にアクセスしていました。マルチスレッド環境では、カウントチェックとアクセスの間に別のスレッドがすべてのアイテムを削除し、インデックス範囲外例外を引き起こす可能性がありました。
修正後(正常)
function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
oList: TList;
begin
result := nil;
oList := LockList;
Try
if oList.Count > 0 then
result := TsgcAMQP1Delivery(oList[0]);
Finally
UnLockList;
End;
end;
7.2 SetMessage の安全なオブジェクト置換
ファイル:sgcAMQP1_Message.pas
SetMessage メソッドは、解放前に新しいメッセージが現在のものと異なることを確認するようになり、同じメッセージオブジェクトを割り当てる際の解放後使用を防ぎます。
8. 変更されたファイル
| ファイル | 修正件数 | カテゴリー |
|---|---|---|
Source\sgcAMQP1_Classes.pas |
2 | 重大バグ |
Source\sgcAMQP1_Frames.pas |
16 | 重大バグ・メモリリーク・仕様準拠 |
Interfaces\sgcAMQP1_Frames.intf |
2 | 仕様準拠(Disposition LastAssigned・AmqpSequence Value) |
Source\sgcAMQP1_Message.pas |
4 | 重大バグ・メモリリーク・スレッドセーフティ |
Source\sgcAMQP1_Session.pas |
3 | メモリリーク・仕様準拠 |
Source\sgcAMQP1.pas |
5 | ステートマシン・接続・ハートビート |
Source\sgcAMQP1_Client.pas |
3 | ハートビート・切断の安全性 |
合計:30 件の修正が 8 つのファイルにわたって適用され、OASIS 仕様に対する AMQP 1.0 実装のプロトコル正確性・メモリ安全性・信頼性が向上しました。
