The AMQP 1.0 protocol implementation in sgcWebSockets has undergone a comprehensive review against the OASIS AMQP 1.0 specification. This article documents the 30 fixes applied across 8 files, covering critical bugs, memory leaks, specification compliance, state machine correctness, heartbeat handling, and thread safety improvements.
1. Overview
A total of 30 fixes were applied across 6 source files and 2 interface files in the AMQP 1.0 implementation. The fixes are categorized as follows:
| Category | Count | Severity |
|---|---|---|
| Critical Bug Fixes | 6 | Critical |
| Memory Leak Fixes | 4 | Critical |
| Specification Compliance | 10 | High |
| State Machine & Connection | 5 | High |
| Heartbeat & Idle Timeout | 3 | High |
| Thread Safety | 2 | High |
2. Critical Bug Fixes
These fixes address bugs that would cause immediate runtime failures, protocol corruption, or incorrect behavior during AMQP 1.0 communication.
2.1 Missing raise Keyword in Exception
File: sgcAMQP1_Classes.pas
An exception object was created but never raised when an invalid frame type was encountered. The exception was silently discarded, allowing corrupt frames to be processed without detection.
Before (Broken)
TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);
After (Fixed)
raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);
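The difference between the two lines can be reproduced in isolation. The following is a minimal sketch, not the library's code: EDemoFrameError, both CheckFrameType procedures, and the "greater than 1" validity check are hypothetical stand-ins chosen for illustration.

```pascal
program MissingRaiseDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical stand-in for TsgcAMQP1Exception.
  EDemoFrameError = class(Exception);

// Broken variant: the exception object is constructed and then discarded
// (and leaked), so the caller never sees an error.
procedure CheckFrameTypeBroken(aType: Byte);
begin
  if aType > 1 then
    EDemoFrameError.CreateFmt('Invalid frame type: %d', [aType]);
end;

// Fixed variant: "raise" transfers control to the nearest handler.
procedure CheckFrameTypeFixed(aType: Byte);
begin
  if aType > 1 then
    raise EDemoFrameError.CreateFmt('Invalid frame type: %d', [aType]);
end;

// Reports whether the chosen variant actually raised for the given type.
function Raises(aFixed: Boolean; aType: Byte): Boolean;
begin
  Result := False;
  try
    if aFixed then
      CheckFrameTypeFixed(aType)
    else
      CheckFrameTypeBroken(aType);
  except
    on EDemoFrameError do
      Result := True;
  end;
end;

begin
  Writeln('broken variant raises: ', Raises(False, 7));
  Writeln('fixed variant raises:  ', Raises(True, 7));
end.
```

Both variants compile without errors, which is why this class of bug tends to survive until someone compares runtime behavior against the intended error path.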
2.2 WriteMap Map32 Missing Data and Wrong Size
File: sgcAMQP1_Classes.pas
The Map32 encoding path was missing the WriteBytes call for the actual map data, and the size field used an incorrect offset. The size field counts the bytes that follow it, which includes the count field. Map32 uses a 4-byte count field (unlike Map8, which uses 1 byte), so the size must be the data length plus 4, not plus 1.
Before (Broken)
else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;
After (Fixed)
else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;
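The size arithmetic is easier to see in a standalone encoder. This is a minimal sketch under stated assumptions: EncodeMap and AppendUInt32 are hypothetical helpers, not sgcWebSockets routines, and the payload is filler rather than real encoded values. The format codes ($C1 for map8, $D1 for map32) and the big-endian layout come from the AMQP 1.0 type system.

```pascal
program MapHeaderDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

const
  FORMAT_MAP8 = $C1;  // AMQP 1.0 format code for map8
  FORMAT_MAP32 = $D1; // AMQP 1.0 format code for map32

// Appends a 32-bit value in network byte order (big-endian).
procedure AppendUInt32(var aBytes: TBytes; aValue: Cardinal);
var
  n: Integer;
begin
  n := Length(aBytes);
  SetLength(aBytes, n + 4);
  aBytes[n] := (aValue shr 24) and $FF;
  aBytes[n + 1] := (aValue shr 16) and $FF;
  aBytes[n + 2] := (aValue shr 8) and $FF;
  aBytes[n + 3] := aValue and $FF;
end;

// Hypothetical helper: wraps already-encoded key/value bytes in a map header.
// aPairCount is the number of key/value pairs; the wire count is twice that.
function EncodeMap(const aData: TBytes; aPairCount: Integer): TBytes;
var
  n: Integer;
begin
  Result := nil;
  if (Length(aData) + 1 <= 255) and (aPairCount * 2 <= 255) then
  begin
    SetLength(Result, 3);
    Result[0] := FORMAT_MAP8;
    Result[1] := Length(aData) + 1; // size = 1-byte count + data
    Result[2] := aPairCount * 2;
  end
  else
  begin
    SetLength(Result, 1);
    Result[0] := FORMAT_MAP32;
    AppendUInt32(Result, Cardinal(Length(aData)) + 4); // size = 4-byte count + data
    AppendUInt32(Result, Cardinal(aPairCount) * 2);
  end;
  n := Length(Result);
  SetLength(Result, n + Length(aData));
  if Length(aData) > 0 then
    Move(aData[0], Result[n], Length(aData)); // the step the broken code skipped
end;

var
  vData, vMap: TBytes;
  i: Integer;
begin
  // 300 filler bytes standing in for 150 encoded pairs (not real AMQP values).
  SetLength(vData, 300);
  FillChar(vData[0], Length(vData), $40);
  vMap := EncodeMap(vData, 150);
  // Header bytes: D1 00 00 01 30 00 00 01 2C (size 304, count 300)
  for i := 0 to 8 do
    Write(IntToHex(vMap[i], 2), ' ');
  Writeln;
  Writeln('total bytes: ', Length(vMap));
end.
```

With 300 bytes of data the size field is 304 ($130). Writing 301, as the broken code did, would make a receiver stop reading three bytes short of the end of the map.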
2.3 Inverted ContainsError Logic
File: sgcAMQP1_Frames.pas
The ContainsError method in both TsgcAMQP1FrameRejected and TsgcAMQP1DescribedListError returned True when there was no error and False when there was an error. This caused error information to be silently dropped and null bytes to be written when actual errors should have been serialized. The DoWrite and DoWriteError branches were also swapped to match the corrected logic.
Before (Broken)
function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = ''); // Wrong: True when all empty
end;
After (Fixed)
function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> ''); // Correct: True when any field set
end;
2.4 SASL PLAIN Null Byte Separators
File: sgcAMQP1_Frames.pas
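The SASL PLAIN mechanism (RFC 4616) encodes the initial response as an optional authorization identity, a null byte, the username, a null byte, and the password. With an empty authorization identity this is \0username\0password with null byte ($00) separators. The implementation was not initializing the byte array to zero, so separator positions contained garbage data. Authentication would fail against any standards-compliant AMQP 1.0 broker.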
After (Fixed)
SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0); // Zero-fill for null separators
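For reference, the complete byte layout can be sketched as a standalone function. BuildPlainResponse is a hypothetical helper written for this illustration, assuming an empty authorization identity and UTF-8 credentials; it is not the library's implementation.

```pascal
program SaslPlainDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Builds NUL + username + NUL + password (empty authorization identity).
function BuildPlainResponse(const aUser, aPassword: UTF8String): TBytes;
var
  vOffset: Integer;
begin
  Result := nil;
  SetLength(Result, 2 + Length(aUser) + Length(aPassword));
  // Zero-fill first so both separator positions are guaranteed to be $00.
  FillChar(Result[0], Length(Result), 0);
  vOffset := 1; // index 0 stays $00: empty authzid followed by separator
  if Length(aUser) > 0 then
    Move(aUser[1], Result[vOffset], Length(aUser));
  vOffset := vOffset + Length(aUser) + 1; // skip the second separator
  if Length(aPassword) > 0 then
    Move(aPassword[1], Result[vOffset], Length(aPassword));
end;

var
  vResponse: TBytes;
  i: Integer;
begin
  vResponse := BuildPlainResponse('guest', 'secret');
  // Prints: 00 67 75 65 73 74 00 73 65 63 72 65 74
  for i := 0 to High(vResponse) do
    Write(IntToHex(vResponse[i], 2), ' ');
  Writeln;
end.
```

The two $00 bytes at offsets 0 and 6 are the separators; everything else is copied verbatim from the credentials.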
2.5 Missing inherited Create in TsgcAMQP1Message
File: sgcAMQP1_Message.pas
The parameterized constructor of TsgcAMQP1Message did not call inherited Create, meaning the base class was never initialized. This would cause access violations or corrupt state when using the convenience constructor.
After (Fixed)
constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;
2.6 Missing Semicolon in AmqpValue.DoRead
File: sgcAMQP1_Frames.pas
A missing semicolon in TsgcAMQP1FrameAmqpValue.DoRead would prevent compilation.
3. Memory Leak Fixes
These fixes address object lifetime management issues that would cause memory to accumulate over time, particularly during long-running AMQP 1.0 connections with many message exchanges.
| Fix | File | Description |
|---|---|---|
| 3.1 | sgcAMQP1_Frames.pas | FDefaultOutcome was not freed before reassignment when reading the Source descriptor. Each time a new default outcome was received, the previous object was leaked. |
| 3.2 | sgcAMQP1_Session.pas | Duplicate sgcFree(FCreditConsumed) call in the destructor caused a potential double-free. Removed the duplicate line. |
| 3.3 | sgcAMQP1_Session.pas | FOutgoingDeliveries was missing from the session destructor. The delivery tracking list was never freed when a session was destroyed. |
| 3.4 | sgcAMQP1_Message.pas | SetMessage and SetMessageAndFreeOnDestroy did not free the previous message when FFreeMessageOnDestroy was enabled. Repeated message assignments leaked memory. |
Fix 3.1 – FDefaultOutcome Freed Before Reassignment
sgcFree(FDefaultOutcome); // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create
Fix 3.4 – SetMessage Frees Old Message
procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;
4. Specification Compliance Fixes
These fixes correct deviations from the AMQP 1.0 Transport, Types, and Messaging specifications.
4.1 Begin Frame Field 7 Read Into Wrong Property
File: sgcAMQP1_Frames.pas
Field index 7 of the begin performative was being read into DesiredCapabilities instead of Properties. Per the spec, begin fields are: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).
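One way to make this kind of off-by-one mapping harder to write is to name the indices instead of using bare numbers. The following is only a sketch of that idea: TBeginField and BEGIN_FIELD_NAMES are hypothetical names, not types from sgcWebSockets.

```pascal
program BeginFieldsDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Declaration order matches the field order of the begin performative,
  // so Ord() of each value is the wire index.
  TBeginField = (bfRemoteChannel, bfNextOutgoingId, bfIncomingWindow,
    bfOutgoingWindow, bfHandleMax, bfOfferedCapabilities,
    bfDesiredCapabilities, bfProperties);

const
  BEGIN_FIELD_NAMES: array[TBeginField] of string = (
    'remote-channel', 'next-outgoing-id', 'incoming-window',
    'outgoing-window', 'handle-max', 'offered-capabilities',
    'desired-capabilities', 'properties');

var
  vField: TBeginField;
begin
  // Lists each wire index next to the field it carries.
  for vField := Low(TBeginField) to High(TBeginField) do
    Writeln(Ord(vField), ': ', BEGIN_FIELD_NAMES[vField]);
end.
```

A reader loop written as a case statement over TBeginField would then pair bfProperties with the Properties property by name, and the compiler can flag a missing branch.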
4.2 Source and Target Missing Fields in DoWrite
File: sgcAMQP1_Frames.pas
The Source descriptor's DoWrite method was not serializing the default-outcome, outcomes, and capabilities fields. The Target descriptor was missing capabilities. This caused the broker to use defaults instead of the negotiated values, potentially leading to incorrect delivery state handling.
4.3 AmqpSequence Read Into Wrong Property
File: sgcAMQP1_Message.pas
When reading the message body, amqp-sequence data was being read into ApplicationData.AMQPValue instead of ApplicationData.AMQPSequence. This corrupted the message body for any message using the amqp-sequence encoding.
4.4 TransactionalState Outcome Not Written
File: sgcAMQP1_Frames.pas
The transactional-state delivery state was not serializing its outcome field, which is required when settling deliveries within a transaction.
4.5 Disposition Last Field Cannot Distinguish Zero From Unset
Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
The disposition performative has an optional last field (delivery-id). Since it is a Cardinal, the value 0 is valid and cannot be used as a sentinel for “not set.” A new FLastAssigned boolean flag and SetLast setter were added to properly track whether the field was explicitly set.
procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;
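How the flag would be consumed can be sketched with a reduced class. TDemoDisposition and LastAsText are hypothetical and exist only for this illustration; the real writer presumably emits an AMQP null or an encoded delivery-id rather than text.

```pascal
program OptionalLastDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical stand-in for TsgcAMQP1FrameDisposition, reduced to one field.
  TDemoDisposition = class
  private
    FLast: Cardinal;
    FLastAssigned: Boolean;
    procedure SetLast(const Value: Cardinal);
  public
    // Returns what a writer would emit for the field: a value or null.
    function LastAsText: string;
    property Last: Cardinal read FLast write SetLast;
    property LastAssigned: Boolean read FLastAssigned;
  end;

procedure TDemoDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True; // remember that the caller set the field, even to 0
end;

function TDemoDisposition.LastAsText: string;
begin
  if FLastAssigned then
    Result := IntToStr(FLast)
  else
    Result := 'null';
end;

var
  oDisposition: TDemoDisposition;
begin
  oDisposition := TDemoDisposition.Create;
  try
    Writeln('before assignment: ', oDisposition.LastAsText); // null
    oDisposition.Last := 0; // delivery-id 0 is a legitimate value
    Writeln('after assignment:  ', oDisposition.LastAsText); // 0
  finally
    oDisposition.Free;
  end;
end.
```

Without the flag, both states hold FLast = 0 and the writer cannot tell whether to emit null or delivery-id 0.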
4.6 AmqpSequence Missing Value Property and Read/Write
Files: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
The TsgcAMQP1FrameAmqpSequence class had no Value property and empty DoRead/DoWrite methods. The amqp-sequence body type was completely non-functional.
4.7 Error Info Field Read as String Instead of Map
File: sgcAMQP1_Frames.pas
The AMQP 1.0 spec defines the info field of the error type as a map. It was being read with ReadString instead of ReadMap, causing parsing failures when brokers send structured error information.
4.8 Capabilities and Locales Written as String Instead of Symbol
File: sgcAMQP1_Frames.pas
The AMQP 1.0 spec defines offered-capabilities, desired-capabilities, outgoing-locales, and incoming-locales as arrays of symbol. They were being written with WriteString instead of WriteSymbol in the open, begin, and attach performatives. A standards-compliant broker would reject these frames as having incorrect field types.
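On the wire, a short string and a short symbol with the same ASCII text differ only in the leading format code, which is why the mismatch is easy to miss in testing. The sketch below encodes a single value both ways; EncodeShort and ToHex are hypothetical helpers, and the format codes ($A1 for str8-utf8, $A3 for sym8) are taken from the AMQP 1.0 type system. Array encoding is left out for brevity.

```pascal
program SymbolVsStringDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

const
  FORMAT_STR8 = $A1; // str8-utf8: UTF-8 string with a 1-byte length
  FORMAT_SYM8 = $A3; // sym8: ASCII symbol with a 1-byte length

// Hypothetical helper: format code, 1-byte length, then the raw text bytes.
function EncodeShort(aFormatCode: Byte; const aText: UTF8String): TBytes;
begin
  if Length(aText) > 255 then
    raise Exception.Create('Text too long for an 8-bit length prefix');
  Result := nil;
  SetLength(Result, 2 + Length(aText));
  Result[0] := aFormatCode;
  Result[1] := Length(aText);
  if Length(aText) > 0 then
    Move(aText[1], Result[2], Length(aText));
end;

// Renders a byte array as a hex string for display.
function ToHex(const aBytes: TBytes): string;
var
  i: Integer;
begin
  Result := '';
  for i := 0 to High(aBytes) do
    Result := Result + IntToHex(aBytes[i], 2);
end;

begin
  Writeln('string: ', ToHex(EncodeShort(FORMAT_STR8, 'en-US'))); // A105656E2D5553
  Writeln('symbol: ', ToHex(EncodeShort(FORMAT_SYM8, 'en-US'))); // A305656E2D5553
end.
```

A peer that validates field types sees $A1 where it expects $A3 and can treat the frame as malformed, even though the payload bytes are identical.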
4.9 Missing Accepted Descriptor in DefaultOutcome Handler
File: sgcAMQP1_Frames.pas
The Source descriptor's default-outcome reader only handled released and rejected. The most common outcome, accepted, was not handled. When a broker sent accepted as the default outcome, it was silently ignored.
5. State Machine & Connection Fixes
These fixes address the AMQP 1.0 connection state machine and frame processing logic.
| Fix | File | Description |
|---|---|---|
| 5.1 | sgcAMQP1.pas | The amqp1csOpenReceived state transition was missing the else DoRaiseInvalidState that all other states had. Invalid transitions were silently ignored instead of raising an error. |
| 5.2 | sgcAMQP1.pas | Frame size validation error message displayed RemoteMaxFrameSize but should have shown LocalMaxFrameSize, since the local limit is what was being checked. |
| 5.3 | sgcAMQP1.pas | FLastTimeRead was initialized to 0 (Delphi epoch: 1899-12-30) instead of Now. This caused immediate false idle timeout detection on startup. |
| 5.4 | sgcAMQP1.pas | The TBytes overload of Read did not update FLastTimeRead := Now, unlike the TMemoryStream overload. This caused inconsistent heartbeat tracking. |
| 5.5 | sgcAMQP1.pas | Header received state transition was conditional when it should always trigger. The state machine must transition on every valid header exchange per the AMQP 1.0 spec. |
Fix 5.1 – OpenReceived Missing Error Branch
amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState; // Added: was missing
  end;
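The effect described for fix 5.3 in the table above can be reproduced with a few lines. This is a sketch only: IdleExpired is a hypothetical function and the 60-second timeout is an arbitrary example value, not a library default.

```pascal
program IdleTimeoutDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, DateUtils;

// Hypothetical check: has more than aTimeoutMs elapsed since the last read?
function IdleExpired(aLastRead, aNow: TDateTime; aTimeoutMs: Int64): Boolean;
begin
  Result := MilliSecondsBetween(aNow, aLastRead) > aTimeoutMs;
end;

var
  vNow: TDateTime;
begin
  vNow := Now;
  // A TDateTime of 0 is 1899-12-30, so more than a century appears to have
  // passed since the "last read" and the timeout fires immediately.
  Writeln('initialized to 0:   ', IdleExpired(0, vNow, 60000));
  // Initializing to the current time starts the idle clock at connect time.
  Writeln('initialized to Now: ', IdleExpired(vNow, vNow, 60000));
end.
```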
6. Heartbeat & Idle Timeout Fixes
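The AMQP 1.0 specification lets each peer advertise an idle-time-out in the open performative. The other peer must then send frames (empty frames when there is no other traffic) often enough that the timeout never elapses. The client does this by sending heartbeat frames at half the advertised interval. These fixes ensure the heartbeat mechanism actually functions.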
| Fix | File | Description |
|---|---|---|
| 6.1 | sgcAMQP1_Client.pas | HeartBeat was never enabled. Both branches of the idle timeout check set HeartBeat.Enabled := False. Changed to True when IdleTimeout > 0. |
| 6.2 | sgcAMQP1_Client.pas | Disconnect did not disable heartbeat or set FConnected := False early enough. Reordered to prevent heartbeat firing during teardown. |
| 6.3 | sgcAMQP1.pas | FLastTimeRead not updated in the TBytes Read overload (also listed in State Machine section). |
Fix 6.1 – HeartBeat Enabled
if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True; // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;
Fix 6.2 – Disconnect Reordered
procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False; // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False; // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;
7. Thread Safety Fixes
These fixes address race conditions in concurrent access to shared data structures.
7.1 TsgcAMQP1Deliveries.First() Bounds Check and Locking
File: sgcAMQP1_Message.pas
The First() method accessed Items[0] without checking if the list was empty and without acquiring the thread-safe lock. On an empty list this raised an index-out-of-bounds exception. In a multi-threaded environment a count check alone would not have been enough, because another thread could remove all items between the check and the access.
After (Fixed)
function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  Result := nil;
  oList := LockList;
  try
    if oList.Count > 0 then
      Result := TsgcAMQP1Delivery(oList[0]);
  finally
    UnlockList;
  end;
end;
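The same pattern can be tried outside the library with the standard TThreadList class. The sketch below assumes nothing about TsgcAMQP1Deliveries beyond what the snippet above shows; SafeFirst is a hypothetical free function written for this demonstration.

```pascal
program SafeFirstDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

// Returns the first item, or nil for an empty list. The count check and the
// element access both happen while the list is locked.
function SafeFirst(aList: TThreadList): TObject;
var
  oList: TList;
begin
  Result := nil;
  oList := aList.LockList;
  try
    if oList.Count > 0 then
      Result := TObject(oList[0]);
  finally
    aList.UnlockList;
  end;
end;

var
  oList: TThreadList;
  oItem: TObject;
begin
  oList := TThreadList.Create;
  oItem := TObject.Create;
  try
    Writeln('empty list returns nil:  ', SafeFirst(oList) = nil);
    oList.Add(oItem);
    Writeln('first is the added item: ', SafeFirst(oList) = oItem);
  finally
    oList.Free; // frees the list only, not the objects it points to
    oItem.Free;
  end;
end.
```

The lock protects the list itself, not the lifetime of the returned object: once UnlockList runs, another thread may still remove and free that item, so callers need their own ownership rules for the reference they receive.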
7.2 SetMessage Safe Object Replacement
File: sgcAMQP1_Message.pas
The SetMessage method now checks that the new message is different from the current one before freeing, preventing use-after-free when assigning the same message object.
8. Files Modified
| File | Fixes | Categories |
|---|---|---|
| Source\sgcAMQP1_Classes.pas | 2 | Critical bugs |
| Source\sgcAMQP1_Frames.pas | 16 | Critical bugs, memory leaks, spec compliance |
| Interfaces\sgcAMQP1_Frames.intf | 2 | Spec compliance (Disposition LastAssigned, AmqpSequence Value) |
| Source\sgcAMQP1_Message.pas | 4 | Critical bugs, memory leaks, thread safety |
| Source\sgcAMQP1_Session.pas | 3 | Memory leaks, spec compliance |
| Source\sgcAMQP1.pas | 5 | State machine, connection, heartbeat |
| Source\sgcAMQP1_Client.pas | 3 | Heartbeat, disconnect safety |
Total: 30 fixes across 8 files, improving protocol correctness, memory safety, and reliability of the AMQP 1.0 implementation against the OASIS specification.