从 sgcWebSockets 2024.2.0 起支持 AMQP 1.0.0。
AMQP(Advanced Message Queuing Protocol)1.0.0 是一种消息传递协议,专为分布式系统之间可靠、异步通信而设计。它以解耦方式促进应用程序或组件之间的消息交换,使其无需直接依赖即可通信。
总体而言,AMQP 1.0.0 为不同软件组件和系统以松耦合方式通信提供了标准化、可互操作的途径,适用于各种分布式及企业级应用程序。
AMQP 功能特性
- 面向消息的通信: AMQP 1.0.0 以消息为核心概念。消息可以携带数据、指令或命令,是通信的基本单位。
- 消息代理: 该协议基于代理消息模型运行。代理(可以是服务器或中间实体)管理生产者和消费者之间的消息路由和投递。
- 队列和交换机: 队列是代理中临时存储消息的实体。交换机定义了根据消息内容或路由键等标准将消息从生产者路由到队列的规则。
- 地址和链路: 地址标识消息基础设施中的消息目的地。链路是与特定地址关联的发送方(生产者)和接收方(消费者)之间的通信通道。
- 会话和连接: 会话表示通信的逻辑通道,允许在单个连接中进行多个消息流。连接管理客户端应用程序和消息代理之间的整体通信链路。
- 安全性: AMQP 1.0.0 支持多种安全机制,包括身份验证和授权,以确保客户端和代理之间的安全通信。
- 传输无关: 该协议设计为传输无关,意味着它可以在 TCP、TLS 或 WebSocket 等不同网络传输上运行,提供部署灵活性。
- 流量控制: AMQP 1.0.0 包含流量控制机制,允许消费者指示其处理特定速率传入消息的能力,从而防止大量消息压垮消费者。
- 错误处理: 该协议规定了处理错误的机制,包括确认和拒绝消息,确保消息投递的健壮性和可靠性。
- SASL 身份验证: 简单身份验证和安全层(SASL)用于对客户端和代理之间的连接进行身份验证和保护。
配置
AMQP 1.0.0 客户端具有 AMQPOptions 属性,您可以在其中配置连接。
- ChannelMax:channel-max 值是连接上可使用的最高通道编号。该值加一即为连接上可同时处于活跃状态的最大会话数。
- ContainerId:(可选)源容器的名称,在服务器中唯一标识该连接。
- CreditSize:信用流的默认大小。
- IdleTimeout:当超过阈值后未接收到任何帧时,本地对端触发超时。空闲超时以毫秒为单位,从接收到最后一帧时开始计算。
- MaxFrameSize:可接受的最大帧大小。
- MaxLinksPerSession:每个会话的最大链路数。
- WindowSize:默认窗口大小。
AMQP 身份验证必须在 Authentication 属性中配置。
- AuthType:身份验证类型
- amqp1authNone:未配置。
- amqp1authSASLAnonymous:匿名身份验证
- amqp1authSASLPlain:用户名/密码身份验证。此类型的身份验证需要填写以下属性:
- Username
- Password
- amqp1authSASLExternal:外部身份验证
连接
连接从客户端 (通常是消息应用程序或服务)向服务器(消息代理)发起 TCP 连接。客户端连接到服务器端口,通常非 TLS 连接为 5672,TLS 加密连接为 5671。TCP 连接建立后,客户端和服务器协商要使用的 AMQP 协议版本。AMQP 1.0.0 支持多个版本,协商期间双方同意使用 1.0.0 版本。
协议协商后,客户端可能需要根据服务器配置向服务器进行身份验证。身份验证机制可以包括 SASL(简单身份验证和安全层)机制,如 PLAIN、EXTERNAL 或服务器支持的其他机制。
示例:连接到监听安全端口 5671 并使用 SASL 凭据的 AMQP 服务器
// Creating AMQP client oAMQP := TsgcWSPClient_AMQP1.Create(nil); // Setting AMQP authentication options oAMQP.AMQPOptions.Authentication.AuthType := amqp1authSASLPlain; oAMQP.AMQPOptions.Authentication.Username := 'sgc'; oAMQP.AMQPOptions.Authentication.Password := 'sgc'; // Creating WebSocket client oClient := TsgcWebSocketClient.Create(nil); // Setting WebSocket specifications oClient.Specifications.RFC6455 := False; // Setting WebSocket client properties oClient.Host := 'www.esegece.com'; oClient.Port := 5671; oClient.TLS := True; // Assigning WebSocket client to AMQP client oAMQP.Client := oClient; // Activating WebSocket client oClient.Active := True;
会话
身份验证后,客户端打开一个 AMQP 会话。会话是客户端和服务器之间通信的逻辑上下文。会话用于将相关的消息操作分组在一起。使用 CreateSession 方法创建新会话,该方法允许设置会话名称,或留空让组件自动分配一个。
如果会话创建成功,将触发 OnAMQPSessionOpen 事件,并提供会话详情。
AMQP1.CreateSession('MySession');
procedure AMQP1AMQPSessionOpen(Sender: TObject; const aSession: TsgcAMQP1Session;
const aBegin: TsgcAMQP1FrameBegin);
begin
ShowMessage('#session-open: ' + aSession.Id);
end;
链路
在会话中,客户端创建链路以与服务器提供的特定实体(如队列、主题或其他资源)进行通信。链路是用于发送和接收消息的双向通信通道。
该组件可以作为发送方和接收方节点工作。允许为每个会话创建任意数量的链路,最多不超过 MaxLinksPerSession 属性设置的限制。
发送方链路
要创建新的发送方链路,使用 CreateSenderLink 方法并传递会话名称和可选的发送方链路名称。如果链路创建成功,将触发 OnAMQPLinkOpen 事件。
AMQP1.CreateSenderLink('MySession', 'MySenderLink');
procedure procedure TfrmClientAMQP1.AMQP1AMQPLinkOpen(Sender: TObject;
const aSession: TsgcAMQP1Session; const aLink: TsgcAMQP1Link; const aAttach: TsgcAMQP1FrameAttach);
begin
ShowMessage('#link-open: ' + aLink.Name);
end;
接收方链路
要创建新的接收方链路,使用 CreateReceiverLink 方法并传递会话名称和可选的接收方链路名称。如果链路创建成功,将触发 OnAMQPLinkOpen 事件。
AMQP1.CreateReceiverLink('MySession', 'MyReceiverLink');
procedure procedure TfrmClientAMQP1.AMQP1AMQPLinkOpen(Sender: TObject;
const aSession: TsgcAMQP1Session; const aLink: TsgcAMQP1Link; const aAttach: TsgcAMQP1FrameAttach);
begin
ShowMessage('#link-open: ' + aLink.Name);
end;
发送消息
会话建立并创建链路后,客户端可以开始执行消息操作,例如向目的地发送消息。使用 SendMessage 方法通过发送方链路发送消息。
AMQP1.SendMessage('MySession', 'MySenderLink', 'My first AMQP Message');
读取消息
默认情况下,接收方链路以自动模式创建,这意味着每次有新消息到达时,都会自动将其投递给客户端。
如果接收方链路以手动模式创建,使用同步方法 GetMessage 来获取并等待新消息到达。
在自动和手动模式下,每次有新消息到达时,都会触发 OnAMQPMessage 事件。
procedure OnAMQPMessageEvent(Sender: TObject; const aSession:
TsgcAMQP1Session; const aLink: TsgcAMQP1ReceiverLink; const aMessage:
TsgcAMQP1Message; var DeliveryState: TsgcAMQP1MessageDeliveryState);
begin
ShowMessage(aMessage.ApplicationData.AMQPValue.Value);
end;
文档
AMQP1 Delphi 客户端
了解更多关于 Delphi / CBuilder AMQP 1.0.0 客户端的信息 https://www.esegece.com/help/sgcWebSockets/#t=Components%2FProtocols%2FSubprotocols%2FAMQP1%2FProtocol_AMQP1.htm下载演示
下载使用 sgcWebSockets 库为 Windows 编译的 AMQP 1.0.0 客户端演示。
