Exchanges et files AMQP (1 / 3)

· Composants

À partir de sgcWebSockets 2022.1, le protocole AMQP 0.9.1 est pris en charge. L'Advanced Message Queuing Protocol (AMQP) est un protocole standard ouvert de couche application pour middleware orienté messages. Les caractéristiques principales d'AMQP sont l'orientation messages, la mise en file, le routage (incluant point à point et publish-and-subscribe), la fiabilité et la sécurité.

AMQP est un protocole binaire de couche application, conçu pour prendre en charge efficacement une grande variété d'applications de messagerie et de modèles de communication. Il fournit une communication orientée messages avec contrôle de flux et des garanties de livraison comme at-most-once (chaque message est livré une fois ou jamais), at-least-once (chaque message est garanti d'être livré, mais peut l'être plusieurs fois) et exactly-once (le message arrive toujours et une seule fois), ainsi qu'une authentification et/ou un chiffrement basés sur SASL et/ou TLS. Il suppose un protocole de transport sous-jacent fiable comme Transmission Control Protocol (TCP).

Canaux 

AMQP est un protocole multicanal. Les canaux permettent de multiplexer une connexion TCP/IP lourde en plusieurs connexions légères. Cela rend le protocole plus « firewall friendly » puisque l'utilisation des ports est prévisible. Cela signifie aussi que le traffic shaping et d'autres fonctionnalités QoS réseau peuvent être employés facilement.

Chaque canal tourne dans son propre thread, donc à chaque réception d'un nouveau message, le client identifie d'abord le canal et place le message dans une file traitée par le thread du canal.

Le cycle de vie du canal est le suivant :

1. Le client ouvre un nouveau canal (Open).

2. Le serveur confirme que le nouveau canal est prêt (Open-Ok).

3. Le client et le serveur utilisent le canal selon leurs besoins.

4. Un peer (client ou serveur) ferme le canal (Close).

5. L'autre peer accuse réception de la fermeture (Close-Ok).


Pour créer un nouveau canal, il suffit d'appeler la méthode OpenChannel et de passer le nom du canal en argument. L'événement OnAMQPChannelOpen est levé comme confirmation envoyée par le serveur que le canal a été ouvert.

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

La classe exchange permet à une application de gérer les exchanges sur le serveur. Cette classe permet à l'application de scripter son propre câblage (plutôt que de dépendre d'une interface de configuration). Note : la plupart des applications n'ont pas besoin de ce niveau de sophistication, et il est peu probable qu'un middleware historique puisse prendre en charge cette sémantique.

Le cycle de vie d'un exchange est :

1. Le client demande au serveur de s'assurer que l'exchange existe (Declare). Le client peut affiner ceci en « créer l'exchange s'il n'existe pas » ou « m'avertir mais ne pas le créer s'il n'existe pas ».

2. Le client publie des messages sur l'exchange.

3. Le client peut choisir de supprimer l'exchange (Delete).

La méthode DeclareExchange crée un nouvel exchange ou vérifie qu'un exchange existe déjà. La méthode prend les arguments suivants :


Pour déclarer un nouvel exchange, il suffit d'appeler la méthode DeclareExchange et de passer le nom du canal, le nom de l'exchange et le type de l'exchange en arguments. L'événement OnAMQPExchangeDeclare est levé comme confirmation envoyée par le serveur que l'exchange a été déclaré.

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Files 

La classe queue permet à une application de gérer les files de messages sur le serveur. C'est une étape de base dans presque toutes les applications qui consomment des messages, au moins pour vérifier qu'une file attendue est bien présente.

Le cycle de vie d'une file de messages durable est assez simple :

1. Le client affirme que la file de messages existe (Declare, avec l'argument « passive »).

2. Le serveur confirme que la file de messages existe (Declare-Ok).

3. Le client lit les messages dans la file.

Le cycle de vie d'une file de messages temporaire est plus intéressant :

1. Le client crée la file de messages (Declare, souvent sans nom de file pour que le serveur en assigne un). Le serveur confirme (Declare-Ok).

2. Le client démarre un consommateur sur la file. La fonctionnalité précise d'un consommateur est définie par la classe Basic.

3. Le client annule le consommateur, soit explicitement, soit en fermant le canal et/ou la connexion.

4. Quand le dernier consommateur disparaît de la file de messages, et après un délai poli, le serveur supprime la file.

AMQP implémente le mécanisme de livraison pour les abonnements à des topics sous forme de files de messages. Cela permet des structures intéressantes où un abonnement peut être équilibré en charge sur un pool d'applications abonnées

coopérantes.

Le cycle de vie d'un abonnement comporte une étape de liaison supplémentaire :

1. Le client crée la file de messages (Declare), et le serveur confirme (Declare-Ok).

2. Le client lie la file de messages à un topic exchange (Bind) et le serveur confirme (Bind-Ok).

3. Le client utilise la file comme dans les exemples précédents.

La méthode DeclareQueue crée une nouvelle file ou vérifie qu'une file existe déjà. La méthode prend les arguments suivants :


Pour déclarer une nouvelle file, il suffit d'appeler la méthode DeclareQueue et de passer le nom du canal et le nom de la file en arguments. L'événement OnAMQPQueueDeclare est levé comme confirmation envoyée par le serveur que la file a été déclarée.

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;