AMQP Exchanges und Queues (1 / 3)

· Komponenten

Ab sgcWebSockets 2022.1 wird das AMQP-0.9.1-Protokoll unterstützt. Das Advanced Message Queuing Protocol (AMQP) ist ein offenes Standardprotokoll der Anwendungsschicht für nachrichtenorientierte Middleware. Die definierenden Merkmale von AMQP sind Nachrichtenorientierung, Queuing, Routing (einschließlich Point-to-Point und Publish-and-Subscribe), Zuverlässigkeit und Sicherheit.

AMQP ist ein binäres Protokoll der Anwendungsschicht, das entwickelt wurde, um eine Vielzahl von Messaging-Anwendungen und Kommunikationsmustern effizient zu unterstützen. Es bietet flussgesteuerte, nachrichtenorientierte Kommunikation mit Zustellgarantien wie at-most-once (jede Nachricht wird einmal oder gar nicht zugestellt), at-least-once (jede Nachricht wird sicher zugestellt, aber möglicherweise mehrfach) und exactly-once (die Nachricht trifft garantiert genau einmal ein) sowie Authentifizierung und/oder Verschlüsselung auf Basis von SASL und/oder TLS. Es setzt eine zugrundeliegende zuverlässige Transportschicht wie das Transmission Control Protocol (TCP) voraus.

Kanäle 

AMQP ist ein Multi-Channel-Protokoll. Kanäle bieten eine Möglichkeit, eine schwergewichtige TCP/IP-Verbindung in mehrere leichtgewichtige Verbindungen zu multiplexen. Dadurch wird das Protokoll "firewall-freundlicher", da die Portnutzung vorhersehbar ist. Außerdem bedeutet das, dass Traffic-Shaping und andere QoS-Netzwerkfunktionen leicht eingesetzt werden können.

Jeder Kanal läuft in seinem eigenen Thread. Sobald eine neue Nachricht eintrifft, identifiziert der Client zuerst den Kanal und reiht die Nachricht in eine Queue ein, die vom Kanal-Thread verarbeitet wird.

Der Lebenszyklus eines Kanals sieht so aus:

1. Der Client öffnet einen neuen Kanal (Open).

2. Der Server bestätigt, dass der neue Kanal bereit ist (Open-Ok).

3. Client und Server nutzen den Kanal nach Bedarf.

4. Ein Peer (Client oder Server) schließt den Kanal (Close).

5. Der andere Peer bestätigt die Kanalschließung (Close-Ok).


Um einen neuen Kanal zu erstellen, rufst du einfach die Methode OpenChannel auf und übergibst den Kanalnamen als Argument. Das Ereignis OnAMQPChannelOpen wird als Bestätigung des Servers ausgelöst, dass der Kanal geöffnet wurde.

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

Die Exchange-Klasse erlaubt einer Anwendung, Exchanges auf dem Server zu verwalten. Diese Klasse ermöglicht es der Anwendung, ihre eigene Verdrahtung zu skripten (statt sich auf eine Konfigurationsschnittstelle zu verlassen). Hinweis: Die meisten Anwendungen benötigen diesen Grad an Komplexität nicht, und ältere Middleware kann diese Semantik wahrscheinlich nicht unterstützen.

Der Lebenszyklus eines Exchanges:

1. Der Client bittet den Server sicherzustellen, dass der Exchange existiert (Declare). Der Client kann das verfeinern in "Erstelle den Exchange, falls er nicht existiert" oder "Warne mich, aber erstelle ihn nicht, falls er nicht existiert".

2. Der Client veröffentlicht Nachrichten an den Exchange.

3. Der Client kann den Exchange löschen (Delete).

Die Methode DeclareExchange erstellt einen neuen Exchange oder verifiziert, dass ein Exchange bereits existiert. Die Methode hat folgende Argumente:


Um einen neuen Exchange zu deklarieren, rufst du einfach die Methode DeclareExchange auf und übergibst Kanalname, Exchange-Name und Exchange-Typ als Argumente. Das Ereignis OnAMQPExchangeDeclare wird als Bestätigung des Servers ausgelöst, dass der Exchange deklariert wurde.

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Queues 

Die Queue-Klasse erlaubt einer Anwendung, Message-Queues auf dem Server zu verwalten. Dies ist ein grundlegender Schritt in fast allen Anwendungen, die Nachrichten konsumieren, mindestens um zu verifizieren, dass eine erwartete Message-Queue tatsächlich vorhanden ist.

Der Lebenszyklus einer dauerhaften Message-Queue ist relativ einfach:

1. Der Client behauptet, dass die Message-Queue existiert (Declare mit dem "passive"-Argument).

2. Der Server bestätigt, dass die Message-Queue existiert (Declare-Ok).

3. Der Client liest Nachrichten aus der Message-Queue.

Der Lebenszyklus einer temporären Message-Queue ist interessanter:

1. Der Client erstellt die Message-Queue (Declare, oft ohne Queue-Namen, sodass der Server einen Namen zuweist). Der Server bestätigt (Declare-Ok).

2. Der Client startet einen Consumer auf der Message-Queue. Die genaue Funktionalität eines Consumers wird durch die Basic-Klasse definiert.

3. Der Client storniert den Consumer, entweder explizit oder durch Schließen des Kanals und/oder der Verbindung.

4. Wenn der letzte Consumer aus der Message-Queue verschwindet, löscht der Server nach einer angemessenen Wartezeit die Message-Queue.

AMQP implementiert den Zustellmechanismus für Topic-Abonnements als Message-Queues. Dies ermöglicht interessante Strukturen, bei denen ein Abonnement zwischen einem Pool kooperierender Subscriber-

Anwendungen lastverteilt werden kann.

Der Lebenszyklus eines Abonnements umfasst eine zusätzliche Bind-Phase:

1. Der Client erstellt die Message-Queue (Declare), und der Server bestätigt (Declare-Ok).

2. Der Client bindet die Message-Queue an einen Topic-Exchange (Bind), und der Server bestätigt (Bind-Ok).

3. Der Client verwendet die Message-Queue wie in den vorherigen Beispielen.

Die Methode DeclareQueue erstellt eine neue Queue oder verifiziert, dass eine Queue bereits existiert. Die Methode hat folgende Argumente:


Um eine neue Queue zu deklarieren, rufst du einfach die Methode DeclareQueue auf und übergibst Kanalname und Queue-Name als Argumente. Das Ereignis OnAMQPQueueDeclare wird als Bestätigung des Servers ausgelöst, dass die Queue deklariert wurde.

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;