AMQP 0.9.1 Delphi 客户端更新

· 功能

sgcWebSockets 中的 AMQP 0-9-1 实现已完成全面更新:11 项错误修复涵盖关键参数顺序错误、类型不匹配、线程安全及数据丢失问题,并新增 6 项协议功能,包括 Basic.Nack、交换机到交换机绑定、发布者确认、Connection.Blocked/Unblocked 通知以及用于令牌刷新的 Connection.UpdateSecret。本文详细介绍了每项变更及代码示例。

目录

  1. 错误修复
  2. 严重:DeclareExchange 参数顺序
  3. 字段表类型字节
  4. 规范合规性修复
  5. 其他错误修复
  6. 新功能
  7. Basic.Nack — 否定确认
  8. Exchange.Bind/Unbind — 交换机到交换机绑定
  9. Confirm 类 — 发布者确认
  10. Connection.Blocked/Unblocked — 资源告警
  11. Connection.UpdateSecret — 令牌刷新
  12. 修改的文件

1. 错误修复

AMQP 0-9-1 实现中共修复了 11 个错误,从关键参数顺序问题到规范合规性修正,涵盖范围广泛。

严重:DeclareExchange 参数顺序

DeclareExchangeDeclareExchangeEx 方法向 DoWrite_ExchDeclare 传递 aNoWaitaAutoDeleteaInternal 时顺序错误,导致 auto-delete 标志被作为 no-wait 发送(反之亦然),在代理上引发意外的交换机行为。

修改前(错误)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

修改后(已修复)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

File: sgcAMQP_Client.pas


字段表类型字节

sgcWriteAMQPFieldTable 过程总是将 $53'S' = 长字符串)作为所有字段表值的类型指示符写入,而不管实际值类型。这导致双精度浮点数、整数、布尔值和 int64 值在传输格式中均被错误地标记为字符串。

修改前(错误)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

修改后(已修复)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas


规范合规性修复

问题 修改前 修改后 文件
BasicGetEmpty.Reserved1 类型错误 UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 读取错误 sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose 使用了类特定的方法值获取器 sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

sgcAMQP_Helpers.pas 中新增了通用辅助函数 sgcGetAMQPMethodValue,用于解析任意 AMQP 类的正确方法整数 ID,替代了仅适用于各自类的特定 getter。


其他错误修复

错误 描述 文件
Channel.CloseOk 缺少通道 ID DoWrite_ChannCloseOk 未设置 oFrame.Header.Channel,导致 close-ok 被发送到通道 0(连接级别)而非目标通道。已新增 aChannelId: Word 参数。 sgcAMQP.pas
错误常量中的拼写错误 'Now Allowed' 更正为 'Not Allowed' sgcAMQP_Const.pas
QueueUnBind 缺少请求数据 DoWrite_QueueUnBind 未将 QueueUnBindQueueQueueUnBindExchange 存储在通道请求中,导致 OnAMQPQueueUnBind 事件报告空值。 sgcAMQP_Client.pas
DoRead 后剩余字节被丢弃 When the read loop exited with 1–7 remaining bytes (partial frame), they were silently lost. Now saved to FBytes for the next read cycle. sgcAMQP.pas
GetChannel 忽略 aRaiseIfNotFound aRaiseIfNotFound 参数从未被检查,现在仅在标志为 True 时抛出异常。 sgcAMQP.pas

2. 新功能

已实现六项新的 AMQP 0-9-1 协议功能,涵盖广泛使用的 RabbitMQ 扩展和额外的规范方法。


Basic.Nack — 否定确认

Basic.Nack(类 60,方法 120)是 RabbitMQ 扩展,允许一次性拒绝一条或多条消息并可选重新入队。与 Basic.Reject 不同,它支持 multiple 标志,可拒绝直至并包含指定投递标签的所有消息。

方法 描述 方向
NackMessage 向代理发送否定确认。 客户端 → 服务器
OnAMQPBasicNack 当服务器发送 Nack(在发布者确认模式下)时触发。 服务器 → 客户端

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack 事件

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — 交换机到交换机绑定

交换机到交换机绑定(类 40,方法 30/31 和 40/51)允许在交换机之间路由消息而无需中间队列。这是 RabbitMQ 扩展,可实现扇出层次结构和主题分区等强大的拓扑模式。

方法 描述
BindExchange / BindExchangeEx 使用路由键将目标交换机绑定到源交换机。Ex 变体会同步等待代理响应。
UnbindExchange / UnbindExchangeEx 删除交换机到交换机的绑定。
OnAMQPExchangeBind 当代理确认交换机绑定时触发。
OnAMQPExchangeUnbind 当代理确认交换机解绑时触发。

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm 类 — 发布者确认

发布者确认(类 85,方法 10/11)允许代理确认已接收发布的消息。通道通过 Confirm.Select 进入确认模式后,代理将为每条发布的消息发送 Basic.AckBasic.Nack,从而实现无需事务的可靠发布。

方法 / 事件 描述
SelectConfirm / SelectConfirmEx 在通道上启用发布者确认模式。
OnAMQPConfirmSelectOk 当代理确认确认模式已激活时触发。
OnAMQPBasicAck 当代理确认已发布的消息时触发。
OnAMQPBasicNack 当代理对已发布的消息进行否定确认时触发。

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

示例:使用确认机制进行可靠发布

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — 资源告警

当代理资源不足(内存、磁盘)时,会发送带有原因字符串的 Connection.Blocked(类 10,方法 60)。条件解除后发送 Connection.Unblocked(方法 61)。这些是仅限服务器到客户端的通知,由基类 TsgcAMQP 处理,适用于所有 AMQP 组件。

事件 描述
OnAMQPConnectionBlocked 当代理因资源限制阻塞连接时触发。包含 Reason 字符串(例如 'low on memory')。
OnAMQPConnectionUnblocked 当代理解除连接阻塞时触发。
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — 令牌刷新

Connection.UpdateSecret(类 10,方法 70)允许在不断开连接的情况下刷新活跃连接上的身份验证凭据,这对于令牌定期过期的 OAuth2/JWT 身份验证至关重要。

方法 / 事件 描述
UpdateSecret / UpdateSecretEx 向代理发送新密钥(令牌)并附带可选的原因字符串。
OnAMQPConnectionUpdateSecretOk 当代理接受新密钥时触发。

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. 修改的文件

文件 变更
sgcAMQP_Const.pas 拼写错误修复('Not Allowed')。
sgcAMQP_Helpers.pas 新增 sgcGetAMQPMethodValue 函数、Confirm 类辅助函数(sgcGetAMQPConfirm/sgcGetAMQPConfirmValue)以及所有新方法的方法 ID 映射。
sgcAMQP_ReadWrite.pas 字段表类型字节修复——每种值类型现在均写入正确的类型指示符。
sgcAMQP_Classes.pas 新增枚举(amqpClassConfirm,12 个新方法)、13 个新载荷类、更新调度表、线程安全修复、规范合规性修复及新请求存储字段。
sgcAMQP.pas 8 个新事件类型、Connection.Blocked/Unblocked 处理、DoWrite_ChannCloseOk 通道 ID 修复、剩余字节保留及 GetChannel 标志修复。
sgcAMQP_Client.pas 6 个新读取处理程序、5 个新写入方法、11 个新公共方法、6 个新事件、更新调度表、参数顺序修复及请求数据修复。