AMQP 交换机与队列(第 1 / 3 部分)

· 组件

sgcWebSockets 2022.1 起支持 AMQP 0.9.1 协议。 The 高级消息队列协议(AMQP)是面向消息中间件的开放标准应用层协议。AMQP 的主要特性包括:消息导向、队列、路由(含点对点和发布-订阅)、可靠性和安全性。

AMQP 是一种二进制应用层协议,旨在高效支持各种消息应用程序和通信模式。 它提供流量控制的、面向消息的通信,具有消息投递保证,如最多一次(每条消息最多投递一次或不投递)、至少一次(每条消息必定投递,但可能多次)和恰好一次(消息始终只投递一次),以及基于 SASL 和/或 TLS 的身份验证和/或加密。它假定底层使用可靠的传输层协议,如 TCP。

Channels 

AMQP 是一种多通道协议。通道提供了一种将重型 TCP/IP 连接多路复用为多个轻量级连接的方式。这使协议更加"防火墙友好",因为端口使用是可预测的。这也意味着流量整形和其他网络 QoS 功能可以轻松使用。

每个通道在其自己的线程中运行,因此每次收到新消息时,客户端首先识别通道并将消息放入由通道线程处理的队列中。

通道生命周期如下:

1. 客户端打开新通道(Open)。

2. 服务器确认新通道已就绪(Open-Ok)。

3. 客户端和服务器按需使用通道。

4. 一方(客户端或服务器)关闭通道(Close)。

5. 另一方握手确认通道关闭(Close-Ok)。


要创建新通道,只需调用 OpenChannel 方法并传入通道名称作为参数。服务器发送确认通道已打开的 OnAMQPChannelOpen 事件将被触发。

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

交换机类允许应用程序管理服务器上的交换机。该类允许应用程序编写自己的连接逻辑(而不是依赖某些配置界面)。注意:大多数应用程序不需要这种复杂程度,遗留中间件也不太可能支持此语义。

交换机生命周期如下:

1. 客户端要求服务器确保交换机存在(Declare)。客户端可以细化为"如果不存在则创建交换机",或"如果不存在则警告但不创建"。

2. 客户端向交换机发布消息。

3. 客户端可以选择删除交换机(Delete)。

DeclareExchange 方法创建新交换机或验证交换机是否已存在。该方法具有以下参数:


要声明新交换机,只需调用 DeclareExchange 方法并传入通道名称、交换机名称和交换机类型作为参数。服务器发送交换机已声明确认的 OnAMQPExchangeDeclare 事件将被触发。

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Queues 

队列类允许应用程序管理服务器上的消息队列。这是几乎所有消费消息的应用程序中的基本步骤,至少需要验证预期的消息队列确实存在。

持久消息队列的生命周期相当简单:

1. 客户端断言消息队列存在(Declare,使用"passive"参数)。

2. 服务器确认消息队列存在(Declare-Ok)。

3. 客户端从消息队列中读取消息。

临时消息队列的生命周期更为有趣:

1. 客户端创建消息队列(Declare,通常不设置队列名称,由服务器自动分配名称)。服务器确认(Declare-Ok)。

2. 客户端在消息队列上启动消费者。消费者的具体功能由 Basic 类定义。

3. 客户端取消消费者,可以显式取消或通过关闭通道和/或连接来取消。

4. 当最后一个消费者从消息队列中消失,并经过一段等待时间后,服务器删除消息队列。

AMQP 将主题订阅的投递机制实现为消息队列。这支持有趣的结构,其中订阅可以在协作订阅者池中进行负载均衡。

applications.

订阅的生命周期涉及一个额外的绑定阶段:

1. 客户端创建消息队列(Declare),服务器确认(Declare-Ok)。

2. 客户端将消息队列绑定到主题交换机(Bind),服务器确认(Bind-Ok)。

3. 客户端使用消息队列,与前面的示例类似。

DeclareQueue 方法创建新队列或验证队列是否已存在。该方法具有以下参数:


要声明新队列,只需调用 DeclareQueue 方法并传入通道名称和队列名称作为参数。服务器发送队列已声明确认的 OnAMQPQueueDeclare 事件将被触发。

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;