Aggiornamento client AMQP 1 per Delphi

· Funzionalità

L'implementazione del protocollo AMQP 1.0 in sgcWebSockets è stata sottoposta a una revisione completa rispetto alla specifica OASIS AMQP 1.0. Questo articolo documenta le 30 correzioni applicate su 8 file sorgente, che coprono bug critici, memory leak, conformità alla specifica, correttezza delle macchine a stati, gestione degli heartbeat e miglioramenti alla thread safety.

Indice

  1. Panoramica
  2. Correzioni di bug critici
  3. Correzioni di memory leak
  4. Correzioni di conformità alla specifica
  5. Correzioni macchina a stati e connessione
  6. Correzioni heartbeat e idle timeout
  7. Correzioni di thread safety
  8. File modificati

1. Panoramica

Sono state applicate in totale 30 correzioni su 6 file sorgente e 2 file di interfaccia nell'implementazione AMQP 1.0. Le correzioni sono categorizzate come segue:

Categoria Conteggio Severità
Correzioni di bug critici 6 Critica
Correzioni di memory leak 4 Critica
Conformità alla specifica 10 Alta
Macchina a stati e connessione 5 Alta
Heartbeat e idle timeout 3 Alta
Thread safety 2 Alta

2. Correzioni di bug critici

Queste correzioni risolvono bug che causerebbero fallimenti immediati a runtime, corruzione del protocollo o comportamenti errati durante la comunicazione AMQP 1.0.

2.1 Parola chiave raise mancante nell'eccezione

File: sgcAMQP1_Classes.pas
Un oggetto eccezione veniva creato ma mai sollevato quando si incontrava un tipo di frame non valido. L'eccezione veniva scartata silenziosamente, permettendo l'elaborazione di frame corrotti senza rilevamento.

Prima (rotto)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

Dopo (corretto)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 con dati mancanti e dimensione errata

File: sgcAMQP1_Classes.pas
Il percorso di codifica Map32 mancava della chiamata WriteBytes per i dati effettivi della map, e il campo size usava un offset non corretto. Map32 usa un campo count di 4 byte (a differenza di Map8 che usa 1 byte), quindi la size deve includere 4 byte extra.

Prima (rotto)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

Dopo (corretto)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 Logica ContainsError invertita

File: sgcAMQP1_Frames.pas
Il metodo ContainsError sia in TsgcAMQP1FrameRejected sia in TsgcAMQP1DescribedListError restituiva True quando non c'era errore e False quando c'era un errore. Questo causava la perdita silenziosa delle informazioni di errore e la scrittura di byte null quando avrebbero dovuto essere serializzati errori reali. Anche i rami DoWrite e DoWriteError sono stati scambiati per corrispondere alla logica corretta.

Prima (rotto)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

Dopo (corretto)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 Separatori byte null SASL PLAIN

File: sgcAMQP1_Frames.pas
Il meccanismo SASL PLAIN richiede il formato \0username\0password con separatori byte null ($00). L'implementazione non inizializzava a zero l'array di byte, quindi le posizioni dei separatori contenevano dati spazzatura. L'autenticazione sarebbe fallita contro qualsiasi broker AMQP 1.0 conforme agli standard.

Dopo (corretto)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 inherited Create mancante in TsgcAMQP1Message

File: sgcAMQP1_Message.pas
Il costruttore parametrizzato di TsgcAMQP1Message non chiamava inherited Create, il che significa che la classe base non veniva mai inizializzata. Questo causava access violation o stato corrotto quando si usava il costruttore di comodo.

Dopo (corretto)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 Punto e virgola mancante in AmqpValue.DoRead

File: sgcAMQP1_Frames.pas
Un punto e virgola mancante in TsgcAMQP1FrameAmqpValue.DoRead impediva la compilazione.


3. Correzioni di memory leak

Queste correzioni risolvono problemi di gestione del ciclo di vita degli oggetti che avrebbero causato l'accumulo di memoria nel tempo, particolarmente durante connessioni AMQP 1.0 di lunga durata con molti scambi di messaggi.

Correzione File Descrizione
3.1 sgcAMQP1_Frames.pas FDefaultOutcome non veniva liberato prima della riassegnazione durante la lettura del descriptor Source. Ogni volta che veniva ricevuto un nuovo default outcome, l'oggetto precedente perdeva memoria.
3.2 sgcAMQP1_Session.pas La chiamata duplicata sgcFree(FCreditConsumed) nel distruttore causava un potenziale double-free. Riga duplicata rimossa.
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries mancava dal distruttore della sessione. La lista di tracking delle delivery non veniva mai liberata quando una sessione veniva distrutta.
3.4 sgcAMQP1_Message.pas SetMessage e SetMessageAndFreeOnDestroy non liberavano il messaggio precedente quando FFreeMessageOnDestroy era abilitato. Assegnazioni ripetute di messaggi causavano memory leak.

Fix 3.1 – FDefaultOutcome liberato prima della riassegnazione

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Fix 3.4 – SetMessage libera il vecchio messaggio

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. Correzioni di conformità alla specifica

Queste correzioni correggono deviazioni dalle specifiche AMQP 1.0 Transport, Types e Messaging.

4.1 Field 7 del frame Begin letto nella proprietà sbagliata

File: sgcAMQP1_Frames.pas
L'indice di campo 7 del performative begin veniva letto in DesiredCapabilities invece che in Properties. Secondo la specifica, i campi di begin sono: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).

4.2 Campi mancanti in Source e Target in DoWrite

File: sgcAMQP1_Frames.pas
Il metodo DoWrite del descriptor Source non serializzava i campi default-outcome, outcomes e capabilities. Al descriptor Target mancavano le capabilities. Questo causava l'uso da parte del broker dei valori di default invece di quelli negoziati, portando potenzialmente a una gestione errata dello stato di delivery.

4.3 AmqpSequence letto nella proprietà sbagliata

File: sgcAMQP1_Message.pas
Durante la lettura del body del messaggio, i dati amqp-sequence venivano letti in ApplicationData.AMQPValue invece di ApplicationData.AMQPSequence. Questo corrompeva il body del messaggio per qualsiasi messaggio che usasse la codifica amqp-sequence.

4.4 Outcome di TransactionalState non scritto

File: sgcAMQP1_Frames.pas
Lo stato di delivery transactional-state non serializzava il suo campo outcome, che è obbligatorio quando si chiudono delivery all'interno di una transazione.

4.5 Il campo Last di Disposition non distingue zero da unset

File: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
Il performative disposition ha un campo opzionale last (delivery-id). Dato che è un Cardinal, il valore 0 è valido e non può essere usato come sentinella per “non impostato.” Sono stati aggiunti un nuovo flag booleano FLastAssigned e un setter SetLast per tenere traccia correttamente di quando il campo è stato impostato esplicitamente.

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence senza proprietà Value e senza Read/Write

File: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
La classe TsgcAMQP1FrameAmqpSequence non aveva una proprietà Value e aveva metodi DoRead/DoWrite vuoti. Il tipo di body amqp-sequence era completamente non funzionante.

4.7 Campo Info di Error letto come string invece di map

File: sgcAMQP1_Frames.pas
La specifica AMQP 1.0 definisce il campo info del tipo error come map. Veniva letto con ReadString invece di ReadMap, causando fallimenti di parsing quando i broker inviano informazioni di errore strutturate.

4.8 Capabilities e Locales scritti come string invece di symbol

File: sgcAMQP1_Frames.pas
La specifica AMQP 1.0 definisce offered-capabilities, desired-capabilities, outgoing-locales e incoming-locales come array di symbol. Venivano scritti con WriteString invece di WriteSymbol nei performative open, begin e attach. Un broker conforme agli standard rifiuterebbe questi frame come aventi tipi di campo non corretti.

4.9 Descriptor Accepted mancante nell'handler DefaultOutcome

File: sgcAMQP1_Frames.pas
Il reader del default-outcome del descriptor Source gestiva solo released e rejected. L'outcome più comune, accepted, non veniva gestito. Quando un broker inviava accepted come default outcome, veniva silenziosamente ignorato.


5. Correzioni macchina a stati e connessione

Queste correzioni affrontano la macchina a stati della connessione AMQP 1.0 e la logica di elaborazione dei frame.

Correzione File Descrizione
5.1 sgcAMQP1.pas La transizione di stato amqp1csOpenReceived mancava del else DoRaiseInvalidState che avevano tutti gli altri stati. Le transizioni non valide venivano ignorate silenziosamente invece di sollevare un errore.
5.2 sgcAMQP1.pas Il messaggio di errore di validazione della dimensione del frame mostrava RemoteMaxFrameSize ma avrebbe dovuto mostrare LocalMaxFrameSize, dato che il limite locale è ciò che veniva controllato.
5.3 sgcAMQP1.pas FLastTimeRead veniva inizializzato a 0 (epoca Delphi: 1899-12-30) invece di Now. Questo causava un rilevamento immediato di idle timeout falso all'avvio.
5.4 sgcAMQP1.pas L'overload TBytes di Read non aggiornava FLastTimeRead := Now, a differenza dell'overload TMemoryStream. Questo causava un tracking inconsistente dell'heartbeat.
5.5 sgcAMQP1.pas La transizione di stato header received era condizionale quando avrebbe dovuto attivarsi sempre. La macchina a stati deve transitare a ogni scambio di header valido secondo la specifica AMQP 1.0.

Fix 5.1 – ramo di errore mancante in OpenReceived

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. Correzioni heartbeat e idle timeout

La specifica AMQP 1.0 richiede che quando un peer pubblicizza un idle-timeout nel performative open, l'altro peer debba inviare frame di heartbeat a metà dell'intervallo pubblicizzato. Queste correzioni assicurano che il meccanismo di heartbeat funzioni effettivamente.

Correzione File Descrizione
6.1 sgcAMQP1_Client.pas HeartBeat non veniva mai abilitato. Entrambi i rami del controllo idle timeout impostavano HeartBeat.Enabled := False. Cambiato a True quando IdleTimeout > 0.
6.2 sgcAMQP1_Client.pas Disconnect non disabilitava l'heartbeat né impostava FConnected := False abbastanza presto. Riordinato per evitare che l'heartbeat venga generato durante il teardown.
6.3 sgcAMQP1.pas FLastTimeRead non aggiornato nell'overload TBytes di Read (elencato anche nella sezione macchina a stati).

Fix 6.1 – HeartBeat abilitato

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Fix 6.2 – Disconnect riordinato

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. Correzioni di thread safety

Queste correzioni affrontano race condition nell'accesso concorrente a strutture dati condivise.

7.1 TsgcAMQP1Deliveries.First() controllo dei limiti e locking

File: sgcAMQP1_Message.pas
Il metodo First() accedeva a Items[0] senza controllare se la lista fosse vuota e senza acquisire il lock thread-safe. In un ambiente multi-thread, un altro thread poteva rimuovere tutti gli elementi tra il controllo del count e l'accesso, causando un'eccezione di index-out-of-bounds.

Dopo (corretto)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 Sostituzione sicura di oggetto in SetMessage

File: sgcAMQP1_Message.pas
Il metodo SetMessage ora controlla che il nuovo messaggio sia diverso da quello corrente prima di liberarlo, evitando use-after-free quando si assegna lo stesso oggetto messaggio.


8. File modificati

File Correzioni Categorie
Source\sgcAMQP1_Classes.pas 2 Bug critici
Source\sgcAMQP1_Frames.pas 16 Bug critici, memory leak, conformità alla specifica
Interfaces\sgcAMQP1_Frames.intf 2 Conformità alla specifica (Disposition LastAssigned, AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 Bug critici, memory leak, thread safety
Source\sgcAMQP1_Session.pas 3 Memory leak, conformità alla specifica
Source\sgcAMQP1.pas 5 Macchina a stati, connessione, heartbeat
Source\sgcAMQP1_Client.pas 3 Heartbeat, sicurezza di disconnect

Totale: 30 correzioni su 8 file, che migliorano la correttezza del protocollo, la sicurezza della memoria e l'affidabilità dell'implementazione AMQP 1.0 rispetto alla specifica OASIS.