AMQP Exchanges and Queues (1 / 3)

Starting with sgcWebSockets 2022.1, the AMQP 0.9.1 protocol is supported. The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.

AMQP is a binary, application layer protocol designed to efficiently support a wide variety of messaging applications and communication patterns. It provides flow-controlled, message-oriented communication with message-delivery guarantees such as at-most-once (each message is delivered once or never), at-least-once (each message is certain to be delivered, but may be delivered multiple times) and exactly-once (each message always arrives, and arrives only once), plus authentication and/or encryption based on SASL and/or TLS. It assumes an underlying reliable transport layer protocol such as Transmission Control Protocol (TCP).

Channels 

AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several lightweight connections. This makes the protocol more "firewall friendly" since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed.

Every channel runs in its own thread. Each time a new message is received, the client first identifies the channel it belongs to and then places the message in a queue that is processed by that channel's thread.

The channel life-cycle is this:

1. The client opens a new channel (Open).

2. The server confirms that the new channel is ready (Open-Ok).

3. The client and server use the channel as desired.

4. One peer (client or server) closes the channel (Close).

5. The other peer hand-shakes the channel close (Close-Ok).
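
The five steps above can be read as a small state machine. The following is only a sketch of that idea, not part of sgcWebSockets: the type names, state names and the Advance function are hypothetical, and the model is simplified (it does not track which peer sends each method).

```pascal
program ChannelLifecycleSketch;

{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Hypothetical state names, one per phase of the life-cycle above.
  TChannelState = (csClosed, csOpening, csOpen, csClosing);
  // The four channel methods named in the life-cycle.
  TChannelMethod = (cmOpen, cmOpenOk, cmClose, cmCloseOk);

// Returns True and advances aState when aMethod is legal in the current
// state; returns False and leaves aState untouched otherwise.
function Advance(var aState: TChannelState; aMethod: TChannelMethod): Boolean;
begin
  Result := True;
  case aState of
    csClosed:
      if aMethod = cmOpen then aState := csOpening else Result := False;
    csOpening:
      if aMethod = cmOpenOk then aState := csOpen else Result := False;
    csOpen:
      if aMethod = cmClose then aState := csClosing else Result := False;
    csClosing:
      if aMethod = cmCloseOk then aState := csClosed else Result := False;
  end;
end;

var
  State: TChannelState;
begin
  State := csClosed;

  // Close is not valid before the channel has been opened.
  if not Advance(State, cmClose) then
    WriteLn('Close rejected: channel is not open yet');

  Advance(State, cmOpen);    // step 1: Open
  Advance(State, cmOpenOk);  // step 2: Open-Ok
  if State = csOpen then
    WriteLn('Channel is open');

  Advance(State, cmClose);   // step 4: Close
  Advance(State, cmCloseOk); // step 5: Close-Ok
  if State = csClosed then
    WriteLn('Channel is closed');
end.
```

Step 3 (using the channel) corresponds to staying in the csOpen state, which is the only state in which the sketch accepts Close.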


To create a new channel, call the method OpenChannel and pass the channel name as an argument. The event OnAMQPChannelOpen is raised when the server confirms that the channel has been opened.

AMQP.OpenChannel('channel_name');
 
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

The exchange class lets an application manage exchanges on the server. This class lets the application script its own wiring (rather than relying on some configuration interface). Note: Most applications do not need this level of sophistication, and legacy middleware is unlikely to be able to support this semantic.

The exchange life-cycle is:

1. The client asks the server to make sure the exchange exists (Declare). The client can refine this into, "create the exchange if it does not exist", or "warn me but do not create it, if it does not exist".

2. The client publishes messages to the exchange.

3. The client may choose to delete the exchange (Delete).

The DeclareExchange method creates a new exchange or verifies that an exchange already exists. The method has the following arguments:

  • ChannelName: the name of the channel (it must be open before calling this method).
  • ExchangeName: the name of the exchange; it must be no longer than 255 characters and must not begin with "amq." (unless the Passive parameter is true).
  • ExchangeType: the exchange type. All AMQP servers support "direct" and "fanout" exchanges; check the server documentation to find out which other exchange types are supported.
  • Passive: if true, the server only verifies that the exchange is already declared. If false and the exchange does not exist, the server creates a new one.
  • Durable: if true, the exchange will be re-created when the server starts. If false, the exchange will be deleted when the server stops.
  • AutoDelete: if true, the exchange will be deleted when all queues have been unbound.
  • Internal: always false.
  • NoWait: if true, the server does not send an acknowledgment to the client.
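
The naming rules for ExchangeName can be checked on the client before the declaration is sent. The helper below is a minimal sketch under stated assumptions: IsValidDeclareName and its constants are hypothetical names, not part of sgcWebSockets, and Length counts characters as the host string type sees them, which may differ from the number of bytes sent on the wire.

```pascal
program AMQPNameCheckSketch;

{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}
{$APPTYPE CONSOLE}

const
  MAX_NAME_LENGTH = 255;     // limit taken from the argument list above
  RESERVED_PREFIX = 'amq.';  // prefix only allowed for passive declarations

// Hypothetical helper: applies the two naming rules listed above.
function IsValidDeclareName(const aName: string; aPassive: Boolean): Boolean;
begin
  // Rule 1: the name must not exceed the maximum length.
  Result := Length(aName) <= MAX_NAME_LENGTH;
  // Rule 2: the reserved prefix is rejected unless the declare is passive.
  if Result and not aPassive then
    Result := Copy(aName, 1, Length(RESERVED_PREFIX)) <> RESERVED_PREFIX;
end;

begin
  if IsValidDeclareName('orders', False) then
    WriteLn('orders: ok');
  if not IsValidDeclareName('amq.custom', False) then
    WriteLn('amq.custom: rejected for a non-passive declare');
  if IsValidDeclareName('amq.direct', True) then
    WriteLn('amq.direct: ok for a passive declare');
end.
```

The same two rules are listed for QueueName, so a check like this could be shared between exchange and queue declarations.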

To declare a new exchange, call the method DeclareExchange and pass the channel name, exchange name and exchange type as arguments. The event OnAMQPExchangeDeclare is raised when the server confirms that the exchange has been declared.

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
 
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Queues 

The queue class lets an application manage message queues on the server. This is a basic step in almost all applications that consume messages, at least to verify that an expected message queue is actually present.

The life-cycle for a durable message queue is fairly simple:

1. The client asserts that the message queue exists (Declare, with the "passive" argument).

2. The server confirms that the message queue exists (Declare-Ok).

3. The client reads messages off the message queue.

The life-cycle for a temporary message queue is more interesting:

1. The client creates the message queue (Declare, often with no message queue name so the server will assign a name). The server confirms (Declare-Ok).

2. The client starts a consumer on the message queue. The precise functionality of a consumer is defined by the Basic class.

3. The client cancels the consumer, either explicitly or by closing the channel and/or connection.

4. When the last consumer disappears from the message queue, and after a polite time-out, the server deletes the message queue.

AMQP implements the delivery mechanism for topic subscriptions as message queues. This enables interesting structures where a subscription can be load balanced among a pool of co-operating subscriber applications.

The life-cycle for a subscription involves an extra bind stage:

1. The client creates the message queue (Declare), and the server confirms (Declare-Ok).

2. The client binds the message queue to a topic exchange (Bind) and the server confirms (Bind-Ok).

3. The client uses the message queue as in the previous examples.

The DeclareQueue method creates a new queue or verifies that a queue already exists. The method has the following arguments:

  • ChannelName: the name of the channel (it must be open before calling this method).
  • QueueName: the name of the queue; it must be no longer than 255 characters and must not begin with "amq." (unless the Passive parameter is true).
  • Passive: if true, the server only verifies that the queue is already declared. If false and the queue does not exist, the server creates a new one.
  • Durable: if true, the queue will be re-created when the server starts. If false, the queue will be deleted when the server stops.
  • Exclusive: if true, the queue can only be accessed by the current connection.
  • AutoDelete: if true, the queue will be deleted when no consumers are using it any longer.
  • NoWait: if true, the server does not send an acknowledgment to the client.
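
The difference between a passive and a non-passive declaration can be illustrated with a tiny in-memory model. This is a sketch only: DeclareQueueModel, TDeclareResult and the 'orders' queue name are hypothetical, and the string list merely stands in for the server's queue registry. A real server reports a failed passive declaration as an error rather than as a return value.

```pascal
program PassiveDeclareSketch;

{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  Classes;

type
  TDeclareResult = (drExists, drCreated, drNotFound);

const
  RESULT_TEXT: array[TDeclareResult] of string =
    ('exists', 'created', 'not found');

// Hypothetical stand-in for the server side of a queue declaration.
function DeclareQueueModel(aQueues: TStringList; const aName: string;
  aPassive: Boolean): TDeclareResult;
begin
  if aQueues.IndexOf(aName) >= 0 then
    Result := drExists
  else if aPassive then
    Result := drNotFound   // passive: verify only, never create
  else
  begin
    aQueues.Add(aName);    // non-passive: create the missing queue
    Result := drCreated;
  end;
end;

var
  Queues: TStringList;
begin
  Queues := TStringList.Create;
  try
    Queues.CaseSensitive := True;
    // Passive declare of a queue that does not exist yet.
    WriteLn(RESULT_TEXT[DeclareQueueModel(Queues, 'orders', True)]);  // not found
    // Non-passive declare creates it.
    WriteLn(RESULT_TEXT[DeclareQueueModel(Queues, 'orders', False)]); // created
    // A later passive declare now succeeds.
    WriteLn(RESULT_TEXT[DeclareQueueModel(Queues, 'orders', True)]);  // exists
  finally
    Queues.Free;
  end;
end.
```

This mirrors the durable queue life-cycle described earlier, where a consumer uses a passive declaration to assert that an expected queue is present before reading from it.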

To declare a new queue, call the method DeclareQueue and pass the channel name and queue name as arguments. The event OnAMQPQueueDeclare is raised when the server confirms that the queue has been declared.

AMQP.DeclareQueue('channel_name', 'queue_name');
 
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end; 