从 sgcWebSockets 2022.1 起支持 AMQP 0.9.1 协议。 The 高级消息队列协议(AMQP)是面向消息中间件的开放标准应用层协议。AMQP 的主要特性包括:消息导向、队列、路由(含点对点和发布-订阅)、可靠性和安全性。
AMQP 是一种二进制应用层协议,旨在高效支持各种消息应用程序和通信模式。 它提供流量控制的、面向消息的通信,具有消息投递保证,如最多一次(每条消息最多投递一次或不投递)、至少一次(每条消息必定投递,但可能多次)和恰好一次(消息始终只投递一次),以及基于 SASL 和/或 TLS 的身份验证和/或加密。它假定底层使用可靠的传输层协议,如 TCP。
Channels
AMQP 是一种多通道协议。通道提供了一种将重型 TCP/IP 连接多路复用为多个轻量级连接的方式。这使协议更加"防火墙友好",因为端口使用是可预测的。这也意味着流量整形和其他网络 QoS 功能可以轻松使用。
每个通道在其自己的线程中运行,因此每次收到新消息时,客户端首先识别通道并将消息放入由通道线程处理的队列中。
通道生命周期如下:
1. 客户端打开新通道(Open)。
2. 服务器确认新通道已就绪(Open-Ok)。
3. 客户端和服务器按需使用通道。
4. 一方(客户端或服务器)关闭通道(Close)。
5. 另一方握手确认通道关闭(Close-Ok)。
要创建新通道,只需调用 OpenChannel 方法并传入通道名称作为参数。服务器发送确认通道已打开的 OnAMQPChannelOpen 事件将被触发。
AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_channel_open: ' + aChannel);
end;
Exchanges
交换机类允许应用程序管理服务器上的交换机。该类允许应用程序编写自己的连接逻辑(而不是依赖某些配置界面)。注意:大多数应用程序不需要这种复杂程度,遗留中间件也不太可能支持此语义。
交换机生命周期如下:
1. 客户端要求服务器确保交换机存在(Declare)。客户端可以细化为"如果不存在则创建交换机",或"如果不存在则警告但不创建"。
2. 客户端向交换机发布消息。
3. 客户端可以选择删除交换机(Delete)。
DeclareExchange 方法创建新交换机或验证交换机是否已存在。该方法具有以下参数:
- ChannelName:通道名称(调用此方法前必须已打开)。
- ExchangeName:交换机名称,不得超过 255 个字符,且不能以"amq."开头(除非 passive 参数为 true)。
- ExchangeType:交换机类型,所有 AMQP 服务器都支持"direct"和"fanout"交换机。请查看服务器文档了解支持哪些交换机类型。
- Passive:如果 passive 为 true,服务器只验证交换机是否已声明。如果 passive 为 false 且交换机不存在,服务器将创建新的。
- Durable:如果为 true,服务器启动时将重新创建交换机。如果为 false,服务器停止时交换机将被删除。
- AutoDelete:如果为 true,当所有队列都解绑时,交换机将被删除。
- Internal:始终为 false。
- NoWait:如果为 true,服务器不向客户端发送确认。
要声明新交换机,只需调用 DeclareExchange 方法并传入通道名称、交换机名称和交换机类型作为参数。服务器发送交换机已声明确认的 OnAMQPExchangeDeclare 事件将被触发。
AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end;
Queues
队列类允许应用程序管理服务器上的消息队列。这是几乎所有消费消息的应用程序中的基本步骤,至少需要验证预期的消息队列确实存在。
持久消息队列的生命周期相当简单:
1. 客户端断言消息队列存在(Declare,使用"passive"参数)。
2. 服务器确认消息队列存在(Declare-Ok)。
3. 客户端从消息队列中读取消息。
临时消息队列的生命周期更为有趣:
1. 客户端创建消息队列(Declare,通常不设置队列名称,由服务器自动分配名称)。服务器确认(Declare-Ok)。
2. 客户端在消息队列上启动消费者。消费者的具体功能由 Basic 类定义。
3. 客户端取消消费者,可以显式取消或通过关闭通道和/或连接来取消。
4. 当最后一个消费者从消息队列中消失,并经过一段等待时间后,服务器删除消息队列。
AMQP 将主题订阅的投递机制实现为消息队列。这支持有趣的结构,其中订阅可以在协作订阅者池中进行负载均衡。
applications.
订阅的生命周期涉及一个额外的绑定阶段:
1. 客户端创建消息队列(Declare),服务器确认(Declare-Ok)。
2. 客户端将消息队列绑定到主题交换机(Bind),服务器确认(Bind-Ok)。
3. 客户端使用消息队列,与前面的示例类似。
DeclareQueue 方法创建新队列或验证队列是否已存在。该方法具有以下参数:
- ChannelName:通道名称(调用此方法前必须已打开)。
- QueueName:队列名称,不得超过 255 个字符,且不能以"amq."开头(除非 passive 参数为 true)。
- Passive: if passive is true, the server only verifies that the queue is already declared. If passive is false, and the queue not exists, the server will create a new one.
- Durable: if true, the queue will be re-created when the server starts. If false, the queue will be deleted when the server stops.
- Exclusive:如果为 true,表示队列只能由当前连接访问。
- AutoDelete:如果为 true,当所有消费者不再使用队列时,队列将被删除。
- NoWait:如果为 true,服务器不向客户端发送确认。
要声明新队列,只需调用 DeclareQueue 方法并传入通道名称和队列名称作为参数。服务器发送队列已声明确认的 OnAMQPQueueDeclare 事件将被触发。
AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;
