AMQP 1.0.0 Delphi 客户端

· 组件

sgcWebSockets 2024.2.0 起支持 AMQP 1.0.0

AMQP(Advanced Message Queuing Protocol)1.0.0 是一种消息传递协议,专为分布式系统之间可靠异步通信而设计。它以解耦方式促进应用程序或组件之间的消息交换,使其无需直接依赖即可通信。

总体而言,AMQP 1.0.0 为不同软件组件和系统以松耦合方式通信提供了标准化、可互操作的途径,适用于各种分布式及企业级应用程序。

AMQP 功能特性

配置

AMQP 1.0.0 客户端具有 AMQPOptions 属性,您可以在其中配置连接。

AMQP 身份验证必须在 Authentication 属性中配置。

连接

 连接从客户端 (通常是消息应用程序或服务)向服务器(消息代理)发起 TCP 连接。客户端连接到服务器端口,通常非 TLS 连接为 5672,TLS 加密连接为 5671。TCP 连接建立后,客户端和服务器协商要使用的 AMQP 协议版本。AMQP 1.0.0 支持多个版本,协商期间双方同意使用 1.0.0 版本。

协议协商后,客户端可能需要根据服务器配置向服务器进行身份验证。身份验证机制可以包括 SASL(简单身份验证和安全层)机制,如 PLAIN、EXTERNAL 或服务器支持的其他机制。

示例:连接到监听安全端口 5671 并使用 SASL 凭据的 AMQP 服务器

// Creating AMQP client
oAMQP := TsgcWSPClient_AMQP1.Create(nil);
// Setting AMQP authentication options
oAMQP.AMQPOptions.Authentication.AuthType := amqp1authSASLPlain;
oAMQP.AMQPOptions.Authentication.Username := 'sgc';
oAMQP.AMQPOptions.Authentication.Password := 'sgc';
// Creating WebSocket client
oClient := TsgcWebSocketClient.Create(nil);
// Setting WebSocket specifications
oClient.Specifications.RFC6455 := False;
// Setting WebSocket client properties
oClient.Host := 'www.esegece.com';
oClient.Port := 5671;
oClient.TLS := True;
// Assigning WebSocket client to AMQP client
oAMQP.Client := oClient;
// Activating WebSocket client
oClient.Active := True; 

会话

身份验证后,客户端打开一个 AMQP 会话。会话是客户端和服务器之间通信的逻辑上下文。会话用于将相关的消息操作分组在一起。使用 CreateSession 方法创建新会话,该方法允许设置会话名称,或留空让组件自动分配一个。

如果会话创建成功,将触发 OnAMQPSessionOpen 事件,并提供会话详情。 

AMQP1.CreateSession('MySession');
procedure AMQP1AMQPSessionOpen(Sender: TObject; const aSession: TsgcAMQP1Session; 
  const aBegin: TsgcAMQP1FrameBegin);
begin
  ShowMessage('#session-open: ' + aSession.Id);
end; 

链路

在会话中,客户端创建链路以与服务器提供的特定实体(如队列、主题或其他资源)进行通信。链路是用于发送和接收消息的双向通信通道。

该组件可以作为发送方和接收方节点工作。允许为每个会话创建任意数量的链路,最多不超过 MaxLinksPerSession 属性设置的限制。 

发送方链路

要创建新的发送方链路,使用 CreateSenderLink 方法并传递会话名称和可选的发送方链路名称。如果链路创建成功,将触发 OnAMQPLinkOpen 事件。

AMQP1.CreateSenderLink('MySession', 'MySenderLink');
procedure procedure TfrmClientAMQP1.AMQP1AMQPLinkOpen(Sender: TObject; 
  const aSession: TsgcAMQP1Session; const aLink: TsgcAMQP1Link; const aAttach: TsgcAMQP1FrameAttach);
begin
  ShowMessage('#link-open: ' + aLink.Name);
end; 

接收方链路

要创建新的接收方链路,使用 CreateReceiverLink 方法并传递会话名称和可选的接收方链路名称。如果链路创建成功,将触发 OnAMQPLinkOpen 事件。 

AMQP1.CreateReceiverLink('MySession', 'MyReceiverLink');
procedure procedure TfrmClientAMQP1.AMQP1AMQPLinkOpen(Sender: TObject; 
  const aSession: TsgcAMQP1Session; const aLink: TsgcAMQP1Link; const aAttach: TsgcAMQP1FrameAttach);
begin
  ShowMessage('#link-open: ' + aLink.Name);
end; 

发送消息

会话建立并创建链路后,客户端可以开始执行消息操作,例如向目的地发送消息。使用 SendMessage 方法通过发送方链路发送消息。 

AMQP1.SendMessage('MySession', 'MySenderLink', 'My first AMQP Message'); 

读取消息

默认情况下,接收方链路以自动模式创建,这意味着每次有新消息到达时,都会自动将其投递给客户端。

如果接收方链路以手动模式创建,使用同步方法 GetMessage 来获取并等待新消息到达。

在自动和手动模式下,每次有新消息到达时,都会触发 OnAMQPMessage 事件。 

procedure OnAMQPMessageEvent(Sender: TObject; const aSession:
    TsgcAMQP1Session; const aLink: TsgcAMQP1ReceiverLink; const aMessage:
    TsgcAMQP1Message; var DeliveryState: TsgcAMQP1MessageDeliveryState);
begin
  ShowMessage(aMessage.ApplicationData.AMQPValue.Value);
end; 

文档

AMQP1 Delphi 客户端

了解更多关于 Delphi / CBuilder AMQP 1.0.0 客户端的信息 https://www.esegece.com/help/sgcWebSockets/#t=Components%2FProtocols%2FSubprotocols%2FAMQP1%2FProtocol_AMQP1.htm

下载演示

下载使用 sgcWebSockets 库为 Windows 编译的 AMQP 1.0.0 客户端演示。