eSeGeCe
software
The AMQP 0-9-1 implementation in sgcWebSockets has received a comprehensive update: 11 bug fixes covering critical parameter ordering, type mismatches, thread safety, and data loss issues, plus 6 new protocol features including Basic.Nack, Exchange-to-Exchange bindings, Publisher Confirms, Connection.Blocked/Unblocked notifications, and Connection.UpdateSecret for token refresh. This article details every change with code examples.
A total of 11 bugs have been fixed across the AMQP 0-9-1 implementation, ranging from critical parameter ordering issues to spec compliance corrections.
The DeclareExchange and
DeclareExchangeEx methods passed
aNoWait,
aAutoDelete, and
aInternal in the wrong order to
DoWrite_ExchDeclare. This caused the
auto-delete flag to be sent as
no-wait and vice versa, leading to unexpected exchange
behavior on the broker.
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
The sgcWriteAMQPFieldTable procedure always wrote
$53 ('S' = long string)
as the type indicator for all field table values, regardless of the actual value type. This meant doubles, integers,
booleans, and int64 values were all incorrectly tagged as strings in the wire format.
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
| Issue | Before | After | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 wrong type | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 wrong read | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose used class-specific method value getters | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
A new generic helper function sgcGetAMQPMethodValue
was added to sgcAMQP_Helpers.pas to resolve the correct
method integer ID for any AMQP class, replacing the class-specific getters that only worked for their own class.
| Bug | Description | File |
|---|---|---|
| Channel.CloseOk missing channel ID | DoWrite_ChannCloseOk did not set oFrame.Header.Channel, so the close-ok was sent on channel 0 (connection level) instead of the target channel. Added aChannelId: Word parameter. |
sgcAMQP.pas |
| Typo in error constant | Changed 'Now Allowed' to 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind missing request data | DoWrite_QueueUnBind did not store QueueUnBindQueue and QueueUnBindExchange on the channel request, causing the OnAMQPQueueUnBind event to report empty values. |
sgcAMQP_Client.pas |
| Remaining bytes discarded after DoRead | When the read loop exited with 1–7 remaining bytes (partial frame), they were silently lost. Now saved to FBytes for the next read cycle. |
sgcAMQP.pas |
| GetChannel ignores aRaiseIfNotFound | The aRaiseIfNotFound parameter was never checked. Now only raises when the flag is True. |
sgcAMQP.pas |
Six new AMQP 0-9-1 protocol features have been implemented, covering widely-used RabbitMQ extensions and additional spec methods.
Basic.Nack (class 60, method 120) is a RabbitMQ
extension that allows rejecting one or more messages at once with optional requeue. Unlike
Basic.Reject, it supports a
multiple flag to reject all messages up to and including the
specified delivery tag.
| Method | Description | Direction |
|---|---|---|
NackMessage |
Send a negative acknowledgement to the broker. | Client → Server |
OnAMQPBasicNack |
Fired when the server sends a Nack (in publisher confirm mode). | Server → Client |
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange-to-exchange bindings (class 40, methods 30/31 and 40/51) allow routing messages between exchanges without an intermediate queue. This is a RabbitMQ extension that enables powerful topology patterns such as fan-out hierarchies and topic partitioning.
| Method | Description |
|---|---|
BindExchange / BindExchangeEx |
Bind a destination exchange to a source exchange with a routing key. The Ex variant waits synchronously for the broker response. |
UnbindExchange / UnbindExchangeEx |
Remove an exchange-to-exchange binding. |
OnAMQPExchangeBind |
Fired when the broker confirms an exchange bind. |
OnAMQPExchangeUnbind |
Fired when the broker confirms an exchange unbind. |
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Publisher confirms (class 85, methods 10/11) allow the broker to acknowledge receipt of published messages.
Once a channel is put into confirm mode via Confirm.Select,
the broker will send Basic.Ack or
Basic.Nack for each published message, enabling
reliable publishing without transactions.
| Method / Event | Description |
|---|---|
SelectConfirm / SelectConfirmEx |
Enable publisher confirm mode on a channel. |
OnAMQPConfirmSelectOk |
Fired when the broker confirms that confirm mode is active. |
OnAMQPBasicAck |
Fired when the broker acknowledges a published message. |
OnAMQPBasicNack |
Fired when the broker negatively acknowledges a published message. |
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
When the broker runs low on resources (memory, disk), it sends
Connection.Blocked (class 10, method 60) with a
reason string. When the condition clears, it sends
Connection.Unblocked (method 61). These are server-to-client
only notifications. This feature is handled in the base
TsgcAMQP class, making it available to all AMQP components.
| Event | Description |
|---|---|
OnAMQPConnectionBlocked |
Fired when the broker blocks the connection due to resource constraints. Includes a Reason string (e.g., 'low on memory'). |
OnAMQPConnectionUnblocked |
Fired when the broker lifts the connection block. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret (class 10, method 70) allows
refreshing the authentication credentials on an active connection without reconnecting. This is essential for
OAuth2/JWT-based authentication where tokens expire periodically.
| Method / Event | Description |
|---|---|
UpdateSecret / UpdateSecretEx |
Send a new secret (token) to the broker with an optional reason string. |
OnAMQPConnectionUpdateSecretOk |
Fired when the broker accepts the new secret. |
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
| File | Changes |
|---|---|
sgcAMQP_Const.pas |
Typo fix ('Not Allowed'). |
sgcAMQP_Helpers.pas |
New sgcGetAMQPMethodValue function, Confirm class helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method ID mappings for all new methods. |
sgcAMQP_ReadWrite.pas |
Field table type byte fix — each value type now writes its correct type indicator. |
sgcAMQP_Classes.pas |
New enums (amqpClassConfirm, 12 new methods), 13 new payload classes, updated dispatch tables, thread safety fix, spec compliance fixes, new request storage fields. |
sgcAMQP.pas |
8 new event types, Connection.Blocked/Unblocked handling, DoWrite_ChannCloseOk channel ID fix, remaining bytes preservation, GetChannel flag fix. |
sgcAMQP_Client.pas |
6 new read handlers, 5 new write methods, 11 new public methods, 6 new events, updated dispatch table, parameter order fix, request data fix. |
When you subscribe to the blog, we will send you an e-mail when there are new updates on the site so you wouldn't miss them.