By Admin on Wednesday, 25 February 2026

AMQP 0.9.1 Delphi Client Update

The AMQP 0-9-1 implementation in sgcWebSockets has received a comprehensive update: 11 bug fixes covering critical parameter ordering, type mismatches, thread safety, and data loss issues, plus 6 new protocol features including Basic.Nack, Exchange-to-Exchange bindings, Publisher Confirms, Connection.Blocked/Unblocked notifications, and Connection.UpdateSecret for token refresh. This article details every change with code examples.

Table of Contents

  1. Bug Fixes
     - Critical: DeclareExchange Parameter Order
     - Field Table Type Byte
     - Spec Compliance Fixes
     - Other Bug Fixes
  2. New Features
     - Basic.Nack — Negative Acknowledgements
     - Exchange.Bind/Unbind — Exchange-to-Exchange Bindings
     - Confirm Class — Publisher Confirms
     - Connection.Blocked/Unblocked — Resource Alarms
     - Connection.UpdateSecret — Token Refresh
  3. Files Modified

1. Bug Fixes

A total of 11 bugs have been fixed across the AMQP 0-9-1 implementation, ranging from critical parameter ordering issues to spec compliance corrections.

Critical: DeclareExchange Parameter Order

The DeclareExchange and DeclareExchangeEx methods passed aNoWait, aAutoDelete, and aInternal to DoWrite_ExchDeclare in the wrong order. DoWrite_ExchDeclare expects them as auto-delete, internal, no-wait, so each flag landed in the wrong slot: no-wait was sent as auto-delete, auto-delete as internal, and internal as no-wait. The result was unexpected exchange behavior on the broker.

Before (Incorrect)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

After (Fixed)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

File: sgcAMQP_Client.pas
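
To see what the wrong order does on the wire: AMQP 0-9-1 packs the five Exchange.Declare booleans into one octet, in the order passive, durable, auto-delete, internal, no-wait. The following is a self-contained sketch that does not use sgcWebSockets. PackExchangeFlags is a hypothetical helper, and its parameter order is assumed to mirror the corrected call above.

```pascal
program ExchangeDeclareFlags;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

// Hypothetical helper (not part of sgcWebSockets): packs the five
// Exchange.Declare flags into one octet, lowest bit first, in the
// order the AMQP 0-9-1 spec lists them.
function PackExchangeFlags(aPassive, aDurable, aAutoDelete, aInternal,
  aNoWait: Boolean): Byte;
begin
  Result := 0;
  if aPassive then Result := Result or $01;
  if aDurable then Result := Result or $02;
  if aAutoDelete then Result := Result or $04;
  if aInternal then Result := Result or $08;
  if aNoWait then Result := Result or $10;
end;

var
  vOld, vNew: Byte;
begin
  // The caller wants a durable, auto-delete exchange:
  // passive=False, durable=True, auto-delete=True, internal=False, no-wait=False.

  // Old argument order: no-wait, auto-delete, internal were passed into
  // the auto-delete, internal, no-wait slots.
  vOld := PackExchangeFlags(False, True,
    {aNoWait} False, {aAutoDelete} True, {aInternal} False);

  // Fixed argument order: every flag reaches its own slot.
  vNew := PackExchangeFlags(False, True, True, False, False);

  WriteLn('old flags octet: $', IntToHex(vOld, 2)); // durable + internal
  WriteLn('new flags octet: $', IntToHex(vNew, 2)); // durable + auto-delete
end.
```

With the old order, the requested auto-delete exchange would have been declared as an internal one instead.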


Field Table Type Byte

The sgcWriteAMQPFieldTable procedure always wrote $53 ('S' = long string) as the type indicator for all field table values, regardless of the actual value type. This meant doubles, integers, booleans, and int64 values were all incorrectly tagged as strings in the wire format.

Before (Incorrect)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

After (Fixed)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas
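
For readers who want to see the resulting bytes, here is a minimal standalone sketch of the "type octet first, then the value" layout for three field value types. It is not the sgcWebSockets implementation: every Append* routine and the TByteBuf type are hypothetical names, and a real field table also carries a length prefix and a name for each field, which this sketch leaves out.

```pascal
program FieldTableTypeByte;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

type
  TByteBuf = array of Byte;  // hypothetical buffer type for this sketch

procedure AppendByte(var aBuf: TByteBuf; aValue: Byte);
begin
  SetLength(aBuf, Length(aBuf) + 1);
  aBuf[High(aBuf)] := aValue;
end;

// AMQP integers travel in network byte order (big-endian).
procedure AppendUInt32(var aBuf: TByteBuf; aValue: Cardinal);
begin
  AppendByte(aBuf, (aValue shr 24) and $FF);
  AppendByte(aBuf, (aValue shr 16) and $FF);
  AppendByte(aBuf, (aValue shr 8) and $FF);
  AppendByte(aBuf, aValue and $FF);
end;

// 'I' = signed 32-bit integer
procedure AppendInt32Field(var aBuf: TByteBuf; aValue: Integer);
begin
  AppendByte(aBuf, Ord('I'));
  AppendUInt32(aBuf, Cardinal(aValue));
end;

// 't' = boolean, one octet
procedure AppendBooleanField(var aBuf: TByteBuf; aValue: Boolean);
begin
  AppendByte(aBuf, Ord('t'));
  if aValue then
    AppendByte(aBuf, 1)
  else
    AppendByte(aBuf, 0);
end;

// 'S' = long string: 32-bit length followed by the bytes
procedure AppendLongStringField(var aBuf: TByteBuf; const aValue: AnsiString);
var
  i: Integer;
begin
  AppendByte(aBuf, Ord('S'));
  AppendUInt32(aBuf, Length(aValue));
  for i := 1 to Length(aValue) do
    AppendByte(aBuf, Ord(aValue[i]));
end;

function ToHex(const aBuf: TByteBuf): string;
var
  i: Integer;
begin
  Result := '';
  for i := 0 to High(aBuf) do
    Result := Result + IntToHex(aBuf[i], 2);
end;

var
  vBuf: TByteBuf;
begin
  vBuf := nil;
  AppendInt32Field(vBuf, 5);
  AppendBooleanField(vBuf, True);
  AppendLongStringField(vBuf, 'ok');
  // Each value starts with its own tag: $49 'I', $74 't', $53 'S'.
  WriteLn(ToHex(vBuf));
end.
```

Before the fix, all three values would have started with $53, so a receiver would have tried to read the integer and the boolean as long strings.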


Spec Compliance Fixes

| Issue | Before | After | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 wrong type | UInt16 / sgcReadAMQPUInt16 | string / sgcReadAMQPShortString | sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 wrong read | sgcReadAMQPShortString | sgcReadAMQPLongString | sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose used class-specific method value getters | sgcGetAMQPConnectionValue(FailMethodId) | sgcGetAMQPMethodValue(FailClassId, FailMethodId) | sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |

A new generic helper function sgcGetAMQPMethodValue was added to sgcAMQP_Helpers.pas to resolve the correct method integer ID for any AMQP class, replacing the class-specific getters that only worked for their own class.
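
The reason a class-aware lookup is needed is that AMQP method IDs are only unique within their class. The sketch below illustrates this with a handful of IDs from the AMQP 0-9-1 specification. DescribeMethod is a hypothetical function written for this article and is not the signature or body of sgcGetAMQPMethodValue.

```pascal
program MethodLookup;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

// Hypothetical lookup: a method can only be identified by the pair
// (class id, method id). Only a few spec-defined pairs are listed here.
function DescribeMethod(aClassId, aMethodId: Word): string;
begin
  Result := 'unknown';
  case aClassId of
    10: // connection class
      case aMethodId of
        40: Result := 'connection.open';
        50: Result := 'connection.close';
      end;
    20: // channel class
      case aMethodId of
        10: Result := 'channel.open';
        40: Result := 'channel.close';
      end;
    60: // basic class
      case aMethodId of
        40: Result := 'basic.publish';
        80: Result := 'basic.ack';
      end;
  end;
end;

begin
  // The same method id 40 means something different in each class.
  WriteLn(DescribeMethod(10, 40));
  WriteLn(DescribeMethod(20, 40));
  WriteLn(DescribeMethod(60, 40));
end.
```

A Close frame reports the failing class and method together, so resolving the method through a getter that only knows one class can point at the wrong method.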


Other Bug Fixes

| Bug | Description | File |
|---|---|---|
| Channel.CloseOk missing channel ID | DoWrite_ChannCloseOk did not set oFrame.Header.Channel, so the close-ok was sent on channel 0 (connection level) instead of the target channel. Added aChannelId: Word parameter. | sgcAMQP.pas |
| Typo in error constant | Changed 'Now Allowed' to 'Not Allowed'. | sgcAMQP_Const.pas |
| QueueUnBind missing request data | DoWrite_QueueUnBind did not store QueueUnBindQueue and QueueUnBindExchange on the channel request, causing the OnAMQPQueueUnBind event to report empty values. | sgcAMQP_Client.pas |
| Remaining bytes discarded after DoRead | When the read loop exited with 1–7 remaining bytes (partial frame), they were silently lost. Now saved to FBytes for the next read cycle. | sgcAMQP.pas |
| GetChannel ignores aRaiseIfNotFound | The aRaiseIfNotFound parameter was never checked. Now only raises when the flag is True. | sgcAMQP.pas |
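
The "remaining bytes" fix follows a common pattern for stream protocols: whatever does not yet form a complete frame is kept and prepended to the next read. A minimal standalone sketch of that pattern follows. It is not the sgcWebSockets code: Feed and gPending are hypothetical names, the frame-end octet is not validated, and the only protocol fact it relies on is the AMQP 0-9-1 frame layout (1 byte type, 2 bytes channel, 4 bytes payload size, payload, 1 byte frame-end).

```pascal
program PartialFrameBuffer;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

type
  TByteBuf = array of Byte;

var
  gPending: TByteBuf;  // hypothetical carry-over buffer between reads

// Appends aData to the pending bytes, counts the complete frames, and
// keeps the unconsumed tail (a partial frame) for the next call.
function Feed(const aData: array of Byte): Integer;
var
  vOffset, vLen, i: Integer;
  vSize: Cardinal;
begin
  Result := 0;
  vLen := Length(gPending);
  SetLength(gPending, vLen + Length(aData));
  for i := 0 to High(aData) do
    gPending[vLen + i] := aData[i];

  vOffset := 0;
  // A frame needs at least 8 bytes: 7-byte header plus the frame-end octet.
  while Length(gPending) - vOffset >= 8 do
  begin
    // Payload size is a big-endian 32-bit value at header offset 3.
    vSize := (Cardinal(gPending[vOffset + 3]) shl 24) or
             (Cardinal(gPending[vOffset + 4]) shl 16) or
             (Cardinal(gPending[vOffset + 5]) shl 8) or
              Cardinal(gPending[vOffset + 6]);
    if Int64(Length(gPending) - vOffset) < Int64(vSize) + 8 then
      Break;  // header complete, payload not yet: wait for more data
    Inc(vOffset, Integer(vSize) + 8);
    Inc(Result);
  end;

  // Keep the tail instead of discarding it.
  gPending := Copy(gPending, vOffset, Length(gPending) - vOffset);
end;

begin
  gPending := nil;
  // A heartbeat frame (type 8, channel 0, size 0, frame-end $CE)
  // arriving split across two reads: 5 bytes, then 3 bytes.
  WriteLn(Feed([8, 0, 0, 0, 0]));   // no complete frame yet, 5 bytes kept
  WriteLn(Feed([0, 0, $CE]));       // the kept bytes complete the frame
  WriteLn(Length(gPending));        // nothing left over
end.
```

Dropping the five bytes from the first read would make the second read start in the middle of a frame, which is how a 1–7 byte remainder turns into data loss.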

2. New Features

Six new AMQP 0-9-1 protocol features have been implemented, covering widely-used RabbitMQ extensions and additional spec methods.


Basic.Nack — Negative Acknowledgements

Basic.Nack (class 60, method 120) is a RabbitMQ extension that allows rejecting one or more messages at once with optional requeue. Unlike Basic.Reject, it supports a multiple flag to reject all messages up to and including the specified delivery tag.

| Method / Event | Description | Direction |
|---|---|---|
| NackMessage | Send a negative acknowledgement to the broker. | Client → Server |
| OnAMQPBasicNack | Fired when the server sends a Nack (in publisher confirm mode). | Server → Client |

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);

// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);

// Reject all unacknowledged messages up to and including this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack Event

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Exchange-to-Exchange Bindings

Exchange-to-exchange bindings (class 40, methods 30/31 and 40/51) allow routing messages between exchanges without an intermediate queue. This is a RabbitMQ extension that enables powerful topology patterns such as fan-out hierarchies and topic partitioning.

| Method / Event | Description |
|---|---|
| BindExchange / BindExchangeEx | Bind a destination exchange to a source exchange with a routing key. The Ex variant waits synchronously for the broker response. |
| UnbindExchange / UnbindExchangeEx | Remove an exchange-to-exchange binding. |
| OnAMQPExchangeBind | Fired when the broker confirms an exchange bind. |
| OnAMQPExchangeUnbind | Fired when the broker confirms an exchange unbind. |

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');

function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;

// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // aNoWait = False: wait for the broker's Bind-Ok
then
  Log('Exchange binding created successfully');

// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm Class — Publisher Confirms

Publisher confirms (class 85, methods 10/11) allow the broker to acknowledge receipt of published messages. Once a channel is put into confirm mode via Confirm.Select, the broker sends Basic.Ack or Basic.Nack for the messages published on it. A single confirmation may cover several messages when its multiple flag is set. This enables reliable publishing without transactions.

| Method / Event | Description |
|---|---|
| SelectConfirm / SelectConfirmEx | Enable publisher confirm mode on a channel. |
| OnAMQPConfirmSelectOk | Fired when the broker confirms that confirm mode is active. |
| OnAMQPBasicAck | Fired when the broker acknowledges a published message. |
| OnAMQPBasicNack | Fired when the broker negatively acknowledges a published message. |

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);

function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Example: Reliable Publishing with Confirms

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');

// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');

// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
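
To act on confirmations, an application usually keeps track of which published messages are still unconfirmed. The following standalone sketch shows one way to do that bookkeeping, including the multiple flag. TrackPublish, Settle, and the global list are hypothetical names and are not part of sgcWebSockets. The sketch relies on the RabbitMQ behavior that confirm delivery tags count publishes on the channel starting at 1.

```pascal
program ConfirmTracker;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

type
  TTagList = array of UInt64;

var
  gPending: TTagList;        // sequence numbers still awaiting Ack/Nack
  gNextSeq: UInt64 = 1;      // first publish on a confirm channel is tag 1

// Call once per publish; returns the sequence number the broker will
// later use as the delivery tag of its confirmation.
function TrackPublish: UInt64;
begin
  Result := gNextSeq;
  Inc(gNextSeq);
  SetLength(gPending, Length(gPending) + 1);
  gPending[High(gPending)] := Result;
end;

// Call from the Ack or Nack handler. Removes the settled tag(s) and
// returns how many messages the confirmation covered.
function Settle(aDeliveryTag: UInt64; aMultiple: Boolean): Integer;
var
  i, vKeep: Integer;
  vHit: Boolean;
begin
  Result := 0;
  vKeep := 0;
  for i := 0 to High(gPending) do
  begin
    if aMultiple then
      vHit := gPending[i] <= aDeliveryTag   // everything up to and including the tag
    else
      vHit := gPending[i] = aDeliveryTag;   // exactly this message
    if vHit then
      Inc(Result)
    else
    begin
      gPending[vKeep] := gPending[i];
      Inc(vKeep);
    end;
  end;
  SetLength(gPending, vKeep);
end;

var
  i: Integer;
begin
  gPending := nil;
  for i := 1 to 4 do
    TrackPublish;                    // tags 1..4 are now pending

  WriteLn(Settle(2, True));          // multiple: settles tags 1 and 2
  WriteLn(Settle(4, False));         // single: settles tag 4 only
  WriteLn(Length(gPending));         // tag 3 is still unconfirmed
end.
```

In an application, the OnAMQPBasicAck handler shown above would pass aAck.DeliveryTag and aAck.Multiple to a routine like Settle, and a Nack handler would do the same before deciding whether to republish.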

Connection.Blocked/Unblocked — Resource Alarms

When the broker runs low on resources (memory, disk), it sends Connection.Blocked (class 10, method 60) with a reason string. When the condition clears, it sends Connection.Unblocked (method 61). These are server-to-client only notifications. This feature is handled in the base TsgcAMQP class, making it available to all AMQP components.

| Event | Description |
|---|---|
| OnAMQPConnectionBlocked | Fired when the broker blocks the connection due to resource constraints. Includes a Reason string (e.g., 'low on memory'). |
| OnAMQPConnectionUnblocked | Fired when the broker lifts the connection block. |

procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;

procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;
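
The handlers above only flip a flag. What the application does with messages produced while the flag is set is its own decision. One option is to park them in a local outbox and flush it on unblock, as in this standalone sketch. All names here (Publish, SendNow, OnBlocked, OnUnblocked) are hypothetical, SendNow is a stand-in for the real publish call, and the sketch is single-threaded, so a real application would need locking if the events arrive on another thread.

```pascal
program BlockedGate;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils, Classes;

var
  gBlocked: Boolean = False;
  gOutbox: TStringList;      // messages held back while blocked
  gSent: Integer = 0;        // counts calls to the publish stand-in

// Stand-in for the real publish call; it only counts.
procedure SendNow(const aBody: string);
begin
  Inc(gSent);
end;

// Application-level publish: send directly, or queue while blocked.
procedure Publish(const aBody: string);
begin
  if gBlocked then
    gOutbox.Add(aBody)
  else
    SendNow(aBody);
end;

// Would be called from the connection-blocked event handler.
procedure OnBlocked;
begin
  gBlocked := True;
end;

// Would be called from the connection-unblocked event handler:
// flush the outbox in the original order.
procedure OnUnblocked;
var
  i: Integer;
begin
  gBlocked := False;
  for i := 0 to gOutbox.Count - 1 do
    SendNow(gOutbox[i]);
  gOutbox.Clear;
end;

begin
  gOutbox := TStringList.Create;
  try
    Publish('a');
    OnBlocked;
    Publish('b');
    Publish('c');
    WriteLn(gSent, ' sent, ', gOutbox.Count, ' queued');
    OnUnblocked;
    WriteLn(gSent, ' sent, ', gOutbox.Count, ' queued');
  finally
    gOutbox.Free;
  end;
end.
```

An unbounded outbox only moves the memory pressure to the client, so a production version would cap its size or push back on the producer.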

Connection.UpdateSecret — Token Refresh

Connection.UpdateSecret (class 10, method 70) allows refreshing the authentication credentials on an active connection without reconnecting. This is essential for OAuth2/JWT-based authentication where tokens expire periodically.

| Method / Event | Description |
|---|---|
| UpdateSecret / UpdateSecretEx | Send a new secret (token) to the broker with an optional reason string. |
| OnAMQPConnectionUpdateSecretOk | Fired when the broker accepts the new secret. |

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);

function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

// Refresh the OAuth token before it expires.
// RefreshToken is an illustrative method name; GetRefreshedOAuthToken
// stands for the application's own token source.
procedure TForm1.RefreshToken;
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();

  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;
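
When to trigger the refresh is left to the application. A simple approach is to refresh once the remaining token lifetime drops below a safety margin, as in this standalone sketch. ShouldRefresh and the 120-second margin are assumptions made for illustration and are not part of sgcWebSockets.

```pascal
program TokenRefreshTiming;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils, DateUtils;

// Hypothetical helper: True once the token has fewer than
// aMarginSeconds left (or has already expired).
function ShouldRefresh(aExpiresAt, aNow: TDateTime;
  aMarginSeconds: Integer): Boolean;
begin
  // TDateTime counts days, so multiply the difference by seconds per day.
  Result := (aExpiresAt - aNow) * SecsPerDay <= aMarginSeconds;
end;

var
  vExpiresAt: TDateTime;
begin
  vExpiresAt := EncodeDateTime(2026, 2, 25, 12, 0, 0, 0);

  // One hour of lifetime left: no refresh needed yet.
  WriteLn(ShouldRefresh(vExpiresAt,
    EncodeDateTime(2026, 2, 25, 11, 0, 0, 0), 120));

  // 90 seconds left with a 120-second margin: time to refresh.
  WriteLn(ShouldRefresh(vExpiresAt,
    EncodeDateTime(2026, 2, 25, 11, 58, 30, 0), 120));
end.
```

A timer could evaluate this check periodically and call UpdateSecretEx, as in the example above, once it returns True.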

3. Files Modified

| File | Changes |
|---|---|
| sgcAMQP_Const.pas | Typo fix ('Not Allowed'). |
| sgcAMQP_Helpers.pas | New sgcGetAMQPMethodValue function, Confirm class helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method ID mappings for all new methods. |
| sgcAMQP_ReadWrite.pas | Field table type byte fix: each value type now writes its correct type indicator. |
| sgcAMQP_Classes.pas | New enums (amqpClassConfirm, 12 new methods), 13 new payload classes, updated dispatch tables, thread safety fix, spec compliance fixes, new request storage fields. |
| sgcAMQP.pas | 8 new event types, Connection.Blocked/Unblocked handling, DoWrite_ChannCloseOk channel ID fix, remaining bytes preservation, GetChannel flag fix. |
| sgcAMQP_Client.pas | 6 new read handlers, 5 new write methods, 11 new public methods, 6 new events, updated dispatch table, parameter order fix, request data fix. |
