Implementacja AMQP 0-9-1 w sgcWebSockets otrzymała kompleksową aktualizację: 11 poprawek błędów obejmujących krytyczną kolejność parametrów, niezgodności typów, bezpieczeństwo wątków i utratę danych, a także 6 nowych funkcji protokołu — w tym Basic.Nack, wiązania Exchange-to-Exchange, Publisher Confirms, powiadomienia Connection.Blocked/Unblocked oraz Connection.UpdateSecret do odświeżania tokenów. Ten artykuł szczegółowo opisuje każdą zmianę wraz z przykładami kodu.
Spis treści
- Poprawki błędów
- Krytyczne: kolejność parametrów DeclareExchange
- Bajt typu tablicy pól
- Poprawki zgodności ze specyfikacją
- Pozostałe poprawki błędów
- Nowe funkcje
- Basic.Nack — potwierdzenia negatywne
- Exchange.Bind/Unbind — wiązania Exchange-to-Exchange
- Klasa Confirm — Publisher Confirms
- Connection.Blocked/Unblocked — alarmy zasobów
- Connection.UpdateSecret — odświeżanie tokenu
- Zmodyfikowane pliki
1. Poprawki błędów
Naprawiono łącznie 11 błędów w implementacji AMQP 0-9-1 — od krytycznych problemów z kolejnością parametrów po korekty zgodności ze specyfikacją.
Krytyczne: kolejność parametrów DeclareExchange
Metody DeclareExchange i
DeclareExchangeEx przekazywały
aNoWait,
aAutoDelete i
aInternal w złej kolejności do
DoWrite_ExchDeclare. Powodowało to, że flaga
auto-delete była wysyłana jako
no-wait i odwrotnie, czego efektem były nieoczekiwane zachowania
kanałów w brokerze.
Przed (nieprawidłowo)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Po (poprawnie)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
Bajt typu tablicy pól
Procedura sgcWriteAMQPFieldTable zawsze zapisywała
$53 ('S' = długi ciąg znaków)
jako wskaźnik typu dla wszystkich wartości w tablicy pól, bez względu na rzeczywisty typ wartości. Oznaczało to, że wartości double, integer,
boolean i int64 były niepoprawnie oznaczane jako ciągi znaków w formacie sieciowym.
Przed (nieprawidłowo)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
Po (poprawnie)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
Poprawki zgodności ze specyfikacją
| Problem | Przed | Po | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 wrong type | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 wrong read | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose used class-specific method value getters | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
Dodano nową generyczną funkcję pomocniczą sgcGetAMQPMethodValue
do sgcAMQP_Helpers.pas, która zwraca prawidłowe numeryczne ID metody dla dowolnej klasy AMQP,
zastępując gettery specyficzne dla poszczególnych klas, które działały tylko w obrębie własnej klasy.
Pozostałe poprawki błędów
| Błąd | Opis | File |
|---|---|---|
| Channel.CloseOk missing channel ID | DoWrite_ChannCloseOk nie ustawiał oFrame.Header.Channel, w wyniku czego close-ok był wysyłany na kanale 0 (poziom połączenia) zamiast na docelowym kanale. Dodano parametr aChannelId: Word. |
sgcAMQP.pas |
| Typo in error constant | Zmieniono 'Now Allowed' na 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind missing request data | DoWrite_QueueUnBind nie zapisywał QueueUnBindQueue i QueueUnBindExchange w żądaniu kanału, powodując, że zdarzenie OnAMQPQueueUnBind zwracało puste wartości. |
sgcAMQP_Client.pas |
| Remaining bytes discarded after DoRead | Gdy pętla odczytu kończyła się z 1–7 pozostałymi bajtami (niekompletna ramka), były one po cichu tracone. Teraz są zapisywane do FBytes na potrzeby kolejnego cyklu odczytu. |
sgcAMQP.pas |
| GetChannel ignores aRaiseIfNotFound | Parametr aRaiseIfNotFound nigdy nie był sprawdzany. Teraz wyjątek jest zgłaszany tylko gdy flaga ma wartość True. |
sgcAMQP.pas |
2. Nowe funkcje
Zaimplementowano sześć nowych funkcji protokołu AMQP 0-9-1, obejmujących powszechnie stosowane rozszerzenia RabbitMQ oraz dodatkowe metody specyfikacji.
Basic.Nack — Potwierdzenia negatywne
Basic.Nack (klasa 60, metoda 120) to rozszerzenie RabbitMQ
umożliwiające odrzucenie jednej lub wielu wiadomości jednocześnie z opcją ponownego kolejkowania. W odróżnieniu od
Basic.Reject, obsługuje flagę
multiple, która odrzuca wszystkie wiadomości aż do wskazanego tagu dostawy (włącznie).
| Method | Opis | Kierunek |
|---|---|---|
NackMessage |
Wysyła do brokera potwierdzenie negatywne. | Client → Server |
OnAMQPBasicNack |
Wywoływane gdy serwer wysyła Nack (w trybie potwierdzeń wydawcy). | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
Zdarzenie OnAMQPBasicNack
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Wiązania Exchange-to-Exchange
Wiązania Exchange-to-Exchange (klasa 40, metody 30/31 i 40/51) umożliwiają trasowanie wiadomości między kanałami wymiany bez pośredniej kolejki. Jest to rozszerzenie RabbitMQ pozwalające tworzyć zaawansowane topologie, takie jak hierarchie fan-out i partycjonowanie tematów.
| Method | Opis |
|---|---|
BindExchange / BindExchangeEx |
Wiąże docelowy kanał wymiany ze źródłowym za pomocą klucza trasowania. Wariant Ex czeka synchronicznie na odpowiedź brokera. |
UnbindExchange / UnbindExchangeEx |
Usuwa wiązanie Exchange-to-Exchange. |
OnAMQPExchangeBind |
Wywoływane gdy broker potwierdzi wiązanie kanału wymiany. |
OnAMQPExchangeUnbind |
Wywoływane gdy broker potwierdzi usunięcie wiązania kanału wymiany. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Klasa Confirm — Potwierdzenia wydawcy
Potwierdzenia wydawcy (klasa 85, metody 10/11) umożliwiają brokerowi potwierdzanie odbioru opublikowanych wiadomości.
Po przełączeniu kanału w tryb potwierdzeń przez Confirm.Select,
broker wysyła Basic.Ack lub
Basic.Nack dla każdej opublikowanej wiadomości, zapewniając
niezawodne publikowanie bez transakcji.
| Method / Event | Opis |
|---|---|
SelectConfirm / SelectConfirmEx |
Włącza tryb potwierdzeń wydawcy na kanale. |
OnAMQPConfirmSelectOk |
Wywoływane gdy broker potwierdzi aktywację trybu potwierdzeń. |
OnAMQPBasicAck |
Wywoływane gdy broker potwierdza odbiór opublikowanej wiadomości. |
OnAMQPBasicNack |
Wywoływane gdy broker negatywnie potwierdza opublikowaną wiadomość. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Przykład: niezawodne publikowanie z potwierdzeniami
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — Alarmy zasobów
Gdy brokerowi kończą się zasoby (pamięć, dysk), wysyła
Connection.Blocked (klasa 10, metoda 60) z
ciągiem opisującym przyczynę. Po ustąpieniu problemu wysyła
Connection.Unblocked (metoda 61). Są to powiadomienia kierowane wyłącznie
od serwera do klienta. Funkcja jest obsługiwana w klasie bazowej
TsgcAMQP, dzięki czemu jest dostępna dla wszystkich komponentów AMQP.
| Event | Opis |
|---|---|
OnAMQPConnectionBlocked |
Wywoływane gdy broker blokuje połączenie z powodu ograniczonych zasobów. Zawiera ciąg Reason (np. 'low on memory'). |
OnAMQPConnectionUnblocked |
Wywoływane gdy broker odblokowuje połączenie. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — Odświeżanie tokenu
Connection.UpdateSecret (klasa 10, metoda 70) umożliwia
odświeżanie poświadczeń uwierzytelniania na aktywnym połączeniu bez konieczności ponownego łączenia. Jest to niezbędne
przy uwierzytelnianiu OAuth2/JWT, gdzie tokeny wygasają okresowo.
| Method / Event | Opis |
|---|---|
UpdateSecret / UpdateSecretEx |
Wysyła nowy sekret (token) do brokera wraz z opcjonalnym opisem przyczyny. |
OnAMQPConnectionUpdateSecretOk |
Wywoływane gdy broker zaakceptuje nowy sekret. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Zmodyfikowane pliki
| Plik | Zmiany |
|---|---|
sgcAMQP_Const.pas |
Poprawka literówki ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Nowa funkcja sgcGetAMQPMethodValue, funkcje pomocnicze klasy Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mapowania ID metod dla wszystkich nowych metod. |
sgcAMQP_ReadWrite.pas |
Poprawka bajtu typu w tablicy pól — każdy typ wartości zapisuje teraz prawidłowy wskaźnik typu. |
sgcAMQP_Classes.pas |
Nowe wyliczenia (amqpClassConfirm, 12 nowych metod), 13 nowych klas ładunku, zaktualizowane tabele dispatch, poprawka bezpieczeństwa wątków, poprawki zgodności ze specyfikacją, nowe pola przechowywania żądań. |
sgcAMQP.pas |
8 nowych typów zdarzeń, obsługa Connection.Blocked/Unblocked, poprawka ID kanału w DoWrite_ChannCloseOk, zachowywanie pozostałych bajtów, poprawka flagi GetChannel. |
sgcAMQP_Client.pas |
6 nowych procedur odczytu, 5 nowych metod zapisu, 11 nowych metod publicznych, 6 nowych zdarzeń, zaktualizowana tabela dispatch, poprawka kolejności parametrów, poprawka danych żądania. |
