AMQP Exchanges en Queues (1 / 3)

· Componenten

Vanaf sgcWebSockets 2022.1 wordt het AMQP 0.9.1-protocol ondersteund. Het Advanced Message Queuing Protocol (AMQP) is een open-standaard application-layer-protocol voor message-oriented middleware. De kenmerkende eigenschappen van AMQP zijn message-orientation, queuing, routing (waaronder point-to-point en publish-and-subscribe), betrouwbaarheid en beveiliging.

AMQP is een binair application-layer-protocol, ontworpen om een breed scala aan messaging-toepassingen en communicatiepatronen efficiënt te ondersteunen. Het biedt flow-controlled, message-oriented communicatie met bezorggaranties zoals at-most-once (elk bericht wordt nul of één keer bezorgd), at-least-once (elk bericht komt zeker aan, maar kan meerdere keren worden bezorgd) en exactly-once (het bericht komt altijd aan en exact één keer), en authenticatie en/of encryptie op basis van SASL en/of TLS. Het gaat uit van een onderliggend betrouwbaar transport-layer-protocol zoals het Transmission Control Protocol (TCP).

Kanalen 

AMQP is een multi-channel-protocol. Kanalen bieden een manier om een zware TCP/IP-verbinding te multiplexen tot meerdere lichtgewicht verbindingen. Hierdoor is het protocol "firewall-vriendelijker", omdat het poortgebruik voorspelbaar is. Het betekent ook dat traffic shaping en andere netwerk-QoS-functies eenvoudig kunnen worden ingezet.

Elk kanaal draait in zijn eigen thread, dus elke keer dat een nieuw bericht wordt ontvangen, identificeert de client eerst het kanaal en plaatst het bericht in een wachtrij die door de kanaal-thread wordt verwerkt.

De levenscyclus van een kanaal is als volgt:

1. De client opent een nieuw kanaal (Open).

2. De server bevestigt dat het nieuwe kanaal gereed is (Open-Ok).

3. Client en server gebruiken het kanaal naar wens.

4. Eén peer (client of server) sluit het kanaal (Close).

5. De andere peer bevestigt het sluiten van het kanaal (Close-Ok).


Om een nieuw kanaal aan te maken, roep je gewoon de methode OpenChannel aan en geef je de kanaalnaam als argument mee. De gebeurtenis OnAMQPChannelOpen wordt aangeroepen als bevestiging vanuit de server dat het kanaal is geopend.

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

Met de exchange-klasse kan een toepassing exchanges op de server beheren. Deze klasse laat de toepassing zijn eigen wiring scripten (in plaats van te vertrouwen op een configuratie-interface). Let op: de meeste toepassingen hebben dit niveau van complexiteit niet nodig, en legacy-middleware zal deze semantiek waarschijnlijk niet ondersteunen.

De levenscyclus van een exchange is:

1. De client vraagt de server om te bevestigen dat de exchange bestaat (Declare). De client kan dit verfijnen tot "maak de exchange aan als deze niet bestaat" of "waarschuw me maar maak hem niet aan als hij niet bestaat".

2. De client publiceert berichten naar de exchange.

3. De client kan ervoor kiezen de exchange te verwijderen (Delete).

De methode DeclareExchange maakt een nieuwe exchange aan of controleert of een Exchange al bestaat. De methode heeft de volgende argumenten:


Om een nieuwe Exchange te declareren, roep je gewoon de methode DeclareExchange aan en geef je de kanaalnaam, exchangenaam en het exchangetype als argumenten door. De gebeurtenis OnAMQPExchangeDeclare wordt aangeroepen als bevestiging vanuit de server dat de exchange is gedeclareerd.

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Queues 

Met de queue-klasse kan een toepassing berichtenwachtrijen op de server beheren. Dit is een basisstap in bijna alle toepassingen die berichten verwerken, op zijn minst om te controleren dat een verwachte berichtenwachtrij daadwerkelijk aanwezig is.

De levenscyclus van een duurzame berichtenwachtrij is vrij eenvoudig:

1. De client stelt vast dat de berichtenwachtrij bestaat (Declare, met het "passive"-argument).

2. De server bevestigt dat de berichtenwachtrij bestaat (Declare-Ok).

3. De client leest berichten uit de berichtenwachtrij.

De levenscyclus van een tijdelijke berichtenwachtrij is interessanter:

1. De client maakt de berichtenwachtrij aan (Declare, vaak zonder naam zodat de server een naam toewijst). De server bevestigt (Declare-Ok).

2. De client start een consumer op de berichtenwachtrij. De exacte functionaliteit van een consumer wordt gedefinieerd door de Basic-klasse.

3. De client annuleert de consumer, expliciet of door het kanaal en/of de verbinding te sluiten.

4. Wanneer de laatste consumer verdwijnt uit de berichtenwachtrij, en na een nette time-out, verwijdert de server de berichtenwachtrij.

AMQP implementeert het bezorgmechanisme voor topic-subscriptions als berichtenwachtrijen. Dit maakt interessante structuren mogelijk waarbij een subscription kan worden load-balanced over een pool van samenwerkende subscriber-

toepassingen.

De levenscyclus van een subscription bevat een extra bind-fase:

1. De client maakt de berichtenwachtrij aan (Declare), en de server bevestigt (Declare-Ok).

2. De client koppelt de berichtenwachtrij aan een topic-exchange (Bind) en de server bevestigt (Bind-Ok).

3. De client gebruikt de berichtenwachtrij als in de vorige voorbeelden.

De methode DeclareQueue maakt een nieuwe queue aan of controleert of een Queue al bestaat. De methode heeft de volgende argumenten:


Om een nieuwe Queue te declareren, roep je gewoon de methode DeclareQueue aan en geef je de kanaalnaam en queue-naam als argumenten door. De gebeurtenis OnAMQPQueueDeclare wordt aangeroepen als bevestiging vanuit de server dat de exchange is gedeclareerd.

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;