AMQP 1-client Delphi-update

· Functies

De AMQP 1.0-protocolimplementatie in sgcWebSockets heeft een uitgebreide review ondergaan tegen de OASIS AMQP 1.0-specificatie. Dit artikel documenteert de 30 fixes die zijn toegepast op 8 broncode-bestanden, voor kritieke bugs, memory leaks, spec-conformiteit, correctheid van de state machine, heartbeat-afhandeling en verbeteringen in thread-veiligheid.

Inhoudsopgave

  1. Overzicht
  2. Kritieke bugfixes
  3. Memory leak-fixes
  4. Spec-conformiteitsfixes
  5. State machine- en verbindingsfixes
  6. Heartbeat- en idle timeout-fixes
  7. Thread-veiligheidsfixes
  8. Gewijzigde bestanden

1. Overzicht

In totaal zijn 30 fixes toegepast op 6 broncode-bestanden en 2 interface-bestanden in de AMQP 1.0-implementatie. De fixes zijn als volgt gecategoriseerd:

Categorie Aantal Ernst
Kritieke bugfixes 6 Kritiek
Memory leak-fixes 4 Kritiek
Spec-conformiteit 10 Hoog
State machine en verbinding 5 Hoog
Heartbeat en idle timeout 3 Hoog
Thread-veiligheid 2 Hoog

2. Kritieke bugfixes

Deze fixes verhelpen bugs die directe runtime-storingen, protocolcorruptie of onjuist gedrag veroorzaakten tijdens AMQP 1.0-communicatie.

2.1 Ontbrekend raise-keyword in exception

Bestand: sgcAMQP1_Classes.pas
Een exception-object werd aangemaakt maar nooit opgeworpen wanneer een ongeldig frametype werd aangetroffen. De exception werd stilletjes weggegooid, waardoor corrupte frames konden worden verwerkt zonder dat dit werd opgemerkt.

Voor (kapot)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

Na (opgelost)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 ontbrekende data en verkeerde grootte

Bestand: sgcAMQP1_Classes.pas
In het Map32-encoderpad ontbrak de WriteBytes-aanroep voor de eigenlijke map-data, en het size-veld gebruikte een onjuiste offset. Map32 gebruikt een 4-byte count-veld (in tegenstelling tot Map8 dat 1 byte gebruikt), dus moet de grootte 4 extra bytes bevatten.

Voor (kapot)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

Na (opgelost)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 Geïnverteerde ContainsError-logica

Bestand: sgcAMQP1_Frames.pas
De methode ContainsError in zowel TsgcAMQP1FrameRejected als TsgcAMQP1DescribedListError gaf True terug wanneer er geen fout was en False wanneer er wel een fout was. Hierdoor werd foutinformatie stilletjes weggegooid en werden null-bytes geschreven wanneer echte fouten geserialiseerd hadden moeten worden. De takken DoWrite en DoWriteError werden ook verwisseld om bij de gecorrigeerde logica te passen.

Voor (kapot)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

Na (opgelost)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 SASL PLAIN null-byte-scheidingstekens

Bestand: sgcAMQP1_Frames.pas
Het SASL PLAIN-mechanisme vereist het formaat \0username\0password met null-byte- scheidingstekens ($00). De implementatie initialiseerde de byte-array niet op nul, waardoor de scheidingsteken-posities rommel bevatten. Authenticatie zou mislukken tegen elke standaardconforme AMQP 1.0-broker.

Na (opgelost)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 Ontbrekende inherited Create in TsgcAMQP1Message

Bestand: sgcAMQP1_Message.pas
De geparametriseerde constructor van TsgcAMQP1Message riep inherited Create niet aan, waardoor de basisklasse nooit werd geïnitialiseerd. Dit veroorzaakte access violations of een corrupte state bij gebruik van de convenience-constructor.

Na (opgelost)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 Ontbrekende puntkomma in AmqpValue.DoRead

Bestand: sgcAMQP1_Frames.pas
Een ontbrekende puntkomma in TsgcAMQP1FrameAmqpValue.DoRead zou compilatie verhinderen.


3. Memory leak-fixes

Deze fixes verhelpen problemen met objectlevensduurbeheer waardoor geheugen mettertijd zou ophopen, met name tijdens langlopende AMQP 1.0-verbindingen met veel berichtenuitwisseling.

Fix Bestand Beschrijving
3.1 sgcAMQP1_Frames.pas FDefaultOutcome werd niet vrijgegeven vóór toewijzing bij het lezen van de Source-descriptor. Telkens wanneer een nieuw default outcome werd ontvangen, lekte het vorige object.
3.2 sgcAMQP1_Session.pas Dubbele sgcFree(FCreditConsumed)-aanroep in de destructor veroorzaakte een potentiële double-free. De dubbele regel is verwijderd.
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries ontbrak in de session-destructor. De tracking-lijst voor deliveries werd nooit vrijgegeven wanneer een sessie werd vernietigd.
3.4 sgcAMQP1_Message.pas SetMessage en SetMessageAndFreeOnDestroy gaven het vorige bericht niet vrij wanneer FFreeMessageOnDestroy was ingeschakeld. Herhaalde toewijzingen van berichten lekten geheugen.

Fix 3.1 – FDefaultOutcome vrijgegeven vóór toewijzing

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Fix 3.4 – SetMessage geeft het oude bericht vrij

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. Spec-conformiteitsfixes

Deze fixes corrigeren afwijkingen van de specificaties AMQP 1.0 Transport, Types en Messaging.

4.1 Begin-frame veld 7 gelezen in verkeerde eigenschap

Bestand: sgcAMQP1_Frames.pas
Veldindex 7 van het begin-performative werd gelezen in DesiredCapabilities in plaats van Properties. Volgens de spec zijn de begin-velden: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).

4.2 Ontbrekende velden in Source en Target DoWrite

Bestand: sgcAMQP1_Frames.pas
De DoWrite-methode van de Source-descriptor serialiseerde de velden default-outcome, outcomes en capabilities niet. De Target-descriptor miste capabilities. Hierdoor gebruikte de broker defaults in plaats van de onderhandelde waarden, wat kon leiden tot onjuiste afhandeling van de delivery state.

4.3 AmqpSequence gelezen in verkeerde eigenschap

Bestand: sgcAMQP1_Message.pas
Bij het lezen van het berichtlichaam werd amqp-sequence-data gelezen in ApplicationData.AMQPValue in plaats van ApplicationData.AMQPSequence. Dit corrumpeerde het berichtlichaam voor elk bericht dat de amqp-sequence-codering gebruikte.

4.4 TransactionalState outcome niet geschreven

Bestand: sgcAMQP1_Frames.pas
De delivery state transactional-state serialiseerde haar outcome-veld niet, wat vereist is bij het afronden van deliveries binnen een transactie.

4.5 Disposition Last-veld kan nul niet onderscheiden van niet-ingesteld

Bestanden: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
Het disposition-performative heeft een optioneel last-veld (delivery-id). Aangezien dit een Cardinal is, is de waarde 0 geldig en kan deze niet worden gebruikt als sentinel voor “niet ingesteld”. Een nieuwe boolean-vlag FLastAssigned en setter SetLast zijn toegevoegd om netjes bij te houden of het veld expliciet is ingesteld.

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence miste Value-eigenschap en Read/Write

Bestanden: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
De klasse TsgcAMQP1FrameAmqpSequence had geen Value-eigenschap en lege DoRead/DoWrite- methoden. Het lichaamstype amqp-sequence was volledig niet functioneel.

4.7 Error Info-veld gelezen als string in plaats van map

Bestand: sgcAMQP1_Frames.pas
De AMQP 1.0-spec definieert het info-veld van het error-type als een map. Het werd gelezen met ReadString in plaats van ReadMap, wat leidde tot parsing-fouten wanneer brokers gestructureerde foutinformatie verstuurden.

4.8 Capabilities en Locales geschreven als string in plaats van symbol

Bestand: sgcAMQP1_Frames.pas
De AMQP 1.0-spec definieert offered-capabilities, desired-capabilities, outgoing-locales en incoming-locales als arrays van symbol. Ze werden geschreven met WriteString in plaats van WriteSymbol in de performatives open, begin en attach. Een standaardconforme broker zou deze frames weigeren omdat ze onjuiste veldtypes hebben.

4.9 Ontbrekende Accepted-descriptor in DefaultOutcome-handler

Bestand: sgcAMQP1_Frames.pas
De default-outcome-reader van de Source-descriptor verwerkte alleen released en rejected. De meest voorkomende outcome, accepted, werd niet verwerkt. Wanneer een broker accepted verstuurde als default outcome, werd dat stilletjes genegeerd.


5. State machine- en verbindingsfixes

Deze fixes verhelpen de state machine van de AMQP 1.0-verbinding en de frameverwerkingslogica.

Fix Bestand Beschrijving
5.1 sgcAMQP1.pas Bij de state-overgang amqp1csOpenReceived ontbrak de else DoRaiseInvalidState die alle andere states wel hadden. Ongeldige overgangen werden stilletjes genegeerd in plaats van een fout op te werpen.
5.2 sgcAMQP1.pas De foutmelding voor framesize-validatie toonde RemoteMaxFrameSize maar had LocalMaxFrameSize moeten tonen, omdat de lokale limiet werd gecontroleerd.
5.3 sgcAMQP1.pas FLastTimeRead werd geïnitialiseerd op 0 (Delphi-epoch: 1899-12-30) in plaats van Now. Dit veroorzaakte onmiddellijke valse idle-timeout-detectie bij het opstarten.
5.4 sgcAMQP1.pas De TBytes-overload van Read werkte FLastTimeRead := Now niet bij, in tegenstelling tot de TMemoryStream-overload. Dit veroorzaakte inconsistente heartbeat-tracking.
5.5 sgcAMQP1.pas De state-overgang voor 'header ontvangen' was conditioneel terwijl deze altijd moest worden geactiveerd. De state machine moet bij elke geldige header-uitwisseling overgaan volgens de AMQP 1.0-spec.

Fix 5.1 – OpenReceived mist error-tak

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. Heartbeat- en idle timeout-fixes

De AMQP 1.0-specificatie vereist dat wanneer een peer een idle-timeout aanbiedt in het open-performative, de andere peer heartbeat-frames moet versturen op de helft van het aangegeven interval. Deze fixes zorgen dat het heartbeat-mechanisme daadwerkelijk werkt.

Fix Bestand Beschrijving
6.1 sgcAMQP1_Client.pas HeartBeat werd nooit ingeschakeld. Beide takken van de idle-timeout-check zetten HeartBeat.Enabled := False. Aangepast naar True wanneer IdleTimeout > 0.
6.2 sgcAMQP1_Client.pas Disconnect schakelde heartbeat niet uit en zette FConnected := False niet vroeg genoeg. Volgorde aangepast om te voorkomen dat heartbeat afgaat tijdens teardown.
6.3 sgcAMQP1.pas FLastTimeRead niet bijgewerkt in de TBytes Read-overload (ook genoemd in de sectie State Machine).

Fix 6.1 – HeartBeat ingeschakeld

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Fix 6.2 – Disconnect heringedeeld

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. Thread-veiligheidsfixes

Deze fixes verhelpen race conditions bij gelijktijdige toegang tot gedeelde datastructuren.

7.1 TsgcAMQP1Deliveries.First() bounds-check en locking

Bestand: sgcAMQP1_Message.pas
De methode First() opende Items[0] zonder te controleren of de lijst leeg was en zonder de thread-safe lock op te halen. In een multi-threaded-omgeving kon een andere thread alle items verwijderen tussen de count-check en de toegang, met een index-out-of-bounds-exception tot gevolg.

Na (opgelost)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 SetMessage veilige objectvervanging

Bestand: sgcAMQP1_Message.pas
De methode SetMessage controleert nu dat het nieuwe bericht verschilt van het huidige voordat dit wordt vrijgegeven, om use-after-free te voorkomen bij het toewijzen van hetzelfde berichtobject.


8. Gewijzigde bestanden

Bestand Fixes Categorieën
Source\sgcAMQP1_Classes.pas 2 Kritieke bugs
Source\sgcAMQP1_Frames.pas 16 Kritieke bugs, memory leaks, spec-conformiteit
Interfaces\sgcAMQP1_Frames.intf 2 Spec-conformiteit (Disposition LastAssigned, AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 Kritieke bugs, memory leaks, thread-veiligheid
Source\sgcAMQP1_Session.pas 3 Memory leaks, spec-conformiteit
Source\sgcAMQP1.pas 5 State machine, verbinding, heartbeat
Source\sgcAMQP1_Client.pas 3 Heartbeat, disconnect-veiligheid

Totaal: 30 fixes over 8 bestanden, voor verbetering van protocolcorrectheid, geheugenveiligheid en betrouwbaarheid van de AMQP 1.0-implementatie ten opzichte van de OASIS-specificatie.