AMQP Exchanges and Queues (1 / 3)

· Komponenty

From sgcWebSockets 2022.1 AMQP 0.9.1 protocol is supported. The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.

AMQP to binarny protokół warstwy aplikacji, zaprojektowany do efektywnej obsługi szerokiej gamy aplikacji i wzorców komunikacji. Zapewnia komunikację zorientowaną na wiadomości z kontrolą przepływu i gwarancjami dostarczania: at-most-once (każda wiadomość dostarczona raz lub wcale), at-least-once (wiadomość na pewno zostanie dostarczona, ale może wielokrotnie) oraz exactly-once (wiadomość zawsze dotrze dokładnie raz). Uwierzytelnianie i/lub szyfrowanie opiera się na SASL i/lub TLS. Protokół zakłada niezawodną warstwę transportową, np. TCP.

Channels 

AMQP jest protokołem wielokanałowym. Kanały zapewniają sposób multipleksowania ciężkiego połączenia TCP/IP na kilka lekkich połączeń. Dzięki temu protokół jest bardziej "przyjazny dla firewalla", ponieważ użycie portów jest przewidywalne. Oznacza to też, że kształtowanie ruchu i inne funkcje QoS sieci można łatwo zastosować.

Każdy kanał działa we własnym wątku, więc za każdym razem, gdy odbierana jest nowa wiadomość, klient najpierw identyfikuje kanał i kolejkuje wiadomość, którą następnie przetwarza wątek kanału.

Cykl życia kanału wygląda następująco:

1. Klient otwiera nowy kanał (Open).

2. Serwer potwierdza gotowość kanału (Open-Ok).

3. Klient i serwer korzystają z kanału wedle potrzeb.

4. Jeden z peerów (klient lub serwer) zamyka kanał (Close).

5. Drugi peer potwierdza zamknięcie kanału (Close-Ok).


Aby utworzyć nowy kanał, wywołaj metodę OpenChannel i przekaż nazwę kanału jako argument. Zdarzenie OnAMQPChannelOpen jest wywoływane jako potwierdzenie od serwera, że kanał został otwarty.

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

Klasa exchange umożliwia aplikacji zarządzanie węzłami wymiany na serwerze. Pozwala aplikacji skryptować własne połączenia (zamiast polegać na interfejsie konfiguracyjnym). Uwaga: większość aplikacji nie potrzebuje takiego poziomu zaawansowania, a starsze middleware prawdopodobnie nie obsługuje tej semantyki.

Cykl życia węzła wymiany:

1. Klient prosi serwer o sprawdzenie, czy węzeł wymiany istnieje (Declare). Klient może doprecyzować żądanie: "utwórz węzeł, jeśli nie istnieje" lub "ostrzeż mnie, ale nie twórz, jeśli nie istnieje".

2. Klient publikuje wiadomości do węzła wymiany.

3. Klient może zdecydować o usunięciu węzła wymiany (Delete).

Metoda DeclareExchange tworzy nowy węzeł wymiany lub weryfikuje, czy już istnieje. Metoda przyjmuje następujące argumenty:


Aby zadeklarować nowy węzeł wymiany, wywołaj metodę DeclareExchange i przekaż nazwę kanału, nazwę węzła i typ węzła jako argumenty. Zdarzenie OnAMQPExchangeDeclare jest wywoływane jako potwierdzenie od serwera, że węzeł wymiany został zadeklarowany.

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Queues 

Klasa kolejki umożliwia aplikacji zarządzanie kolejkami wiadomości na serwerze. Jest to podstawowy krok w niemal wszystkich aplikacjach konsumujących wiadomości — co najmniej w celu sprawdzenia, czy oczekiwana kolejka wiadomości rzeczywiście istnieje.

Cykl życia trwałej kolejki wiadomości jest prosty:

1. Klient sprawdza, czy kolejka wiadomości istnieje (Declare z argumentem "passive").

2. Serwer potwierdza istnienie kolejki wiadomości (Declare-Ok).

3. Klient odczytuje wiadomości z kolejki.

Cykl życia tymczasowej kolejki wiadomości jest bardziej złożony:

1. Klient tworzy kolejkę wiadomości (Declare, często bez nazwy — serwer przypisuje ją sam). Serwer potwierdza (Declare-Ok).

2. Klient uruchamia konsumenta na kolejce wiadomości. Dokładna funkcjonalność konsumenta jest zdefiniowana przez klasę Basic.

3. Klient anuluje konsumenta — jawnie lub przez zamknięcie kanału i/lub połączenia.

4. Gdy ostatni konsument znika z kolejki wiadomości, serwer po pewnym czasie ją usuwa.

AMQP implementuje mechanizm dostarczania dla subskrypcji tematów jako kolejki wiadomości. Umożliwia to ciekawe struktury, w których subskrypcja może być równoważona obciążenia między pulą współpracujących aplikacji subskrybentów.

Cykl życia subskrypcji obejmuje dodatkowy etap wiązania:

1. Klient tworzy kolejkę wiadomości (Declare), a serwer potwierdza (Declare-Ok).

2. Klient wiąże kolejkę wiadomości z węzłem wymiany tematu (Bind), a serwer potwierdza (Bind-Ok).

3. Klient używa kolejki wiadomości jak w poprzednich przykładach.

Metoda DeclareQueue tworzy nową kolejkę lub weryfikuje, czy już istnieje. Metoda przyjmuje następujące argumenty:


Aby zadeklarować nową kolejkę, wywołaj metodę DeclareQueue i przekaż nazwę kanału oraz nazwę kolejki jako argumenty. Zdarzenie OnAMQPQueueDeclare jest wywoływane jako potwierdzenie od serwera, że kolejka została zadeklarowana.

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;