Aktualizacja klienta AMQP 1 dla Delphi

· Funkcje

Implementacja protokołu AMQP 1.0 w sgcWebSockets przeszła kompleksowy przegląd pod kątem zgodności ze specyfikacją OASIS AMQP 1.0. Artykuł dokumentuje 30 poprawek zastosowanych w 8 plikach źródłowych, obejmujących krytyczne błędy, wycieki pamięci, zgodność ze specyfikacją, poprawność automatu stanów, obsługę heartbeat i ulepszenia bezpieczeństwa wątkowego.

Spis treści

  1. Przegląd
  2. Naprawione błędy krytyczne
  3. Naprawione wycieki pamięci
  4. Poprawki zgodności ze specyfikacją
  5. Poprawki automatu stanów i połączenia
  6. Poprawki heartbeat i bezczynności
  7. Poprawki bezpieczeństwa wątkowego
  8. Zmodyfikowane pliki

1. Przegląd

W sumie zastosowano 30 poprawek w 6 plikach źródłowych i 2 plikach interfejsów implementacji AMQP 1.0. Poprawki podzielono następująco:

Kategoria Liczba Krytyczność
Naprawione błędy krytyczne 6 Critical
Naprawione wycieki pamięci 4 Critical
Zgodność ze specyfikacją 10 High
Automat stanów i połączenie 5 High
Heartbeat i bezczynność 3 High
Bezpieczeństwo wątkowe 2 High

2. Naprawione błędy krytyczne

Poprawki te dotyczą błędów powodujących natychmiastowe awarie w czasie wykonania, uszkodzenie protokołu lub nieprawidłowe zachowanie podczas komunikacji AMQP 1.0.

2.1 Brakujące słowo kluczowe raise w wyjątku

File: sgcAMQP1_Classes.pas
Obiekt wyjątku był tworzony, ale nigdy nie był rzucany przy napotkaniu nieprawidłowego typu ramki. Wyjątek był po cichu odrzucany, co pozwalało na przetwarzanie uszkodzonych ramek bez ich wykrycia.

Przed poprawką (błędny kod)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

Po poprawce (poprawny kod)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 — brakujące dane i błędny rozmiar

File: sgcAMQP1_Classes.pas
Ścieżce kodowania Map32 brakowało wywołania WriteBytes dla rzeczywistych danych mapy, a pole rozmiaru używało nieprawidłowego przesunięcia. Map32 używa 4-bajtowego pola licznika (w odróżnieniu od Map8, które używa 1 bajtu), więc rozmiar musi zawierać 4 dodatkowe bajty.

Przed poprawką (błędny kod)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

Po poprawce (poprawny kod)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 Odwrócona logika ContainsError

File: sgcAMQP1_Frames.pas
Metoda ContainsError zarówno w TsgcAMQP1FrameRejected, jak i TsgcAMQP1DescribedListError zwracała True, gdy nie było błędu, i False, gdy błąd był obecny. Powodowało to cichy brak informacji o błędach i zapisywanie bajtów null tam, gdzie powinny być serializowane rzeczywiste błędy. Gałęzie DoWrite i DoWriteError były również zamienione, by odpowiadały poprawionej logice.

Przed poprawką (błędny kod)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

Po poprawce (poprawny kod)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 Separatory null w SASL PLAIN

File: sgcAMQP1_Frames.pas
Mechanizm SASL PLAIN wymaga formatu \0użytkownik\0hasło z bajtami null ($00) jako separatorami. Implementacja nie inicjalizowała tablicy bajtów zerami, więc pozycje separatorów zawierały przypadkowe dane. Uwierzytelnianie nie działało z żadnym brokerem AMQP 1.0 zgodnym ze standardem.

Po poprawce (poprawny kod)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 Brakujące inherited Create w TsgcAMQP1Message

File: sgcAMQP1_Message.pas
Konstruktor parametryczny TsgcAMQP1Message nie wywoływał inherited Create, co oznacza, że klasa bazowa nigdy nie była inicjalizowana. Powodowało to naruszenia dostępu lub uszkodzony stan przy używaniu wygodnego konstruktora.

Po poprawce (poprawny kod)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 Brakujący średnik w AmqpValue.DoRead

File: sgcAMQP1_Frames.pas
Brakujący średnik w TsgcAMQP1FrameAmqpValue.DoRead uniemożliwiał kompilację.


3. Naprawione wycieki pamięci

Poprawki te dotyczą problemów z zarządzaniem czasem życia obiektów, które powodowały akumulację pamięci w czasie, szczególnie podczas długotrwałych połączeń AMQP 1.0 z wieloma wymianami wiadomości.

Poprawka Plik Opis
3.1 sgcAMQP1_Frames.pas FDefaultOutcome nie był zwalniany przed ponownym przypisaniem podczas odczytu deskryptora Source. Przy każdym odebraniu nowego domyślnego wyniku poprzedni obiekt był tracony.
3.2 sgcAMQP1_Session.pas Zduplikowane wywołanie sgcFree(FCreditConsumed) w destruktorze powodowało potencjalny podwójny zwrot pamięci. Usunięto zduplikowaną linię.
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries brakowało w destruktorze sesji. Lista śledzenia dostaw nigdy nie była zwalniana przy zniszczeniu sesji.
3.4 sgcAMQP1_Message.pas SetMessage i SetMessageAndFreeOnDestroy nie zwalniały poprzedniej wiadomości, gdy FFreeMessageOnDestroy był włączony. Wielokrotne przypisywanie wiadomości powodowało wycieki pamięci.

Poprawka 3.1 – FDefaultOutcome zwolniony przed ponownym przypisaniem

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Poprawka 3.4 – SetMessage zwalnia poprzednią wiadomość

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. Poprawki zgodności ze specyfikacją

Poprawki te korygują odchylenia od specyfikacji AMQP 1.0 Transport, Types i Messaging.

4.1 Pole 7 ramki begin odczytywane do błędnej właściwości

File: sgcAMQP1_Frames.pas
Pole o indeksie 7 performatywy begin było odczytywane do DesiredCapabilities zamiast Properties. Zgodnie ze specyfikacją pola begin to: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).

4.2 Brakujące pola Source i Target w DoWrite

File: sgcAMQP1_Frames.pas
Metoda DoWrite deskryptora Source nie serializowała pól default-outcome, outcomes i capabilities. W deskryptorze Target brakowało capabilities. Powodowało to, że broker używał wartości domyślnych zamiast wynegocjowanych, co mogło prowadzić do nieprawidłowej obsługi stanu dostaw.

4.3 AmqpSequence odczytywane do błędnej właściwości

File: sgcAMQP1_Message.pas
Podczas odczytu treści wiadomości dane amqp-sequence były odczytywane do ApplicationData.AMQPValue zamiast ApplicationData.AMQPSequence. Powodowało to uszkodzenie treści wiadomości dla każdej wiadomości używającej kodowania amqp-sequence.

4.4 TransactionalState — wynik nie jest zapisywany

File: sgcAMQP1_Frames.pas
Stan dostawy transactional-state nie serializował swojego pola outcome, które jest wymagane podczas rozliczania dostaw w ramach transakcji.

4.5 Pole Last w Disposition nie odróżnia zera od braku wartości

Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
Performatywa disposition ma opcjonalne pole last (delivery-id). Ponieważ jest to Cardinal, wartość 0 jest prawidłowa i nie może być używana jako wartownik dla “nie ustawiono”. Dodano nową flagę boolowską FLastAssigned i setter SetLast, aby prawidłowo śledzić, czy pole zostało jawnie ustawione.

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence — brak właściwości Value i metod Read/Write

Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
Klasa TsgcAMQP1FrameAmqpSequence nie miała właściwości Value i miała puste metody DoRead/DoWrite. Typ treści amqp-sequence był całkowicie niefunkcjonalny.

4.7 Pole Info błędu odczytywane jako String zamiast Map

File: sgcAMQP1_Frames.pas
Specyfikacja AMQP 1.0 definiuje pole info typu error jako map. Było ono odczytywane przez ReadString zamiast ReadMap, powodując błędy parsowania gdy brokery wysyłają strukturalne informacje o błędach.

4.8 Capabilities i locales zapisywane jako String zamiast Symbol

File: sgcAMQP1_Frames.pas
Specyfikacja AMQP 1.0 definiuje offered-capabilities, desired-capabilities, outgoing-locales i incoming-locales jako tablice symbol. Były one zapisywane przez WriteString zamiast WriteSymbol w performatywach open, begin i attach. Broker zgodny ze standardami odrzucałby te ramki jako mające nieprawidłowe typy pól.

4.9 Brakujący deskryptor Accepted w obsłudze DefaultOutcome

File: sgcAMQP1_Frames.pas
Czytnik default-outcome deskryptora Source obsługiwał tylko released i rejected. Najczęstszy wynik, accepted, nie był obsługiwany. Gdy broker wysyłał accepted jako domyślny wynik, był po cichu ignorowany.


5. Poprawki automatu stanów i połączenia

Poprawki te dotyczą automatu stanów połączenia AMQP 1.0 i logiki przetwarzania ramek.

Poprawka Plik Opis
5.1 sgcAMQP1.pas Przejście stanu amqp1csOpenReceived nie miało gałęzi else DoRaiseInvalidState, którą posiadają wszystkie inne stany. Nieprawidłowe przejścia były po cichu ignorowane zamiast generować błąd.
5.2 sgcAMQP1.pas Komunikat błędu walidacji rozmiaru ramki wyświetlał RemoteMaxFrameSize, choć powinien pokazywać LocalMaxFrameSize, ponieważ to lokalny limit był sprawdzany.
5.3 sgcAMQP1.pas FLastTimeRead był inicjalizowany na 0 (epoka Delphi: 1899-12-30) zamiast Now. Powodowało to natychmiastowe fałszywe wykrycie bezczynności przy uruchomieniu.
5.4 sgcAMQP1.pas Przeciążenie TBytes metody Read nie aktualizowało FLastTimeRead := Now, w odróżnieniu od przeciążenia TMemoryStream. Powodowało to niespójne śledzenie heartbeat.
5.5 sgcAMQP1.pas Przejście stanu po odebraniu nagłówka było warunkowe, choć powinno zawsze następować. Automat stanów musi dokonywać przejścia przy każdej prawidłowej wymianie nagłówków zgodnie ze specyfikacją AMQP 1.0.

Poprawka 5.1 – OpenReceived — brakująca gałąź błędu

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. Poprawki heartbeat i bezczynności

Specyfikacja AMQP 1.0 wymaga, żeby gdy peer ogłosi idle-timeout w performatywie open, drugi peer wysyłał ramki heartbeat co połowę ogłoszonego interwału. Poprawki te zapewniają, że mechanizm heartbeat rzeczywście działa.

Poprawka Plik Opis
6.1 sgcAMQP1_Client.pas HeartBeat nigdy nie był włączany. Obie gałęzie sprawdzania bezczynności ustawiały HeartBeat.Enabled := False. Zmieniono na True, gdy IdleTimeout > 0.
6.2 sgcAMQP1_Client.pas Disconnect nie wyłączał heartbeat ani nie ustawiał FConnected := False wystarczająco wcześnie. Zmieniono kolejność, by zapobiec uruchamianiu heartbeat podczas zamykania połączenia.
6.3 sgcAMQP1.pas FLastTimeRead nie był aktualizowany w przeciążeniu TBytes metody Read (wymienione także w sekcji automatu stanów).

Poprawka 6.1 – HeartBeat włączony

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Poprawka 6.2 – Disconnect zmieniona kolejność

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. Poprawki bezpieczeństwa wątkowego

Poprawki te dotyczą wyścigów wątków przy współbieżnym dostępie do współdzielonych struktur danych.

7.1 TsgcAMQP1Deliveries.First() — sprawdzanie granic i blokowanie

File: sgcAMQP1_Message.pas
Metoda First() uzyskiwała dostęp do Items[0] bez sprawdzenia, czy lista jest pusta, i bez pobierania blokady wątkowej. W środowisku wielowątkowym inny wątek mógł usunąć wszystkie elementy między sprawdzeniem licznika a dostępem, powodując wyjątek przekroczenia granic.

Po poprawce (poprawny kod)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 SetMessage — bezpieczna podmiana obiektu

File: sgcAMQP1_Message.pas
Metoda SetMessage sprawdza teraz, że nowa wiadomość różni się od bieżącej przed zwolnieniem, zapobiegając użyciu po zwolnieniu podczas przypisywania tego samego obiektu wiadomości.


8. Zmodyfikowane pliki

Plik Poprawki Kategorie
Source\sgcAMQP1_Classes.pas 2 Błędy krytyczne
Source\sgcAMQP1_Frames.pas 16 Błędy krytyczne, wycieki pamięci, zgodność ze specyfikacją
Interfaces\sgcAMQP1_Frames.intf 2 Zgodność ze specyfikacją (Disposition LastAssigned, AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 Błędy krytyczne, wycieki pamięci, bezpieczeństwo wątkowe
Source\sgcAMQP1_Session.pas 3 Wycieki pamięci, zgodność ze specyfikacją
Source\sgcAMQP1.pas 5 Automat stanów, połączenie, heartbeat
Source\sgcAMQP1_Client.pas 3 Heartbeat, bezpieczne rozłączanie

Łącznie: 30 poprawek w 8 plikach, poprawiających poprawność protokołu, bezpieczeństwo pamięci i niezawodność implementacji AMQP 1.0 względem specyfikacji OASIS.