Aktualizacja klienta AMQP 0.9.1 dla Delphi

· Funkcje

Implementacja AMQP 0-9-1 w sgcWebSockets otrzymała kompleksową aktualizację: 11 poprawek błędów obejmujących krytyczną kolejność parametrów, niezgodności typów, bezpieczeństwo wątków i utratę danych, a także 6 nowych funkcji protokołu — w tym Basic.Nack, wiązania Exchange-to-Exchange, Publisher Confirms, powiadomienia Connection.Blocked/Unblocked oraz Connection.UpdateSecret do odświeżania tokenów. Ten artykuł szczegółowo opisuje każdą zmianę wraz z przykładami kodu.

Spis treści

  1. Poprawki błędów
  2. Krytyczne: kolejność parametrów DeclareExchange
  3. Bajt typu tablicy pól
  4. Poprawki zgodności ze specyfikacją
  5. Pozostałe poprawki błędów
  6. Nowe funkcje
  7. Basic.Nack — potwierdzenia negatywne
  8. Exchange.Bind/Unbind — wiązania Exchange-to-Exchange
  9. Klasa Confirm — Publisher Confirms
  10. Connection.Blocked/Unblocked — alarmy zasobów
  11. Connection.UpdateSecret — odświeżanie tokenu
  12. Zmodyfikowane pliki

1. Poprawki błędów

Naprawiono łącznie 11 błędów w implementacji AMQP 0-9-1 — od krytycznych problemów z kolejnością parametrów po korekty zgodności ze specyfikacją.

Krytyczne: kolejność parametrów DeclareExchange

Metody DeclareExchange i DeclareExchangeEx przekazywały aNoWait, aAutoDelete i aInternal w złej kolejności do DoWrite_ExchDeclare. Powodowało to, że flaga auto-delete była wysyłana jako no-wait i odwrotnie, czego efektem były nieoczekiwane zachowania kanałów w brokerze.

Przed (nieprawidłowo)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Po (poprawnie)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

File: sgcAMQP_Client.pas


Bajt typu tablicy pól

Procedura sgcWriteAMQPFieldTable zawsze zapisywała $53 ('S' = długi ciąg znaków) jako wskaźnik typu dla wszystkich wartości w tablicy pól, bez względu na rzeczywisty typ wartości. Oznaczało to, że wartości double, integer, boolean i int64 były niepoprawnie oznaczane jako ciągi znaków w formacie sieciowym.

Przed (nieprawidłowo)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

Po (poprawnie)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas


Poprawki zgodności ze specyfikacją

Problem Przed Po File
BasicGetEmpty.Reserved1 wrong type UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 wrong read sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose used class-specific method value getters sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

Dodano nową generyczną funkcję pomocniczą sgcGetAMQPMethodValue do sgcAMQP_Helpers.pas, która zwraca prawidłowe numeryczne ID metody dla dowolnej klasy AMQP, zastępując gettery specyficzne dla poszczególnych klas, które działały tylko w obrębie własnej klasy.


Pozostałe poprawki błędów

Błąd Opis File
Channel.CloseOk missing channel ID DoWrite_ChannCloseOk nie ustawiał oFrame.Header.Channel, w wyniku czego close-ok był wysyłany na kanale 0 (poziom połączenia) zamiast na docelowym kanale. Dodano parametr aChannelId: Word. sgcAMQP.pas
Typo in error constant Zmieniono 'Now Allowed' na 'Not Allowed'. sgcAMQP_Const.pas
QueueUnBind missing request data DoWrite_QueueUnBind nie zapisywał QueueUnBindQueue i QueueUnBindExchange w żądaniu kanału, powodując, że zdarzenie OnAMQPQueueUnBind zwracało puste wartości. sgcAMQP_Client.pas
Remaining bytes discarded after DoRead Gdy pętla odczytu kończyła się z 1–7 pozostałymi bajtami (niekompletna ramka), były one po cichu tracone. Teraz są zapisywane do FBytes na potrzeby kolejnego cyklu odczytu. sgcAMQP.pas
GetChannel ignores aRaiseIfNotFound Parametr aRaiseIfNotFound nigdy nie był sprawdzany. Teraz wyjątek jest zgłaszany tylko gdy flaga ma wartość True. sgcAMQP.pas

2. Nowe funkcje

Zaimplementowano sześć nowych funkcji protokołu AMQP 0-9-1, obejmujących powszechnie stosowane rozszerzenia RabbitMQ oraz dodatkowe metody specyfikacji.


Basic.Nack — Potwierdzenia negatywne

Basic.Nack (klasa 60, metoda 120) to rozszerzenie RabbitMQ umożliwiające odrzucenie jednej lub wielu wiadomości jednocześnie z opcją ponownego kolejkowania. W odróżnieniu od Basic.Reject, obsługuje flagę multiple, która odrzuca wszystkie wiadomości aż do wskazanego tagu dostawy (włącznie).

Method Opis Kierunek
NackMessage Wysyła do brokera potwierdzenie negatywne. Client → Server
OnAMQPBasicNack Wywoływane gdy serwer wysyła Nack (w trybie potwierdzeń wydawcy). Server → Client

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

Zdarzenie OnAMQPBasicNack

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Wiązania Exchange-to-Exchange

Wiązania Exchange-to-Exchange (klasa 40, metody 30/31 i 40/51) umożliwiają trasowanie wiadomości między kanałami wymiany bez pośredniej kolejki. Jest to rozszerzenie RabbitMQ pozwalające tworzyć zaawansowane topologie, takie jak hierarchie fan-out i partycjonowanie tematów.

Method Opis
BindExchange / BindExchangeEx Wiąże docelowy kanał wymiany ze źródłowym za pomocą klucza trasowania. Wariant Ex czeka synchronicznie na odpowiedź brokera.
UnbindExchange / UnbindExchangeEx Usuwa wiązanie Exchange-to-Exchange.
OnAMQPExchangeBind Wywoływane gdy broker potwierdzi wiązanie kanału wymiany.
OnAMQPExchangeUnbind Wywoływane gdy broker potwierdzi usunięcie wiązania kanału wymiany.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Klasa Confirm — Potwierdzenia wydawcy

Potwierdzenia wydawcy (klasa 85, metody 10/11) umożliwiają brokerowi potwierdzanie odbioru opublikowanych wiadomości. Po przełączeniu kanału w tryb potwierdzeń przez Confirm.Select, broker wysyła Basic.Ack lub Basic.Nack dla każdej opublikowanej wiadomości, zapewniając niezawodne publikowanie bez transakcji.

Method / Event Opis
SelectConfirm / SelectConfirmEx Włącza tryb potwierdzeń wydawcy na kanale.
OnAMQPConfirmSelectOk Wywoływane gdy broker potwierdzi aktywację trybu potwierdzeń.
OnAMQPBasicAck Wywoływane gdy broker potwierdza odbiór opublikowanej wiadomości.
OnAMQPBasicNack Wywoływane gdy broker negatywnie potwierdza opublikowaną wiadomość.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Przykład: niezawodne publikowanie z potwierdzeniami

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — Alarmy zasobów

Gdy brokerowi kończą się zasoby (pamięć, dysk), wysyła Connection.Blocked (klasa 10, metoda 60) z ciągiem opisującym przyczynę. Po ustąpieniu problemu wysyła Connection.Unblocked (metoda 61). Są to powiadomienia kierowane wyłącznie od serwera do klienta. Funkcja jest obsługiwana w klasie bazowej TsgcAMQP, dzięki czemu jest dostępna dla wszystkich komponentów AMQP.

Event Opis
OnAMQPConnectionBlocked Wywoływane gdy broker blokuje połączenie z powodu ograniczonych zasobów. Zawiera ciąg Reason (np. 'low on memory').
OnAMQPConnectionUnblocked Wywoływane gdy broker odblokowuje połączenie.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — Odświeżanie tokenu

Connection.UpdateSecret (klasa 10, metoda 70) umożliwia odświeżanie poświadczeń uwierzytelniania na aktywnym połączeniu bez konieczności ponownego łączenia. Jest to niezbędne przy uwierzytelnianiu OAuth2/JWT, gdzie tokeny wygasają okresowo.

Method / Event Opis
UpdateSecret / UpdateSecretEx Wysyła nowy sekret (token) do brokera wraz z opcjonalnym opisem przyczyny.
OnAMQPConnectionUpdateSecretOk Wywoływane gdy broker zaakceptuje nowy sekret.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. Zmodyfikowane pliki

Plik Zmiany
sgcAMQP_Const.pas Poprawka literówki ('Not Allowed').
sgcAMQP_Helpers.pas Nowa funkcja sgcGetAMQPMethodValue, funkcje pomocnicze klasy Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mapowania ID metod dla wszystkich nowych metod.
sgcAMQP_ReadWrite.pas Poprawka bajtu typu w tablicy pól — każdy typ wartości zapisuje teraz prawidłowy wskaźnik typu.
sgcAMQP_Classes.pas Nowe wyliczenia (amqpClassConfirm, 12 nowych metod), 13 nowych klas ładunku, zaktualizowane tabele dispatch, poprawka bezpieczeństwa wątków, poprawki zgodności ze specyfikacją, nowe pola przechowywania żądań.
sgcAMQP.pas 8 nowych typów zdarzeń, obsługa Connection.Blocked/Unblocked, poprawka ID kanału w DoWrite_ChannCloseOk, zachowywanie pozostałych bajtów, poprawka flagi GetChannel.
sgcAMQP_Client.pas 6 nowych procedur odczytu, 5 nowych metod zapisu, 11 nowych metod publicznych, 6 nowych zdarzeń, zaktualizowana tabela dispatch, poprawka kolejności parametrów, poprawka danych żądania.