sgcWebSockets 中的 AMQP 0-9-1 实现已完成全面更新:11 项错误修复涵盖关键参数顺序错误、类型不匹配、线程安全及数据丢失问题,并新增 6 项协议功能,包括 Basic.Nack、交换机到交换机绑定、发布者确认、Connection.Blocked/Unblocked 通知以及用于令牌刷新的 Connection.UpdateSecret。本文详细介绍了每项变更及代码示例。
目录
- 错误修复
- 严重:DeclareExchange 参数顺序
- 字段表类型字节
- 规范合规性修复
- 其他错误修复
- 新功能
- Basic.Nack — 否定确认
- Exchange.Bind/Unbind — 交换机到交换机绑定
- Confirm 类 — 发布者确认
- Connection.Blocked/Unblocked — 资源告警
- Connection.UpdateSecret — 令牌刷新
- 修改的文件
1. 错误修复
AMQP 0-9-1 实现中共修复了 11 个错误,从关键参数顺序问题到规范合规性修正,涵盖范围广泛。
严重:DeclareExchange 参数顺序
DeclareExchange 和 DeclareExchangeEx 方法向 DoWrite_ExchDeclare 传递 aNoWait、aAutoDelete 和 aInternal 时顺序错误,导致 auto-delete 标志被作为 no-wait 发送(反之亦然),在代理上引发意外的交换机行为。
修改前(错误)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
修改后(已修复)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
字段表类型字节
sgcWriteAMQPFieldTable 过程总是将 $53('S' = 长字符串)作为所有字段表值的类型指示符写入,而不管实际值类型。这导致双精度浮点数、整数、布尔值和 int64 值在传输格式中均被错误地标记为字符串。
修改前(错误)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
修改后(已修复)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
规范合规性修复
| 问题 | 修改前 | 修改后 | 文件 |
|---|---|---|---|
| BasicGetEmpty.Reserved1 类型错误 | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 读取错误 | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose 使用了类特定的方法值获取器 | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
在 sgcAMQP_Helpers.pas 中新增了通用辅助函数 sgcGetAMQPMethodValue,用于解析任意 AMQP 类的正确方法整数 ID,替代了仅适用于各自类的特定 getter。
其他错误修复
| 错误 | 描述 | 文件 |
|---|---|---|
| Channel.CloseOk 缺少通道 ID | DoWrite_ChannCloseOk 未设置 oFrame.Header.Channel,导致 close-ok 被发送到通道 0(连接级别)而非目标通道。已新增 aChannelId: Word 参数。 |
sgcAMQP.pas |
| 错误常量中的拼写错误 | 将 'Now Allowed' 更正为 'Not Allowed'。 |
sgcAMQP_Const.pas |
| QueueUnBind 缺少请求数据 | DoWrite_QueueUnBind 未将 QueueUnBindQueue 和 QueueUnBindExchange 存储在通道请求中,导致 OnAMQPQueueUnBind 事件报告空值。 |
sgcAMQP_Client.pas |
| DoRead 后剩余字节被丢弃 | When the read loop exited with 1–7 remaining bytes (partial frame), they were silently lost. Now saved to FBytes for the next read cycle. |
sgcAMQP.pas |
| GetChannel 忽略 aRaiseIfNotFound | aRaiseIfNotFound 参数从未被检查,现在仅在标志为 True 时抛出异常。 |
sgcAMQP.pas |
2. 新功能
已实现六项新的 AMQP 0-9-1 协议功能,涵盖广泛使用的 RabbitMQ 扩展和额外的规范方法。
Basic.Nack — 否定确认
Basic.Nack(类 60,方法 120)是 RabbitMQ 扩展,允许一次性拒绝一条或多条消息并可选重新入队。与 Basic.Reject 不同,它支持 multiple 标志,可拒绝直至并包含指定投递标签的所有消息。
| 方法 | 描述 | 方向 |
|---|---|---|
NackMessage |
向代理发送否定确认。 | 客户端 → 服务器 |
OnAMQPBasicNack |
当服务器发送 Nack(在发布者确认模式下)时触发。 | 服务器 → 客户端 |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack 事件
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — 交换机到交换机绑定
交换机到交换机绑定(类 40,方法 30/31 和 40/51)允许在交换机之间路由消息而无需中间队列。这是 RabbitMQ 扩展,可实现扇出层次结构和主题分区等强大的拓扑模式。
| 方法 | 描述 |
|---|---|
BindExchange / BindExchangeEx |
使用路由键将目标交换机绑定到源交换机。Ex 变体会同步等待代理响应。 |
UnbindExchange / UnbindExchangeEx |
删除交换机到交换机的绑定。 |
OnAMQPExchangeBind |
当代理确认交换机绑定时触发。 |
OnAMQPExchangeUnbind |
当代理确认交换机解绑时触发。 |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm 类 — 发布者确认
发布者确认(类 85,方法 10/11)允许代理确认已接收发布的消息。通道通过 Confirm.Select 进入确认模式后,代理将为每条发布的消息发送 Basic.Ack 或 Basic.Nack,从而实现无需事务的可靠发布。
| 方法 / 事件 | 描述 |
|---|---|
SelectConfirm / SelectConfirmEx |
在通道上启用发布者确认模式。 |
OnAMQPConfirmSelectOk |
当代理确认确认模式已激活时触发。 |
OnAMQPBasicAck |
当代理确认已发布的消息时触发。 |
OnAMQPBasicNack |
当代理对已发布的消息进行否定确认时触发。 |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
示例:使用确认机制进行可靠发布
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — 资源告警
当代理资源不足(内存、磁盘)时,会发送带有原因字符串的 Connection.Blocked(类 10,方法 60)。条件解除后发送 Connection.Unblocked(方法 61)。这些是仅限服务器到客户端的通知,由基类 TsgcAMQP 处理,适用于所有 AMQP 组件。
| 事件 | 描述 |
|---|---|
OnAMQPConnectionBlocked |
当代理因资源限制阻塞连接时触发。包含 Reason 字符串(例如 'low on memory')。 |
OnAMQPConnectionUnblocked |
当代理解除连接阻塞时触发。 |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — 令牌刷新
Connection.UpdateSecret(类 10,方法 70)允许在不断开连接的情况下刷新活跃连接上的身份验证凭据,这对于令牌定期过期的 OAuth2/JWT 身份验证至关重要。
| 方法 / 事件 | 描述 |
|---|---|
UpdateSecret / UpdateSecretEx |
向代理发送新密钥(令牌)并附带可选的原因字符串。 |
OnAMQPConnectionUpdateSecretOk |
当代理接受新密钥时触发。 |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. 修改的文件
| 文件 | 变更 |
|---|---|
sgcAMQP_Const.pas |
拼写错误修复('Not Allowed')。 |
sgcAMQP_Helpers.pas |
新增 sgcGetAMQPMethodValue 函数、Confirm 类辅助函数(sgcGetAMQPConfirm/sgcGetAMQPConfirmValue)以及所有新方法的方法 ID 映射。 |
sgcAMQP_ReadWrite.pas |
字段表类型字节修复——每种值类型现在均写入正确的类型指示符。 |
sgcAMQP_Classes.pas |
新增枚举(amqpClassConfirm,12 个新方法)、13 个新载荷类、更新调度表、线程安全修复、规范合规性修复及新请求存储字段。 |
sgcAMQP.pas |
8 个新事件类型、Connection.Blocked/Unblocked 处理、DoWrite_ChannCloseOk 通道 ID 修复、剩余字节保留及 GetChannel 标志修复。 |
sgcAMQP_Client.pas |
6 个新读取处理程序、5 个新写入方法、11 个新公共方法、6 个新事件、更新调度表、参数顺序修复及请求数据修复。 |
