AMQP 允许以 2 种模式接收消息:
- 客户端请求:使用 GetMessage 方法。如果队列中没有消息,将调用 OnAMQPBasicGetEmpty 事件。
- 服务器推送:使用 Consume 方法。
Consume
消费者从队列中消费。要消费消息,必须有一个队列。当添加新消费者时,假设队列中已有就绪的消息,投递将立即开始。
消费者注册时目标队列可以为空。在这种情况下,当新消息入队时才会开始第一次投递。
消费消息是一项异步任务,这意味着每次有新消息可以投递到消费者队列时,服务器会自动将其推送到客户端。您可以参阅同步接收消息的替代方法。
Consume 方法在队列中创建新消费者,每次有新消息时都会自动将其投递给消费者客户端。
该方法具有以下参数:
- ChannelName:通道名称(调用此方法前必须已打开)。
- QueueName:队列名称,不得超过 255 个字符,且不能以"amq."开头(除非 passive 参数为 true)。
- ConsumerTag:消费者名称,必须唯一。如果未设置,服务器将创建一个新的。
- NoLocal:如果为 true,表示消费者永远不会消费在同一通道上发布的消息。
- NoAck:如果为 true,表示服务器不期望对每条投递的消息进行确认。
- Exclusive:如果为 true,阻止其他消费者从该队列消费消息。
- NoWait:如果为 true,服务器不会向客户端发送确认。
消息通过 OnAMQPBasigGetOk 事件投递。
AMQP.Consume('channel_name', 'exchange_name', 'consumer_tag');
procedure OnAMQPBasicGetOk(Sender: TObject; const aChannel: string; const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent);
begin
DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;
Get Messages
获取消息是一项同步任务,这意味着由客户端向服务器查询队列中是否有消息。您可以参阅异步接收消息的替代方法。
GetMessage 方法向 AMQP 服务器发送请求,询问队列中是否有可用消息。如果有消息,将通过 OnAMQPBasicGetOk 事件分发;如果队列为空,将调用 OnAMQPBasicGetEmpty 事件。
该方法具有以下参数:
- ChannelName:通道名称(调用此方法前必须已打开)。
- QueueName:队列名称,不得超过 255 个字符,且不能以"amq."开头(除非 passive 参数为 true)。
- NoWait:如果为 true,服务器不会向客户端发送确认。
AMQP.GetMessage('channel_name', 'exchange_name');
procedure OnAMQPBasicGetOk(Sender: TObject; const aChannel: string; const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent);
begin
DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;
procedure OnAMQPBasicGetEmpty(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_basic_GetEmpty: ' + aChannel);
end;
