AMQP 1 Client Delphi Update

· Funktionen

Die AMQP-1.0-Protokollimplementierung in sgcWebSockets wurde einer umfassenden Überprüfung gegen die OASIS-AMQP-1.0-Spezifikation unterzogen. Dieser Artikel dokumentiert die 30 Fixes, die auf 8 Quelldateien angewendet wurden, und deckt kritische Bugs, Memory Leaks, Spec-Compliance, Korrektheit des Zustandsautomaten, Heartbeat-Handling und Verbesserungen der Thread-Sicherheit ab.

Inhaltsverzeichnis

  1. Übersicht
  2. Kritische Bugfixes
  3. Memory-Leak-Fixes
  4. Spec-Compliance-Fixes
  5. Zustandsautomat & Verbindungs-Fixes
  6. Heartbeat- & Idle-Timeout-Fixes
  7. Thread-Safety-Fixes
  8. Geänderte Dateien

1. Übersicht

Insgesamt wurden 30 Fixes auf 6 Quelldateien und 2 Interface-Dateien in der AMQP-1.0-Implementierung angewendet. Die Fixes sind wie folgt kategorisiert:

Kategorie Anzahl Schweregrad
Kritische Bugfixes 6 Kritisch
Memory-Leak-Fixes 4 Kritisch
Spec-Compliance 10 Hoch
Zustandsautomat & Verbindung 5 Hoch
Heartbeat & Idle-Timeout 3 Hoch
Thread-Safety 2 Hoch

2. Kritische Bugfixes

Diese Fixes beheben Bugs, die zu sofortigen Laufzeitfehlern, Protokollkorruption oder inkorrektem Verhalten während der AMQP-1.0-Kommunikation führen würden.

2.1 Fehlendes raise-Schlüsselwort bei Exception

Datei: sgcAMQP1_Classes.pas
Ein Exception-Objekt wurde erstellt, aber nie ausgelöst, wenn ein ungültiger Frame-Typ auftrat. Die Exception wurde still verworfen, sodass beschädigte Frames unbemerkt verarbeitet werden konnten.

Vorher (Defekt)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

Nachher (Korrigiert)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32: fehlende Daten und falsche Größe

Datei: sgcAMQP1_Classes.pas
Im Map32-Encoding-Pfad fehlte der Aufruf von WriteBytes für die tatsächlichen Map-Daten, und das Size-Feld verwendete einen falschen Offset. Map32 nutzt ein 4-Byte-Count-Feld (anders als Map8, das 1 Byte nutzt), daher muss die Größe 4 zusätzliche Bytes enthalten.

Vorher (Defekt)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

Nachher (Korrigiert)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 Invertierte ContainsError-Logik

Datei: sgcAMQP1_Frames.pas
Die Methode ContainsError in sowohl TsgcAMQP1FrameRejected als auch TsgcAMQP1DescribedListError lieferte True zurück, wenn kein Fehler vorlag, und False, wenn ein Fehler vorlag. Dadurch wurden Fehlerinformationen still verworfen und Null-Bytes geschrieben, obwohl tatsächliche Fehler hätten serialisiert werden sollen. Die Zweige DoWrite und DoWriteError wurden ebenfalls vertauscht, um zur korrigierten Logik zu passen.

Vorher (Defekt)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

Nachher (Korrigiert)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 SASL-PLAIN-Null-Byte-Separatoren

Datei: sgcAMQP1_Frames.pas
Der SASL-PLAIN-Mechanismus erfordert das Format \0username\0password mit Null-Byte- ($00)-Separatoren. Die Implementierung initialisierte das Byte-Array nicht mit Nullen, sodass die Separator-Positionen Müll-Daten enthielten. Die Authentifizierung würde gegen jeden standardkonformen AMQP-1.0-Broker fehlschlagen.

Nachher (Korrigiert)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 Fehlender inherited Create-Aufruf in TsgcAMQP1Message

Datei: sgcAMQP1_Message.pas
Der parametrisierte Konstruktor von TsgcAMQP1Message rief inherited Create nicht auf, was bedeutete, dass die Basisklasse nie initialisiert wurde. Das führte zu Access Violations oder korrupten Zuständen bei Verwendung des Convenience-Konstruktors.

Nachher (Korrigiert)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 Fehlender Strichpunkt in AmqpValue.DoRead

Datei: sgcAMQP1_Frames.pas
Ein fehlender Strichpunkt in TsgcAMQP1FrameAmqpValue.DoRead hätte die Kompilierung verhindert.


3. Memory-Leak-Fixes

Diese Fixes beheben Probleme beim Object-Lifetime-Management, die zu einer Speicherakkumulation über die Zeit geführt hätten, besonders bei lange laufenden AMQP-1.0-Verbindungen mit vielen Nachrichtenaustauschen.

Fix File Beschreibung
3.1 sgcAMQP1_Frames.pas FDefaultOutcome wurde vor der Neuzuweisung beim Lesen des Source-Descriptors nicht freigegeben. Jedes Mal, wenn ein neues Default-Outcome empfangen wurde, leakte das vorherige Objekt.
3.2 sgcAMQP1_Session.pas Ein doppelter sgcFree(FCreditConsumed)-Aufruf im Destruktor führte zu einem potenziellen Double-Free. Die doppelte Zeile wurde entfernt.
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries fehlte im Session-Destruktor. Die Delivery-Tracking-Liste wurde beim Zerstören einer Session nie freigegeben.
3.4 sgcAMQP1_Message.pas SetMessage und SetMessageAndFreeOnDestroy gaben die vorherige Nachricht nicht frei, wenn FFreeMessageOnDestroy aktiviert war. Wiederholte Nachrichten-Zuweisungen leakten Speicher.

Fix 3.1 – FDefaultOutcome wird vor Neuzuweisung freigegeben

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Fix 3.4 – SetMessage gibt alte Nachricht frei

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. Spec-Compliance-Fixes

Diese Fixes korrigieren Abweichungen von den AMQP-1.0-Transport-, Types- und Messaging-Spezifikationen.

4.1 Begin-Frame-Feld 7 in falsche Eigenschaft eingelesen

Datei: sgcAMQP1_Frames.pas
Feldindex 7 des begin-Performatives wurde in DesiredCapabilities statt in Properties eingelesen. Laut Spec sind die Begin-Felder: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).

4.2 Source und Target: fehlende Felder in DoWrite

Datei: sgcAMQP1_Frames.pas
Die Methode DoWrite des Source-Descriptors serialisierte die Felder default-outcome, outcomes und capabilities nicht. Beim Target-Descriptor fehlte capabilities. Dadurch verwendete der Broker Standardwerte statt der ausgehandelten Werte, was potenziell zu inkorrektem Delivery-State-Handling führte.

4.3 AmqpSequence in falsche Eigenschaft eingelesen

Datei: sgcAMQP1_Message.pas
Beim Lesen des Nachrichten-Bodys wurden amqp-sequence-Daten in ApplicationData.AMQPValue statt in ApplicationData.AMQPSequence eingelesen. Das beschädigte den Message-Body für jede Nachricht, die das amqp-sequence-Encoding verwendet.

4.4 TransactionalState: Outcome nicht geschrieben

Datei: sgcAMQP1_Frames.pas
Der transactional-state-Delivery-State serialisierte sein outcome-Feld nicht, das erforderlich ist, wenn Deliveries innerhalb einer Transaktion abgeschlossen werden.

4.5 Disposition Last-Feld: Null kann nicht von "ungesetzt" unterschieden werden

Dateien: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
Das disposition-Performative hat ein optionales last-Feld (delivery-id). Da es ein Cardinal ist, ist der Wert 0 gültig und kann nicht als Sentinel für “nicht gesetzt” verwendet werden. Ein neues FLastAssigned- Boolean-Flag und ein SetLast-Setter wurden hinzugefügt, um zu tracken, ob das Feld explizit gesetzt wurde.

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence: fehlende Value-Eigenschaft und Read/Write

Dateien: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
Die Klasse TsgcAMQP1FrameAmqpSequence hatte keine Value-Eigenschaft und leere DoRead-/DoWrite- Methoden. Der amqp-sequence-Body-Typ war komplett nicht funktionsfähig.

4.7 Error-Info-Feld als String statt als Map gelesen

Datei: sgcAMQP1_Frames.pas
Die AMQP-1.0-Spec definiert das info-Feld des error-Typs als map. Es wurde mit ReadString statt mit ReadMap gelesen, was zu Parsing-Fehlern führte, wenn Broker strukturierte Fehlerinformationen sendeten.

4.8 Capabilities und Locales als String statt als Symbol geschrieben

Datei: sgcAMQP1_Frames.pas
Die AMQP-1.0-Spec definiert offered-capabilities, desired-capabilities, outgoing-locales und incoming-locales als Arrays von symbol. Sie wurden mit WriteString statt mit WriteSymbol in den open-, begin- und attach-Performatives geschrieben. Ein standardkonformer Broker würde diese Frames mit falschen Feldtypen ablehnen.

4.9 Fehlender Accepted-Descriptor im DefaultOutcome-Handler

Datei: sgcAMQP1_Frames.pas
Der default-outcome-Reader des Source-Descriptors behandelte nur released und rejected. Das häufigste Outcome, accepted, wurde nicht behandelt. Wenn ein Broker accepted als Default-Outcome sendete, wurde es still ignoriert.


5. Zustandsautomat & Verbindungs-Fixes

Diese Fixes betreffen den AMQP-1.0-Verbindungs-Zustandsautomaten und die Frame-Verarbeitungslogik.

Fix File Beschreibung
5.1 sgcAMQP1.pas Bei der Zustandstransition amqp1csOpenReceived fehlte das else DoRaiseInvalidState, das alle anderen Zustände hatten. Ungültige Übergänge wurden still ignoriert, statt einen Fehler auszulösen.
5.2 sgcAMQP1.pas Die Fehlermeldung der Frame-Size-Validierung zeigte RemoteMaxFrameSize an, hätte aber LocalMaxFrameSize anzeigen sollen, da das lokale Limit überprüft wurde.
5.3 sgcAMQP1.pas FLastTimeRead wurde mit 0 (Delphi-Epoch: 1899-12-30) statt mit Now initialisiert. Das führte beim Start zu sofortiger fälschlicher Idle-Timeout-Erkennung.
5.4 sgcAMQP1.pas Der TBytes-Overload von Read aktualisierte FLastTimeRead := Now nicht, anders als der TMemoryStream-Overload. Das führte zu inkonsistentem Heartbeat-Tracking.
5.5 sgcAMQP1.pas Die Zustandstransition "Header received" war konditional, obwohl sie immer ausgelöst werden sollte. Der Zustandsautomat muss laut AMQP-1.0-Spec bei jedem gültigen Header-Austausch übergehen.

Fix 5.1 – Fehlender Error-Zweig bei OpenReceived

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. Heartbeat- & Idle-Timeout-Fixes

Die AMQP-1.0-Spezifikation erfordert, dass, wenn ein Peer einen idle-timeout im open-Performative ankündigt, der andere Peer Heartbeat-Frames in der Hälfte des angekündigten Intervalls senden muss. Diese Fixes stellen sicher, dass der Heartbeat-Mechanismus tatsächlich funktioniert.

Fix File Beschreibung
6.1 sgcAMQP1_Client.pas HeartBeat wurde nie aktiviert. Beide Zweige der Idle-Timeout-Prüfung setzten HeartBeat.Enabled := False. Geändert zu True bei IdleTimeout > 0.
6.2 sgcAMQP1_Client.pas Disconnect deaktivierte den Heartbeat nicht oder setzte FConnected := False nicht früh genug. Die Reihenfolge wurde geändert, um zu verhindern, dass der Heartbeat während des Teardowns ausgelöst wird.
6.3 sgcAMQP1.pas FLastTimeRead wird im TBytes-Read-Overload nicht aktualisiert (auch im Zustandsautomat-Abschnitt aufgeführt).

Fix 6.1 – HeartBeat aktiviert

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Fix 6.2 – Disconnect-Reihenfolge geändert

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. Thread-Safety-Fixes

Diese Fixes beheben Race Conditions beim parallelen Zugriff auf gemeinsame Datenstrukturen.

7.1 TsgcAMQP1Deliveries.First(): Bounds-Check und Locking

Datei: sgcAMQP1_Message.pas
Die Methode First() griff auf Items[0] zu, ohne zu prüfen, ob die Liste leer war, und ohne den Thread-Safe-Lock zu erwerben. In einer Multi-Thread-Umgebung könnte ein anderer Thread zwischen Count-Prüfung und Zugriff alle Items entfernen, was eine Index-out-of-bounds-Exception verursacht.

Nachher (Korrigiert)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 SetMessage: sichere Objektersetzung

Datei: sgcAMQP1_Message.pas
Die Methode SetMessage prüft jetzt, dass die neue Nachricht von der aktuellen abweicht, bevor sie freigegeben wird, um Use-after-free bei der Zuweisung desselben Message-Objekts zu verhindern.


8. Geänderte Dateien

File Fixes Kategorien
Source\sgcAMQP1_Classes.pas 2 Kritische Bugs
Source\sgcAMQP1_Frames.pas 16 Kritische Bugs, Memory Leaks, Spec-Compliance
Interfaces\sgcAMQP1_Frames.intf 2 Spec-Compliance (Disposition LastAssigned, AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 Kritische Bugs, Memory Leaks, Thread-Sicherheit
Source\sgcAMQP1_Session.pas 3 Memory Leaks, Spec-Compliance
Source\sgcAMQP1.pas 5 Zustandsautomat, Verbindung, Heartbeat
Source\sgcAMQP1_Client.pas 3 Heartbeat, Disconnect-Sicherheit

Insgesamt: 30 Fixes in 8 Dateien, die die Protokollkorrektheit, Speichersicherheit und Zuverlässigkeit der AMQP-1.0-Implementierung gegen die OASIS-Spezifikation verbessern.