AMQP 协议

用于可靠企业级消息传递的高级消息队列协议。完整支持 AMQP 0.9.1 和 1.0,兼容 RabbitMQ 和 Azure Service Bus。

什么是 AMQP?

AMQP 是企业级消息传递的开放标准,可在不同平台和实现之间提供可靠、互操作的消息投递。

企业级消息队列

AMQP 定义了面向消息的中间件的线级协议,确保不同厂商实现之间的真正互操作性。其基于交换机、队列和绑定的复杂路由模型支持多种消息模式,包括直接路由、基于主题的路由、扇出路由和基于消息头的路由。sgcWebSockets 同时实现了 AMQP 0.9.1(RabbitMQ 广泛采用的版本)和 AMQP 1.0(Azure Service Bus 所使用的 OASIS 标准),为您提供最大的灵活性。

  • 线级协议保障跨厂商互操作性
  • 通过交换机、队列和绑定实现灵活路由
  • 通过确认机制保证消息可靠投递
  • 同时支持 AMQP 0.9.1 和 1.0
PRODUCER EXCH Q1 Q2 CONSUMER

AMQP 特性

适用于关键业务应用程序的工业级消息传递功能。

可靠消息投递

通过发布者确认和消费者确认实现端到端投递保证,消息永不丢失。

交换机/队列/绑定模型

通过直连、主题、扇出和消息头交换机实现灵活路由,并以可配置路由键绑定至队列。

消息确认

消费者显式确认已处理的消息,未确认的消息自动重新入队以便重新投递。

事务支持

将发布和确认操作组合为原子事务,要么全部成功,要么全部回滚。

流量控制

内置流量控制防止快速生产者淹没慢速消费者,确保系统稳定运行。

通道多路复用

在单个 TCP 连接上承载多个逻辑通道,在保持隔离的同时降低连接开销。

RabbitMQ 与 Azure Service Bus

已通过 RabbitMQ(AMQP 0.9.1)和 Azure Service Bus(AMQP 1.0)的生产环境验证与测试。

AMQP 使用案例

对可靠性和互操作性要求最高的关键业务消息传递场景。

金融交易处理

以有保证的投递和精确一次语义处理金融交易,适用于银行和支付系统。

订单管理

通过可靠的消息队列管理订单录入、履单、发货和通知系统之间的订单工作流。

企业服务总线

构建企业服务总线,通过可靠的异步消息投递连接异构系统。

分布式计算

通过任务队列和结果收集模式,将工作分发到多个处理节点。

审计日志

通过有保证的投递将审计事件捕获并路由到日志系统,确保审计追踪记录不丢失。

Delphi AMQP 示例

连接 AMQP 代理,声明队列并交换消息。

uses
  sgcAMQP_Client, sgcAMQP_Classes;

var
  AMQPClient: TsgcAMQPClient;

procedure TForm1.FormCreate(Sender: TObject);
begin
  AMQPClient := TsgcAMQPClient.Create(nil);
  AMQPClient.Host := 'rabbitmq.example.com';
  AMQPClient.Port := 5672;
  AMQPClient.Authentication.Username := 'guest';
  AMQPClient.Authentication.Password := 'guest';
  AMQPClient.VirtualHost := '/';

  // Set up event handlers
  AMQPClient.OnAMQPConnect := OnAMQPConnect;
  AMQPClient.OnAMQPMessage := OnAMQPMessage;
  AMQPClient.Connect;
end;

procedure TForm1.OnAMQPConnect(Sender: TObject);
begin
  // Open a channel
  AMQPClient.OpenChannel(1);

  // Declare a queue
  AMQPClient.DeclareQueue(1, 'orders',
    False, True, False, False);

  // Start consuming messages
  AMQPClient.BasicConsume(1, 'orders',
    'consumer-1', False, False, False, False);
end;

procedure TForm1.OnAMQPMessage(Sender: TObject;
  aChannel: Integer; aMessage: TsgcAMQPMessage);
begin
  // Process the message
  Memo1.Lines.Add('Received: ' + aMessage.Body);

  // Acknowledge the message
  AMQPClient.BasicAck(aChannel, aMessage.DeliveryTag, False);
end;

procedure TForm1.ButtonPublishClick(Sender: TObject);
begin
  // Publish a message to the default exchange
  AMQPClient.BasicPublish(1, '', 'orders',
    '{"orderId": 67890, "total": 99.95}');
end;

准备好使用 AMQP 了吗?

下载免费试用版,几分钟内即可连接企业级消息代理。