AMQP 1 Client Delphi Update

The AMQP 1.0 protocol implementation in sgcWebSockets has undergone a comprehensive review against the OASIS AMQP 1.0 specification. This article documents the 30 fixes applied across 8 files (6 source files and 2 interface files), covering critical bugs, memory leaks, specification compliance, state machine correctness, heartbeat handling, and thread safety improvements.

1. Overview

A total of 30 fixes were applied across 6 source files and 2 interface files in the AMQP 1.0 implementation. The fixes are categorized as follows:

| Category | Count | Severity |
| --- | --- | --- |
| Critical Bug Fixes | 6 | Critical |
| Memory Leak Fixes | 4 | Critical |
| Specification Compliance | 10 | High |
| State Machine & Connection | 5 | High |
| Heartbeat & Idle Timeout | 3 | High |
| Thread Safety | 2 | High |

2. Critical Bug Fixes

These fixes address bugs that would cause immediate runtime failures, protocol corruption, or incorrect behavior during AMQP 1.0 communication.

2.1 Missing raise Keyword in Exception

File: sgcAMQP1_Classes.pas
An exception object was created but never raised when an invalid frame type was encountered. The exception was silently discarded, allowing corrupt frames to be processed without detection.

Before (Broken)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

After (Fixed)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 Missing Data and Wrong Size

File: sgcAMQP1_Classes.pas
The Map32 encoding path was missing the WriteBytes call for the actual map data, and the size field used an incorrect offset. Map32 uses a 4-byte count field (unlike Map8 which uses 1 byte), so the size must include 4 extra bytes.

Before (Broken)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

After (Fixed)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;
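
The size arithmetic behind this fix can be sketched in a small standalone program. This is a minimal illustration, not the sgcWebSockets code: EncodeMap and AppendUInt32 are hypothetical helpers, and the format codes ($C1 for map8, $D1 for map32) are taken from the AMQP 1.0 types specification. The size field counts everything that follows it, so it must include the width of the count field (1 byte for map8, 4 bytes for map32) plus the encoded entries.

```pascal
program MapHeaderSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

const
  AMQP_MAP8  = $C1;  // map8 format code: 1-byte size, 1-byte count
  AMQP_MAP32 = $D1;  // map32 format code: 4-byte size, 4-byte count

// Appends a 32-bit unsigned value in network byte order (big-endian).
procedure AppendUInt32(var aBuf: TBytes; aValue: Cardinal);
var
  n: Integer;
begin
  n := Length(aBuf);
  SetLength(aBuf, n + 4);
  aBuf[n]     := (aValue shr 24) and $FF;
  aBuf[n + 1] := (aValue shr 16) and $FF;
  aBuf[n + 2] := (aValue shr 8) and $FF;
  aBuf[n + 3] := aValue and $FF;
end;

// Hypothetical helper: wraps already-encoded key/value bytes in a map header.
// aPairs is the number of key/value pairs; the count field counts keys and
// values separately, hence aPairs * 2.
function EncodeMap(const aEntries: TBytes; aPairs: Cardinal): TBytes;
var
  vOut: TBytes;
  vHeader, i: Integer;
begin
  SetLength(vOut, 1);
  if (Length(aEntries) + 1 <= 255) and (aPairs * 2 <= 255) then
  begin
    vOut[0] := AMQP_MAP8;
    SetLength(vOut, 3);
    vOut[1] := Length(aEntries) + 1;  // size = 1 count byte + entries
    vOut[2] := aPairs * 2;
  end
  else
  begin
    vOut[0] := AMQP_MAP32;
    AppendUInt32(vOut, Cardinal(Length(aEntries)) + 4);  // size = 4 count bytes + entries
    AppendUInt32(vOut, aPairs * 2);
  end;
  // The entries themselves follow the header in both encodings.
  vHeader := Length(vOut);
  SetLength(vOut, vHeader + Length(aEntries));
  for i := 0 to High(aEntries) do
    vOut[vHeader + i] := aEntries[i];
  Result := vOut;
end;

var
  vEntries, vMap: TBytes;
begin
  SetLength(vEntries, 10);   // stand-in for 10 bytes of encoded pairs
  vMap := EncodeMap(vEntries, 1);
  Writeln('map8 size field:  ', vMap[1], ', total bytes: ', Length(vMap));

  SetLength(vEntries, 300);  // too large for map8, forces map32
  vMap := EncodeMap(vEntries, 1);
  Writeln('map32 size field: ', (vMap[3] shl 8) or vMap[4], ', total bytes: ', Length(vMap));
end.
```

With 300 bytes of entries the map32 path emits 1 + 4 + 4 + 300 = 309 bytes and a size field of 304, which is the vSize + 4 relationship the fix restores.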

2.3 Inverted ContainsError Logic

File: sgcAMQP1_Frames.pas
The ContainsError method in both TsgcAMQP1FrameRejected and TsgcAMQP1DescribedListError returned True when there was no error and False when there was an error. This caused error information to be silently dropped and null bytes to be written when actual errors should have been serialized. The DoWrite and DoWriteError branches were also swapped to match the corrected logic.

Before (Broken)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

After (Fixed)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 SASL PLAIN Null Byte Separators

File: sgcAMQP1_Frames.pas
The SASL PLAIN mechanism requires the format \0username\0password with null byte ($00) separators. The implementation was not initializing the byte array to zero, so separator positions contained garbage data. Authentication would fail against any standards-compliant AMQP 1.0 broker.

After (Fixed)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators
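
For context, the complete byte layout can be sketched as follows. This is an illustrative program, not the library's implementation: BuildPlainResponse is a hypothetical helper, and it assumes an empty authorization identity, so the message is NUL, username, NUL, password as described by RFC 4616.

```pascal
program SaslPlainSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

// Hypothetical helper, not the sgcWebSockets implementation.
// Layout with an empty authorization identity:
//   NUL, username bytes, NUL, password bytes
function BuildPlainResponse(const aUser, aPassword: UTF8String): TBytes;
var
  vOut: TBytes;
  i, vPos: Integer;
begin
  SetLength(vOut, 2 + Length(aUser) + Length(aPassword));
  FillChar(vOut[0], Length(vOut), 0);  // guarantees both separators are $00
  vPos := 1;                           // index 0 stays NUL (empty authzid)
  for i := 1 to Length(aUser) do
  begin
    vOut[vPos] := Ord(aUser[i]);
    Inc(vPos);
  end;
  Inc(vPos);                           // leave the second NUL in place
  for i := 1 to Length(aPassword) do
  begin
    vOut[vPos] := Ord(aPassword[i]);
    Inc(vPos);
  end;
  Result := vOut;
end;

var
  vResp: TBytes;
  i: Integer;
begin
  vResp := BuildPlainResponse('guest', 'secret');
  // Dump the response as hex so the two separator bytes are visible.
  for i := 0 to High(vResp) do
    Write(IntToHex(vResp[i], 2), ' ');
  Writeln;
end.
```

For the sample credentials the dump starts with 00, and a second 00 sits between the bytes of the username and those of the password.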

2.5 Missing inherited Create in TsgcAMQP1Message

File: sgcAMQP1_Message.pas
The parameterized constructor of TsgcAMQP1Message did not call inherited Create, meaning the base class was never initialized. This would cause access violations or corrupt state when using the convenience constructor.

After (Fixed)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 Missing Semicolon in AmqpValue.DoRead

File: sgcAMQP1_Frames.pas
A missing semicolon in TsgcAMQP1FrameAmqpValue.DoRead would prevent compilation.


3. Memory Leak Fixes

These fixes address object lifetime management issues that would cause memory to accumulate over time, particularly during long-running AMQP 1.0 connections with many message exchanges.

| Fix | File | Description |
| --- | --- | --- |
| 3.1 | sgcAMQP1_Frames.pas | FDefaultOutcome was not freed before reassignment when reading the Source descriptor. Each time a new default outcome was received, the previous object was leaked. |
| 3.2 | sgcAMQP1_Session.pas | Duplicate sgcFree(FCreditConsumed) call in the destructor caused a potential double-free. Removed the duplicate line. |
| 3.3 | sgcAMQP1_Session.pas | FOutgoingDeliveries was missing from the session destructor. The delivery tracking list was never freed when a session was destroyed. |
| 3.4 | sgcAMQP1_Message.pas | SetMessage and SetMessageAndFreeOnDestroy did not free the previous message when FFreeMessageOnDestroy was enabled. Repeated message assignments leaked memory. |

Fix 3.1 – FDefaultOutcome Freed Before Reassignment

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Fix 3.4 – SetMessage Frees Old Message

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. Specification Compliance Fixes

These fixes correct deviations from the AMQP 1.0 Transport, Types, and Messaging specifications.

4.1 Begin Frame Field 7 Read Into Wrong Property

File: sgcAMQP1_Frames.pas
Field index 7 of the begin performative was being read into DesiredCapabilities instead of Properties. Per the spec, begin fields are: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).

4.2 Source and Target Missing Fields in DoWrite

File: sgcAMQP1_Frames.pas
The Source descriptor's DoWrite method was not serializing the default-outcome, outcomes, and capabilities fields. The Target descriptor was missing capabilities. This caused the broker to use defaults instead of the negotiated values, potentially leading to incorrect delivery state handling.

4.3 AmqpSequence Read Into Wrong Property

File: sgcAMQP1_Message.pas
When reading the message body, amqp-sequence data was being read into ApplicationData.AMQPValue instead of ApplicationData.AMQPSequence. This corrupted the message body for any message using the amqp-sequence encoding.

4.4 TransactionalState Outcome Not Written

File: sgcAMQP1_Frames.pas
The transactional-state delivery state was not serializing its outcome field, which is required when settling deliveries within a transaction.

4.5 Disposition Last Field Cannot Distinguish Zero From Unset

Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
The disposition performative has an optional last field (a delivery-number). Since it is a Cardinal, the value 0 is valid and cannot be used as a sentinel for “not set.” A new FLastAssigned boolean flag and SetLast setter were added to properly track whether the field was explicitly set.

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;
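
How such a flag could drive serialization is sketched below. TDispositionSketch and EncodeLast are hypothetical names used only for illustration; the actual DoWrite logic in sgcAMQP1_Frames.pas is not shown in this article. The format codes ($40 null, $43 uint0, $52 smalluint, $70 uint) come from the AMQP 1.0 types specification. An unset field is written as null, while an explicit 0 is written as a real uint value.

```pascal
program DispositionLastSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

const
  AMQP_NULL      = $40;  // field not present
  AMQP_UINT0     = $43;  // uint with value 0, no payload
  AMQP_SMALLUINT = $52;  // uint in 0..255, 1-byte payload
  AMQP_UINT      = $70;  // uint, 4-byte big-endian payload

type
  // Hypothetical stand-in for the disposition frame; only "last" is modelled.
  TDispositionSketch = class
  private
    FLast: Cardinal;
    FLastAssigned: Boolean;
    procedure SetLast(const Value: Cardinal);
  public
    function EncodeLast: TBytes;
    property Last: Cardinal read FLast write SetLast;
  end;

procedure TDispositionSketch.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;  // remember that the caller set the field, even to 0
end;

function TDispositionSketch.EncodeLast: TBytes;
var
  vOut: TBytes;
begin
  if not FLastAssigned then
  begin
    SetLength(vOut, 1);
    vOut[0] := AMQP_NULL;
  end
  else if FLast = 0 then
  begin
    SetLength(vOut, 1);
    vOut[0] := AMQP_UINT0;
  end
  else if FLast <= 255 then
  begin
    SetLength(vOut, 2);
    vOut[0] := AMQP_SMALLUINT;
    vOut[1] := FLast;
  end
  else
  begin
    SetLength(vOut, 5);
    vOut[0] := AMQP_UINT;
    vOut[1] := (FLast shr 24) and $FF;
    vOut[2] := (FLast shr 16) and $FF;
    vOut[3] := (FLast shr 8) and $FF;
    vOut[4] := FLast and $FF;
  end;
  Result := vOut;
end;

var
  vFrame: TDispositionSketch;
  vBytes: TBytes;
begin
  vFrame := TDispositionSketch.Create;
  try
    vBytes := vFrame.EncodeLast;
    Writeln('last unset: $', IntToHex(vBytes[0], 2));
    vFrame.Last := 0;
    vBytes := vFrame.EncodeLast;
    Writeln('last = 0:   $', IntToHex(vBytes[0], 2));
  finally
    vFrame.Free;
  end;
end.
```

The two cases produce different first bytes ($40 versus $43), which is the distinction a plain Cardinal field with 0 as a sentinel cannot express.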

4.6 AmqpSequence Missing Value Property and Read/Write

Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
The TsgcAMQP1FrameAmqpSequence class had no Value property and empty DoRead/DoWrite methods. The amqp-sequence body type was completely non-functional.

4.7 Error Info Field Read as String Instead of Map

File: sgcAMQP1_Frames.pas
The AMQP 1.0 spec defines the info field of the error type as a map. It was being read with ReadString instead of ReadMap, causing parsing failures when brokers send structured error information.

4.8 Capabilities and Locales Written as String Instead of Symbol

File: sgcAMQP1_Frames.pas
The AMQP 1.0 spec defines offered-capabilities, desired-capabilities, outgoing-locales, and incoming-locales as arrays of symbol. They were being written with WriteString instead of WriteSymbol in the open, begin, and attach performatives. A standards-compliant broker would reject these frames as having incorrect field types.
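
The difference on the wire is a single format code, as the following sketch shows. EncodeShortText is a hypothetical helper covering only the short encodings (8-bit length prefix); the codes $A1 (str8-utf8) and $A3 (sym8) are from the AMQP 1.0 types specification. Multiple values would additionally be wrapped in an array, which is omitted here.

```pascal
program SymbolVsStringSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

const
  AMQP_STR8 = $A1;  // str8-utf8: UTF-8 string with 1-byte length
  AMQP_SYM8 = $A3;  // sym8: ASCII symbol with 1-byte length

// Hypothetical helper: format code, 1-byte length, then the raw bytes.
function EncodeShortText(aFormatCode: Byte; const aText: AnsiString): TBytes;
var
  vOut: TBytes;
  i: Integer;
begin
  if Length(aText) > 255 then
    raise Exception.Create('Text too long for an 8-bit length prefix');
  SetLength(vOut, 2 + Length(aText));
  vOut[0] := aFormatCode;
  vOut[1] := Length(aText);
  for i := 1 to Length(aText) do
    vOut[1 + i] := Ord(aText[i]);
  Result := vOut;
end;

var
  vAsString, vAsSymbol: TBytes;
begin
  vAsString := EncodeShortText(AMQP_STR8, 'en-US');
  vAsSymbol := EncodeShortText(AMQP_SYM8, 'en-US');
  Writeln('as string: first byte $', IntToHex(vAsString[0], 2));
  Writeln('as symbol: first byte $', IntToHex(vAsSymbol[0], 2));
end.
```

The payload bytes are identical in both cases; a receiver that validates field types looks only at the leading format code, which is why writing these fields as strings is enough to make a frame invalid.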

4.9 Missing Accepted Descriptor in DefaultOutcome Handler

File: sgcAMQP1_Frames.pas
The Source descriptor's default-outcome reader only handled released and rejected. The most common outcome, accepted, was not handled. When a broker sent accepted as the default outcome, it was silently ignored.


5. State Machine & Connection Fixes

These fixes address the AMQP 1.0 connection state machine and frame processing logic.

| Fix | File | Description |
| --- | --- | --- |
| 5.1 | sgcAMQP1.pas | The amqp1csOpenReceived state transition was missing the else DoRaiseInvalidState that all other states had. Invalid transitions were silently ignored instead of raising an error. |
| 5.2 | sgcAMQP1.pas | Frame size validation error message displayed RemoteMaxFrameSize but should have shown LocalMaxFrameSize, since the local limit is what was being checked. |
| 5.3 | sgcAMQP1.pas | FLastTimeRead was initialized to 0 (Delphi epoch: 1899-12-30) instead of Now. This caused immediate false idle timeout detection on startup. |
| 5.4 | sgcAMQP1.pas | The TBytes overload of Read did not update FLastTimeRead := Now, unlike the TMemoryStream overload. This caused inconsistent heartbeat tracking. |
| 5.5 | sgcAMQP1.pas | Header received state transition was conditional when it should always trigger. The state machine must transition on every valid header exchange per the AMQP 1.0 spec. |

Fix 5.1 – OpenReceived Missing Error Branch

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;
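
The effect described in Fix 5.3 is easy to reproduce in isolation. The sketch below assumes the idle check compares the seconds elapsed since the last read against a timeout; IsIdleTimeoutExpired is a hypothetical function, and the real check in sgcAMQP1.pas may be structured differently. A TDateTime of 0 is 1899-12-30, so any elapsed-time comparison against it reports a timeout immediately.

```pascal
program IdleTimeoutSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils, DateUtils;

// Hypothetical check: has more than aTimeoutSeconds passed since the last read?
function IsIdleTimeoutExpired(aLastRead, aNow: TDateTime;
  aTimeoutSeconds: Int64): Boolean;
begin
  Result := SecondsBetween(aNow, aLastRead) >= aTimeoutSeconds;
end;

var
  vNow: TDateTime;
begin
  // A fixed "current time" keeps the example deterministic.
  vNow := EncodeDate(2024, 1, 1);

  // Last-read timestamp left at 0: more than a century appears to have elapsed.
  Writeln('initialized to 0:   ', IsIdleTimeoutExpired(0, vNow, 60));

  // Last-read timestamp initialized to the current time: no timeout yet.
  Writeln('initialized to now: ', IsIdleTimeoutExpired(vNow, vNow, 60));
end.
```

The first call reports an expired timeout and the second does not, which matches the false idle detection on startup that initializing FLastTimeRead to Now removes.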

6. Heartbeat & Idle Timeout Fixes

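When a peer advertises an idle-timeout in the open performative, the AMQP 1.0 specification requires the other peer to send frames often enough that the timeout never elapses, using empty frames as heartbeats when there is no other traffic. The client sends heartbeats at half the advertised interval to leave a safety margin. These fixes ensure the heartbeat mechanism actually functions.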

| Fix | File | Description |
| --- | --- | --- |
| 6.1 | sgcAMQP1_Client.pas | HeartBeat was never enabled. Both branches of the idle timeout check set HeartBeat.Enabled := False. Changed to True when IdleTimeout > 0. |
| 6.2 | sgcAMQP1_Client.pas | Disconnect did not disable heartbeat or set FConnected := False early enough. Reordered to prevent heartbeat firing during teardown. |
| 6.3 | sgcAMQP1.pas | FLastTimeRead not updated in the TBytes Read overload (also listed as Fix 5.4 in the State Machine section). |

Fix 6.1 – HeartBeat Enabled

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Fix 6.2 – Disconnect Reordered

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. Thread Safety Fixes

These fixes address race conditions in concurrent access to shared data structures.

7.1 TsgcAMQP1Deliveries.First() Bounds Check and Locking

File: sgcAMQP1_Message.pas
The First() method accessed Items[0] without checking whether the list was empty and without acquiring the thread-safe lock. An empty list raised an index-out-of-bounds exception, and in a multi-threaded environment another thread could remove items while the access was in progress. The count check and the access now happen under the same lock.

After (Fixed)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 SetMessage Safe Object Replacement

File: sgcAMQP1_Message.pas
The SetMessage method now checks that the new message is different from the current one before freeing, preventing use-after-free when assigning the same message object.


8. Files Modified

| File | Fixes | Categories |
| --- | --- | --- |
| Source\sgcAMQP1_Classes.pas | 2 | Critical bugs |
| Source\sgcAMQP1_Frames.pas | 16 | Critical bugs, memory leaks, spec compliance |
| Interfaces\sgcAMQP1_Frames.intf | 2 | Spec compliance (Disposition LastAssigned, AmqpSequence Value) |
| Source\sgcAMQP1_Message.pas | 4 | Critical bugs, memory leaks, thread safety |
| Source\sgcAMQP1_Session.pas | 3 | Memory leaks, spec compliance |
| Source\sgcAMQP1.pas | 5 | State machine, connection, heartbeat |
| Source\sgcAMQP1_Client.pas | 3 | Heartbeat, disconnect safety |

Total: 30 fixes across 8 files, improving protocol correctness, memory safety, and reliability of the AMQP 1.0 implementation against the OASIS specification.
