Vanaf sgcWebSockets 2022.1 wordt het AMQP 0.9.1-protocol ondersteund. Het Advanced Message Queuing Protocol (AMQP) is een open-standaard application-layer-protocol voor message-oriented middleware. De kenmerkende eigenschappen van AMQP zijn message-orientation, queuing, routing (waaronder point-to-point en publish-and-subscribe), betrouwbaarheid en beveiliging.
AMQP is een binair application-layer-protocol, ontworpen om een breed scala aan messaging-toepassingen en communicatiepatronen efficiënt te ondersteunen. Het biedt flow-controlled, message-oriented communicatie met bezorggaranties zoals at-most-once (elk bericht wordt nul of één keer bezorgd), at-least-once (elk bericht komt zeker aan, maar kan meerdere keren worden bezorgd) en exactly-once (het bericht komt altijd aan en exact één keer), en authenticatie en/of encryptie op basis van SASL en/of TLS. Het gaat uit van een onderliggend betrouwbaar transport-layer-protocol zoals het Transmission Control Protocol (TCP).
Kanalen
AMQP is een multi-channel-protocol. Kanalen bieden een manier om een zware TCP/IP-verbinding te multiplexen tot meerdere lichtgewicht verbindingen. Hierdoor is het protocol "firewall-vriendelijker", omdat het poortgebruik voorspelbaar is. Het betekent ook dat traffic shaping en andere netwerk-QoS-functies eenvoudig kunnen worden ingezet.
Elk kanaal draait in zijn eigen thread, dus elke keer dat een nieuw bericht wordt ontvangen, identificeert de client eerst het kanaal en plaatst het bericht in een wachtrij die door de kanaal-thread wordt verwerkt.
De levenscyclus van een kanaal is als volgt:
1. De client opent een nieuw kanaal (Open).
2. De server bevestigt dat het nieuwe kanaal gereed is (Open-Ok).
3. Client en server gebruiken het kanaal naar wens.
4. Eén peer (client of server) sluit het kanaal (Close).
5. De andere peer bevestigt het sluiten van het kanaal (Close-Ok).
Om een nieuw kanaal aan te maken, roep je gewoon de methode OpenChannel aan en geef je de kanaalnaam als argument mee. De gebeurtenis OnAMQPChannelOpen wordt aangeroepen als bevestiging vanuit de server dat het kanaal is geopend.
AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_channel_open: ' + aChannel);
end;
Exchanges
Met de exchange-klasse kan een toepassing exchanges op de server beheren. Deze klasse laat de toepassing zijn eigen wiring scripten (in plaats van te vertrouwen op een configuratie-interface). Let op: de meeste toepassingen hebben dit niveau van complexiteit niet nodig, en legacy-middleware zal deze semantiek waarschijnlijk niet ondersteunen.
De levenscyclus van een exchange is:
1. De client vraagt de server om te bevestigen dat de exchange bestaat (Declare). De client kan dit verfijnen tot "maak de exchange aan als deze niet bestaat" of "waarschuw me maar maak hem niet aan als hij niet bestaat".
2. De client publiceert berichten naar de exchange.
3. De client kan ervoor kiezen de exchange te verwijderen (Delete).
De methode DeclareExchange maakt een nieuwe exchange aan of controleert of een Exchange al bestaat. De methode heeft de volgende argumenten:
- ChannelName: dit is de naam van het kanaal (moet geopend zijn voor deze methode wordt aangeroepen).
- ExchangeName: dit is de naam van de exchange, mag niet langer zijn dan 255 tekens en niet beginnen met "amq." (tenzij de passive-parameter true is).
- ExchangeType: dit is het exchange-type, alle AMQP-servers ondersteunen "direct"- en "fanout"-exchanges. Raadpleeg de serverdocumentatie om te zien welke exchange-types worden ondersteund.
- Passive: als passive true is, controleert de server alleen of de exchange al gedeclareerd is. Als passive false is en de exchange niet bestaat, maakt de server een nieuwe aan.
- Durable: als true, wordt de exchange opnieuw aangemaakt wanneer de server start. Als false, wordt de exchange verwijderd wanneer de server stopt.
- AutoDelete: als true, wordt de exchange verwijderd wanneer alle queues zijn ontbonden.
- Internal: altijd false.
- NoWait: als true, stuurt de server geen bevestiging naar de client.
Om een nieuwe Exchange te declareren, roep je gewoon de methode DeclareExchange aan en geef je de kanaalnaam, exchangenaam en het exchangetype als argumenten door. De gebeurtenis OnAMQPExchangeDeclare wordt aangeroepen als bevestiging vanuit de server dat de exchange is gedeclareerd.
AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end;
Queues
Met de queue-klasse kan een toepassing berichtenwachtrijen op de server beheren. Dit is een basisstap in bijna alle toepassingen die berichten verwerken, op zijn minst om te controleren dat een verwachte berichtenwachtrij daadwerkelijk aanwezig is.
De levenscyclus van een duurzame berichtenwachtrij is vrij eenvoudig:
1. De client stelt vast dat de berichtenwachtrij bestaat (Declare, met het "passive"-argument).
2. De server bevestigt dat de berichtenwachtrij bestaat (Declare-Ok).
3. De client leest berichten uit de berichtenwachtrij.
De levenscyclus van een tijdelijke berichtenwachtrij is interessanter:
1. De client maakt de berichtenwachtrij aan (Declare, vaak zonder naam zodat de server een naam toewijst). De server bevestigt (Declare-Ok).
2. De client start een consumer op de berichtenwachtrij. De exacte functionaliteit van een consumer wordt gedefinieerd door de Basic-klasse.
3. De client annuleert de consumer, expliciet of door het kanaal en/of de verbinding te sluiten.
4. Wanneer de laatste consumer verdwijnt uit de berichtenwachtrij, en na een nette time-out, verwijdert de server de berichtenwachtrij.
AMQP implementeert het bezorgmechanisme voor topic-subscriptions als berichtenwachtrijen. Dit maakt interessante structuren mogelijk waarbij een subscription kan worden load-balanced over een pool van samenwerkende subscriber-
toepassingen.
De levenscyclus van een subscription bevat een extra bind-fase:
1. De client maakt de berichtenwachtrij aan (Declare), en de server bevestigt (Declare-Ok).
2. De client koppelt de berichtenwachtrij aan een topic-exchange (Bind) en de server bevestigt (Bind-Ok).
3. De client gebruikt de berichtenwachtrij als in de vorige voorbeelden.
De methode DeclareQueue maakt een nieuwe queue aan of controleert of een Queue al bestaat. De methode heeft de volgende argumenten:
- ChannelName: dit is de naam van het kanaal (moet geopend zijn voor deze methode wordt aangeroepen).
- QueueName: dit is de naam van de queue, mag niet langer zijn dan 255 tekens en niet beginnen met "amq." (tenzij de passive-parameter true is).
- Passive: als passive true is, controleert de server alleen of de queue al gedeclareerd is. Als passive false is en de queue niet bestaat, maakt de server een nieuwe aan.
- Durable: als true, wordt de queue opnieuw aangemaakt wanneer de server start. Als false, wordt de queue verwijderd wanneer de server stopt.
- Exclusive: als true, betekent dit dat de queue alleen toegankelijk is voor de huidige verbinding.
- AutoDelete: als true, wordt de queue verwijderd wanneer geen consumers de queue meer gebruiken.
- NoWait: als true, stuurt de server geen bevestiging naar de client.
Om een nieuwe Queue te declareren, roep je gewoon de methode DeclareQueue aan en geef je de kanaalnaam en queue-naam als argumenten door. De gebeurtenis OnAMQPQueueDeclare wordt aangeroepen als bevestiging vanuit de server dat de exchange is gedeclareerd.
AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;
