The AMQP 0-9-1 implementation in sgcWebSockets has received a comprehensive update: 11 bug fixes covering critical parameter ordering, type mismatches, thread safety, and data loss issues, plus 6 new protocol features including Basic.Nack, Exchange-to-Exchange bindings, Publisher Confirms, Connection.Blocked/Unblocked notifications, and Connection.UpdateSecret for token refresh. This article details every change with code examples.
Table of Contents
- Bug Fixes
- Critical: DeclareExchange Parameter Order
- Field Table Type Byte
- Spec Compliance Fixes
- Other Bug Fixes
- New Features
- Basic.Nack — Negative Acknowledgements
- Exchange.Bind/Unbind — Exchange-to-Exchange Bindings
- Confirm Class — Publisher Confirms
- Connection.Blocked/Unblocked — Resource Alarms
- Connection.UpdateSecret — Token Refresh
- Files Modified
1. Bug Fixes
A total of 11 bugs have been fixed across the AMQP 0-9-1 implementation, ranging from critical parameter ordering issues to spec compliance corrections.
Critical: DeclareExchange Parameter Order
The DeclareExchange and
DeclareExchangeEx methods passed
aNoWait,
aAutoDelete, and
aInternal in the wrong order to
DoWrite_ExchDeclare. This caused the
auto-delete flag to be sent as
no-wait and vice versa, leading to unexpected exchange
behavior on the broker.
Before (Incorrect)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
After (Fixed)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
Field Table Type Byte
The sgcWriteAMQPFieldTable procedure always wrote
$53 ('S' = long string)
as the type indicator for all field table values, regardless of the actual value type. This meant doubles, integers,
booleans, and int64 values were all incorrectly tagged as strings in the wire format.
Before (Incorrect)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
After (Fixed)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
Spec Compliance Fixes
| Issue | Before | After | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 wrong type | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 wrong read | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose used class-specific method value getters | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
A new generic helper function sgcGetAMQPMethodValue
was added to sgcAMQP_Helpers.pas to resolve the correct
method integer ID for any AMQP class, replacing the class-specific getters that only worked for their own class.
Other Bug Fixes
| Bug | Description | File |
|---|---|---|
| Channel.CloseOk missing channel ID | DoWrite_ChannCloseOk did not set oFrame.Header.Channel, so the close-ok was sent on channel 0 (connection level) instead of the target channel. Added aChannelId: Word parameter. |
sgcAMQP.pas |
| Typo in error constant | Changed 'Now Allowed' to 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind missing request data | DoWrite_QueueUnBind did not store QueueUnBindQueue and QueueUnBindExchange on the channel request, causing the OnAMQPQueueUnBind event to report empty values. |
sgcAMQP_Client.pas |
| Remaining bytes discarded after DoRead | When the read loop exited with 1–7 remaining bytes (partial frame), they were silently lost. Now saved to FBytes for the next read cycle. |
sgcAMQP.pas |
| GetChannel ignores aRaiseIfNotFound | The aRaiseIfNotFound parameter was never checked. Now only raises when the flag is True. |
sgcAMQP.pas |
2. New Features
Six new AMQP 0-9-1 protocol features have been implemented, covering widely-used RabbitMQ extensions and additional spec methods.
Basic.Nack — Negative Acknowledgements
Basic.Nack (class 60, method 120) is a RabbitMQ
extension that allows rejecting one or more messages at once with optional requeue. Unlike
Basic.Reject, it supports a
multiple flag to reject all messages up to and including the
specified delivery tag.
| Method | Description | Direction |
|---|---|---|
NackMessage |
Send a negative acknowledgement to the broker. | Client → Server |
OnAMQPBasicNack |
Fired when the server sends a Nack (in publisher confirm mode). | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack Event
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange-to-Exchange Bindings
Exchange-to-exchange bindings (class 40, methods 30/31 and 40/51) allow routing messages between exchanges without an intermediate queue. This is a RabbitMQ extension that enables powerful topology patterns such as fan-out hierarchies and topic partitioning.
| Method | Description |
|---|---|
BindExchange / BindExchangeEx |
Bind a destination exchange to a source exchange with a routing key. The Ex variant waits synchronously for the broker response. |
UnbindExchange / UnbindExchangeEx |
Remove an exchange-to-exchange binding. |
OnAMQPExchangeBind |
Fired when the broker confirms an exchange bind. |
OnAMQPExchangeUnbind |
Fired when the broker confirms an exchange unbind. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm Class — Publisher Confirms
Publisher confirms (class 85, methods 10/11) allow the broker to acknowledge receipt of published messages.
Once a channel is put into confirm mode via Confirm.Select,
the broker will send Basic.Ack or
Basic.Nack for each published message, enabling
reliable publishing without transactions.
| Method / Event | Description |
|---|---|
SelectConfirm / SelectConfirmEx |
Enable publisher confirm mode on a channel. |
OnAMQPConfirmSelectOk |
Fired when the broker confirms that confirm mode is active. |
OnAMQPBasicAck |
Fired when the broker acknowledges a published message. |
OnAMQPBasicNack |
Fired when the broker negatively acknowledges a published message. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Example: Reliable Publishing with Confirms
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — Resource Alarms
When the broker runs low on resources (memory, disk), it sends
Connection.Blocked (class 10, method 60) with a
reason string. When the condition clears, it sends
Connection.Unblocked (method 61). These are server-to-client
only notifications. This feature is handled in the base
TsgcAMQP class, making it available to all AMQP components.
| Event | Description |
|---|---|
OnAMQPConnectionBlocked |
Fired when the broker blocks the connection due to resource constraints. Includes a Reason string (e.g., 'low on memory'). |
OnAMQPConnectionUnblocked |
Fired when the broker lifts the connection block. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — Token Refresh
Connection.UpdateSecret (class 10, method 70) allows
refreshing the authentication credentials on an active connection without reconnecting. This is essential for
OAuth2/JWT-based authentication where tokens expire periodically.
| Method / Event | Description |
|---|---|
UpdateSecret / UpdateSecretEx |
Send a new secret (token) to the broker with an optional reason string. |
OnAMQPConnectionUpdateSecretOk |
Fired when the broker accepts the new secret. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Files Modified
| File | Changes |
|---|---|
sgcAMQP_Const.pas |
Typo fix ('Not Allowed'). |
sgcAMQP_Helpers.pas |
New sgcGetAMQPMethodValue function, Confirm class helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method ID mappings for all new methods. |
sgcAMQP_ReadWrite.pas |
Field table type byte fix — each value type now writes its correct type indicator. |
sgcAMQP_Classes.pas |
New enums (amqpClassConfirm, 12 new methods), 13 new payload classes, updated dispatch tables, thread safety fix, spec compliance fixes, new request storage fields. |
sgcAMQP.pas |
8 new event types, Connection.Blocked/Unblocked handling, DoWrite_ChannCloseOk channel ID fix, remaining bytes preservation, GetChannel flag fix. |
sgcAMQP_Client.pas |
6 new read handlers, 5 new write methods, 11 new public methods, 6 new events, updated dispatch table, parameter order fix, request data fix. |