AMQP 1 客户端 Delphi 更新

· 功能

sgcWebSockets 中的 AMQP 1.0 协议实现已经过对照 OASIS AMQP 1.0 规范 的全面审查。本文记录了跨 8 个源文件应用的 30 项修复,涵盖严重错误、内存泄漏、规范合规性、状态机正确性、心跳处理及线程安全改进。

目录

  1. 概览
  2. 严重错误修复
  3. 内存泄漏修复
  4. 规范合规性修复
  5. 状态机与连接修复
  6. 心跳与空闲超时修复
  7. 线程安全修复
  8. 修改的文件

1. 概览

共对 AMQP 1.0 实现中的 6 个源文件2 个接口文件 应用了 30 项修复。修复分类如下:

类别 数量 严重程度
严重错误修复 6 严重
内存泄漏修复 4 严重
规范合规性 10
状态机与连接 5
心跳与空闲超时 3
线程安全 2

2. 严重错误修复

这些修复解决了会导致 AMQP 1.0 通信期间立即发生运行时错误、协议损坏或行为不正确的缺陷。

2.1 异常中缺少 raise 关键字

File: sgcAMQP1_Classes.pas
An exception object was created but never raised when an invalid frame type was encountered. The exception was silently discarded, allowing corrupt frames to be processed without detection.

修改前(错误)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

修改后(已修复)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 缺少数据且大小错误

File: sgcAMQP1_Classes.pas
Map32 编码路径缺少对实际 Map 数据的 WriteBytes 调用,且 size 字段使用了错误的偏移量。Map32 使用 4 字节计数字段(Map8 仅用 1 字节),因此 size 必须额外包含 4 个字节。

修改前(错误)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

修改后(已修复)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 ContainsError 逻辑颠倒

File: sgcAMQP1_Frames.pas
TsgcAMQP1FrameRejectedTsgcAMQP1DescribedListError 中的 ContainsError 方法在没有错误时返回 True,在错误时返回 False。这导致错误信息被静默丢弃,并在本应序列化实际错误时写入空字节。DoWriteDoWriteError 分支也被交换以匹配修正后的逻辑。

修改前(错误)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

修改后(已修复)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 SASL PLAIN 空字节分隔符

File: sgcAMQP1_Frames.pas
SASL PLAIN 机制要求格式为 \0username\0password,以空字节($00)作为分隔符。实现未将字节数组初始化为零,导致分隔符位置包含垃圾数据。与任何符合标准的 AMQP 1.0 代理进行身份验证均会失败。

修改后(已修复)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 TsgcAMQP1Message 中缺少 inherited Create

File: sgcAMQP1_Message.pas
TsgcAMQP1Message 的参数化构造函数未调用 inherited Create,导致基类从未被初始化。使用便捷构造函数时会引发访问违规或状态损坏。

修改后(已修复)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 AmqpValue.DoRead 中缺少分号

File: sgcAMQP1_Frames.pas
TsgcAMQP1FrameAmqpValue.DoRead 中缺少分号,导致无法编译。


3. 内存泄漏修复

这些修复解决了对象生命周期管理问题,该问题会导致内存随时间不断积累,尤其是在具有大量消息交换的长时间 AMQP 1.0 连接期间。

修复 文件 描述
3.1 sgcAMQP1_Frames.pas 读取 Source 描述符时,FDefaultOutcome 在重新赋值前未被释放。每次接收到新的默认 Outcome 时,上一个对象就会泄漏。
3.2 sgcAMQP1_Session.pas 析构函数中重复调用 sgcFree(FCreditConsumed) 导致潜在的双重释放问题,已移除重复行。
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries 在会话析构函数中缺失,导致会话销毁时投递跟踪列表从未被释放。
3.4 sgcAMQP1_Message.pas FFreeMessageOnDestroy 启用时,SetMessageSetMessageAndFreeOnDestroy 未释放上一条消息。重复赋值消息会导致内存泄漏。

修复 3.1 — 重新赋值前释放 FDefaultOutcome

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

修复 3.4 — SetMessage 释放旧消息

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. 规范合规性修复

这些修复纠正了与 AMQP 1.0 传输类型消息规范之间的偏差。

4.1 Begin 帧字段 7 读入错误属性

File: sgcAMQP1_Frames.pas
begin 性能字段索引 7 被读入 DesiredCapabilities 而非 Properties。根据规范,begin 字段依次为:remote-channel(0)、next-outgoing-id(1)、incoming-window(2)、outgoing-window(3)、handle-max(4)、offered-capabilities(5)、desired-capabilities(6)、properties(7)。

4.2 DoWrite 中 Source 和 Target 缺少字段

File: sgcAMQP1_Frames.pas
Source 描述符的 DoWrite 方法未序列化 default-outcomeoutcomescapabilities 字段。Target 描述符缺少 capabilities。这导致代理使用默认值而非协商值,可能引发错误的投递状态处理。

4.3 AmqpSequence 读入错误属性

File: sgcAMQP1_Message.pas
读取消息体时,amqp-sequence 数据被读入 ApplicationData.AMQPValue 而非 ApplicationData.AMQPSequence,导致使用 amqp-sequence 编码的所有消息体损坏。

4.4 TransactionalState Outcome 未写入

File: sgcAMQP1_Frames.pas
transactional-state 投递状态未序列化其 outcome 字段,而在事务内结算投递时该字段是必需的。

4.5 Disposition Last 字段无法区分零值和未设置

Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
disposition 性能具有一个可选的 last 字段(delivery-id)。由于它是 Cardinal 类型,值 0 有效,不能用作"未设置"的哨兵值。新增了 FLastAssigned 布尔标志和 SetLast setter,以正确跟踪该字段是否被显式设置。

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence 缺少 Value 属性及读写方法

Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
TsgcAMQP1FrameAmqpSequence 类没有 Value 属性,且 DoRead/DoWrite 方法为空。amqp-sequence 消息体类型完全无法正常工作。

4.7 Error Info 字段被读为字符串而非 Map

File: sgcAMQP1_Frames.pas
AMQP 1.0 规范将 error 类型的 info 字段定义为 map。代码使用 ReadString 而非 ReadMap 读取该字段,导致代理发送结构化错误信息时解析失败。

4.8 Capabilities 和 Locales 被写为字符串而非 Symbol

File: sgcAMQP1_Frames.pas
AMQP 1.0 规范将 offered-capabilitiesdesired-capabilitiesoutgoing-localesincoming-locales 定义为 symbol 数组。在 openbeginattach 性能中,这些字段使用 WriteString 而非 WriteSymbol 写入。符合标准的代理将以字段类型不正确为由拒绝这些帧。

4.9 DefaultOutcome 处理器缺少 Accepted 描述符

File: sgcAMQP1_Frames.pas
Source 描述符的 default-outcome 读取器仅处理了 releasedrejected。最常见的 Outcome——accepted——未被处理。当代理将 accepted 作为默认 Outcome 发送时,它会被静默忽略。


5. 状态机与连接修复

这些修复解决了 AMQP 1.0 连接状态机和帧处理逻辑中的问题。

修复 文件 描述
5.1 sgcAMQP1.pas amqp1csOpenReceived 状态转换缺少其他所有状态均有的 else DoRaiseInvalidState,导致无效转换被静默忽略而非抛出错误。
5.2 sgcAMQP1.pas 帧大小验证错误消息显示的是 RemoteMaxFrameSize,但实际检查的是本地限制,应显示 LocalMaxFrameSize
5.3 sgcAMQP1.pas FLastTimeRead 被初始化为 0(Delphi 纪元:1899-12-30)而非 Now,导致启动时立即误触发空闲超时检测。
5.4 sgcAMQP1.pas ReadTBytes 重载未更新 FLastTimeRead := Now,而 TMemoryStream 重载则有更新,导致心跳跟踪不一致。
5.5 sgcAMQP1.pas 收到头部的状态转换存在条件判断,而实际上应始终触发。根据 AMQP 1.0 规范,状态机必须在每次有效头部交换时都进行转换。

修复 5.1 — OpenReceived 缺少错误分支

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. 心跳与空闲超时修复

AMQP 1.0 规范要求:当对端在 open 性能中通告 idle-timeout 时,另一端必须以通告间隔的一半频率发送心跳帧。这些修复确保心跳机制能真正正常工作。

修复 文件 描述
6.1 sgcAMQP1_Client.pas 心跳从未被启用。空闲超时检查的两个分支均将 HeartBeat.Enabled 设置为 False。已修改为当 IdleTimeout > 0 时设置为 True
6.2 sgcAMQP1_Client.pas Disconnect 未能及时禁用心跳或设置 FConnected := False,已重新排序以防止在断开过程中心跳误触发。
6.3 sgcAMQP1.pas TBytes Read 重载中未更新 FLastTimeRead(同见状态机章节)。

修复 6.1 — 启用心跳

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

修复 6.2 — 重新排序断开连接

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. 线程安全修复

这些修复解决了并发访问共享数据结构时的竞争条件问题。

7.1 TsgcAMQP1Deliveries.First() 边界检查与锁定

File: sgcAMQP1_Message.pas
First() 方法在访问 Items[0] 时未检查列表是否为空,也未获取线程安全锁。在多线程环境中,另一个线程可能在计数检查和访问之间移除所有项目,引发索引越界异常。

修改后(已修复)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 SetMessage 安全对象替换

File: sgcAMQP1_Message.pas
SetMessage 方法现在在释放前会检查新消息是否与当前消息不同,从而防止赋值同一消息对象时出现释放后使用的问题。


8. 修改的文件

文件 修复数 类别
Source\sgcAMQP1_Classes.pas 2 严重错误
Source\sgcAMQP1_Frames.pas 16 严重错误、内存泄漏、规范合规性
Interfaces\sgcAMQP1_Frames.intf 2 规范合规性(Disposition LastAssigned、AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 严重错误、内存泄漏、线程安全
Source\sgcAMQP1_Session.pas 3 内存泄漏、规范合规性
Source\sgcAMQP1.pas 5 状态机、连接、心跳
Source\sgcAMQP1_Client.pas 3 心跳、断开连接安全性

共计:30 项修复,涵盖 8 个文件,提升了 AMQP 1.0 实现相对于 OASIS 规范的协议正确性、内存安全性和可靠性。