From sgcWebSockets 2022.1 AMQP 0.9.1 protocol is supported. The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.
AMQP to binarny protokół warstwy aplikacji, zaprojektowany do efektywnej obsługi szerokiej gamy aplikacji i wzorców komunikacji. Zapewnia komunikację zorientowaną na wiadomości z kontrolą przepływu i gwarancjami dostarczania: at-most-once (każda wiadomość dostarczona raz lub wcale), at-least-once (wiadomość na pewno zostanie dostarczona, ale może wielokrotnie) oraz exactly-once (wiadomość zawsze dotrze dokładnie raz). Uwierzytelnianie i/lub szyfrowanie opiera się na SASL i/lub TLS. Protokół zakłada niezawodną warstwę transportową, np. TCP.
Channels
AMQP jest protokołem wielokanałowym. Kanały zapewniają sposób multipleksowania ciężkiego połączenia TCP/IP na kilka lekkich połączeń. Dzięki temu protokół jest bardziej "przyjazny dla firewalla", ponieważ użycie portów jest przewidywalne. Oznacza to też, że kształtowanie ruchu i inne funkcje QoS sieci można łatwo zastosować.
Każdy kanał działa we własnym wątku, więc za każdym razem, gdy odbierana jest nowa wiadomość, klient najpierw identyfikuje kanał i kolejkuje wiadomość, którą następnie przetwarza wątek kanału.
Cykl życia kanału wygląda następująco:
1. Klient otwiera nowy kanał (Open).
2. Serwer potwierdza gotowość kanału (Open-Ok).
3. Klient i serwer korzystają z kanału wedle potrzeb.
4. Jeden z peerów (klient lub serwer) zamyka kanał (Close).
5. Drugi peer potwierdza zamknięcie kanału (Close-Ok).
Aby utworzyć nowy kanał, wywołaj metodę OpenChannel i przekaż nazwę kanału jako argument. Zdarzenie OnAMQPChannelOpen jest wywoływane jako potwierdzenie od serwera, że kanał został otwarty.
AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_channel_open: ' + aChannel);
end;
Exchanges
Klasa exchange umożliwia aplikacji zarządzanie węzłami wymiany na serwerze. Pozwala aplikacji skryptować własne połączenia (zamiast polegać na interfejsie konfiguracyjnym). Uwaga: większość aplikacji nie potrzebuje takiego poziomu zaawansowania, a starsze middleware prawdopodobnie nie obsługuje tej semantyki.
Cykl życia węzła wymiany:
1. Klient prosi serwer o sprawdzenie, czy węzeł wymiany istnieje (Declare). Klient może doprecyzować żądanie: "utwórz węzeł, jeśli nie istnieje" lub "ostrzeż mnie, ale nie twórz, jeśli nie istnieje".
2. Klient publikuje wiadomości do węzła wymiany.
3. Klient może zdecydować o usunięciu węzła wymiany (Delete).
Metoda DeclareExchange tworzy nowy węzeł wymiany lub weryfikuje, czy już istnieje. Metoda przyjmuje następujące argumenty:
- ChannelName: nazwa kanału (musi być otwarty przed wywołaniem tej metody).
- ExchangeName: nazwa węzła wymiany, nie dłuższa niż 255 znaków i nie zaczynająca się od "amq." (wyjątek: jeśli parametr passive jest true).
- ExchangeType: typ węzła wymiany; wszystkie serwery AMQP obsługują typy "direct" i "fanout". Sprawdź dokumentację serwera, jakie typy są obsługiwane.
- Passive: jeśli true, serwer tylko weryfikuje, czy węzeł jest już zadeklarowany. Jeśli false, a węzeł nie istnieje, serwer utworzy nowy.
- Durable: jeśli true, węzeł wymiany zostanie odtworzony po uruchomieniu serwera. Jeśli false, zostanie usunięty po zatrzymaniu serwera.
- AutoDelete: jeśli true, węzeł wymiany zostanie usunięty po odwiązaniu wszystkich kolejek.
- Internal: zawsze false.
- NoWait: jeśli true, serwer nie wysyła potwierdzenia do klienta.
Aby zadeklarować nowy węzeł wymiany, wywołaj metodę DeclareExchange i przekaż nazwę kanału, nazwę węzła i typ węzła jako argumenty. Zdarzenie OnAMQPExchangeDeclare jest wywoływane jako potwierdzenie od serwera, że węzeł wymiany został zadeklarowany.
AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end;
Queues
Klasa kolejki umożliwia aplikacji zarządzanie kolejkami wiadomości na serwerze. Jest to podstawowy krok w niemal wszystkich aplikacjach konsumujących wiadomości — co najmniej w celu sprawdzenia, czy oczekiwana kolejka wiadomości rzeczywiście istnieje.
Cykl życia trwałej kolejki wiadomości jest prosty:
1. Klient sprawdza, czy kolejka wiadomości istnieje (Declare z argumentem "passive").
2. Serwer potwierdza istnienie kolejki wiadomości (Declare-Ok).
3. Klient odczytuje wiadomości z kolejki.
Cykl życia tymczasowej kolejki wiadomości jest bardziej złożony:
1. Klient tworzy kolejkę wiadomości (Declare, często bez nazwy — serwer przypisuje ją sam). Serwer potwierdza (Declare-Ok).
2. Klient uruchamia konsumenta na kolejce wiadomości. Dokładna funkcjonalność konsumenta jest zdefiniowana przez klasę Basic.
3. Klient anuluje konsumenta — jawnie lub przez zamknięcie kanału i/lub połączenia.
4. Gdy ostatni konsument znika z kolejki wiadomości, serwer po pewnym czasie ją usuwa.
AMQP implementuje mechanizm dostarczania dla subskrypcji tematów jako kolejki wiadomości. Umożliwia to ciekawe struktury, w których subskrypcja może być równoważona obciążenia między pulą współpracujących aplikacji subskrybentów.
Cykl życia subskrypcji obejmuje dodatkowy etap wiązania:
1. Klient tworzy kolejkę wiadomości (Declare), a serwer potwierdza (Declare-Ok).
2. Klient wiąże kolejkę wiadomości z węzłem wymiany tematu (Bind), a serwer potwierdza (Bind-Ok).
3. Klient używa kolejki wiadomości jak w poprzednich przykładach.
Metoda DeclareQueue tworzy nową kolejkę lub weryfikuje, czy już istnieje. Metoda przyjmuje następujące argumenty:
- ChannelName: nazwa kanału (musi być otwarty przed wywołaniem tej metody).
- QueueName: nazwa kolejki, nie dłuższa niż 255 znaków i nie zaczynająca się od "amq." (wyjątek: jeśli parametr passive jest true).
- Passive: jeśli true, serwer tylko weryfikuje, czy kolejka jest już zadeklarowana. Jeśli false, a kolejka nie istnieje, serwer utworzy nową.
- Durable: jeśli true, kolejka zostanie odtworzona po uruchomieniu serwera. Jeśli false, zostanie usunięta po zatrzymaniu serwera.
- Exclusive: jeśli true, kolejka jest dostępna tylko z bieżącego połączenia.
- AutoDelete: jeśli true, kolejka zostanie usunięta, gdy nie będzie jej już używał żaden konsument.
- NoWait: jeśli true, serwer nie wysyła potwierdzenia do klienta.
Aby zadeklarować nową kolejkę, wywołaj metodę DeclareQueue i przekaż nazwę kanału oraz nazwę kolejki jako argumenty. Zdarzenie OnAMQPQueueDeclare jest wywoływane jako potwierdzenie od serwera, że kolejka została zadeklarowana.
AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;
