AMQP 0.9.1 Delphi Client Update

· Funktionen

Die AMQP-0-9-1-Implementierung in sgcWebSockets hat ein umfassendes Update erhalten: 11 Bugfixes zu kritischer Parameterreihenfolge, Typkonflikten, Thread-Sicherheit und Datenverlust-Problemen sowie 6 neue Protokollfunktionen wie Basic.Nack, Exchange-to-Exchange-Bindings, Publisher Confirms, Connection.Blocked/Unblocked- Benachrichtigungen und Connection.UpdateSecret für die Token-Erneuerung. Dieser Artikel beschreibt jede Änderung mit Codebeispielen.

Inhaltsverzeichnis

  1. Bugfixes
  2. Kritisch: Reihenfolge der DeclareExchange-Parameter
  3. Type-Byte der Field Table
  4. Korrekturen zur Spec-Compliance
  5. Weitere Bugfixes
  6. Neue Funktionen
  7. Basic.Nack — Negative Bestätigungen
  8. Exchange.Bind/Unbind — Exchange-zu-Exchange-Bindings
  9. Confirm-Klasse — Publisher Confirms
  10. Connection.Blocked/Unblocked — Ressourcen-Alarme
  11. Connection.UpdateSecret — Token-Erneuerung
  12. Geänderte Dateien

1. Bugfixes

Insgesamt wurden 11 Bugs in der AMQP-0-9-1-Implementierung behoben, von kritischen Problemen bei der Parameterreihenfolge bis hin zu Korrekturen der Spec-Compliance.

Kritisch: Reihenfolge der DeclareExchange-Parameter

Die Methoden DeclareExchange und DeclareExchangeEx übergaben aNoWait, aAutoDelete und aInternal in falscher Reihenfolge an DoWrite_ExchDeclare. Dadurch wurde das auto-delete-Flag als no-wait gesendet und umgekehrt, was zu unerwartetem Exchange- Verhalten am Broker führte.

Vorher (Fehlerhaft)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Nachher (Korrigiert)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

Datei: sgcAMQP_Client.pas


Type-Byte der Field Table

Die Prozedur sgcWriteAMQPFieldTable schrieb stets $53 ('S' = long string) als Typindikator für alle Field-Table-Werte, unabhängig vom tatsächlichen Werttyp. Dadurch wurden Doubles, Integer, Booleans und Int64-Werte im Wire-Format fälschlicherweise alle als Strings markiert.

Vorher (Fehlerhaft)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

Nachher (Korrigiert)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

Datei: sgcAMQP_ReadWrite.pas


Korrekturen zur Spec-Compliance

Problem Vorher Nachher Datei
BasicGetEmpty.Reserved1 falscher Typ UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 falscher Lesevorgang sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose verwendeten klassen-spezifische Method-Value-Getter sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

Eine neue generische Helper-Funktion sgcGetAMQPMethodValue wurde zu sgcAMQP_Helpers.pas hinzugefügt, um die korrekte Method-Integer-ID für jede AMQP-Klasse aufzulösen und die klassen-spezifischen Getter zu ersetzen, die nur für ihre eigene Klasse funktionierten.


Weitere Bugfixes

Bug Beschreibung Datei
Channel.CloseOk fehlende Channel-ID DoWrite_ChannCloseOk setzte oFrame.Header.Channel nicht, sodass das close-ok auf Channel 0 (Verbindungsebene) statt am Ziel-Channel gesendet wurde. Parameter aChannelId: Word hinzugefügt. sgcAMQP.pas
Tippfehler in Fehlerkonstante 'Now Allowed' in 'Not Allowed' geändert. sgcAMQP_Const.pas
QueueUnBind fehlende Request-Daten DoWrite_QueueUnBind speicherte QueueUnBindQueue und QueueUnBindExchange nicht in der Channel-Request, sodass das Ereignis OnAMQPQueueUnBind leere Werte meldete. sgcAMQP_Client.pas
Verbleibende Bytes nach DoRead verworfen Wenn die Read-Schleife mit 1–7 verbleibenden Bytes (partial frame) endete, gingen diese still verloren. Sie werden jetzt in FBytes für den nächsten Lesezyklus gespeichert. sgcAMQP.pas
GetChannel ignoriert aRaiseIfNotFound Der Parameter aRaiseIfNotFound wurde nie geprüft. Eine Exception wird jetzt nur ausgelöst, wenn das Flag True ist. sgcAMQP.pas

2. Neue Funktionen

Sechs neue AMQP-0-9-1-Protokollfunktionen wurden implementiert, die weit verbreitete RabbitMQ-Erweiterungen und zusätzliche Spec-Methoden abdecken.


Basic.Nack — Negative Bestätigungen

Basic.Nack (Klasse 60, Methode 120) ist eine RabbitMQ- Erweiterung, die das Ablehnen einer oder mehrerer Nachrichten auf einmal mit optionalem Requeue ermöglicht. Im Gegensatz zu Basic.Reject unterstützt sie ein multiple-Flag, um alle Nachrichten bis einschließlich des angegebenen Delivery-Tags abzulehnen.

Methode Beschreibung Richtung
NackMessage Sendet eine negative Bestätigung an den Broker. Client → Server
OnAMQPBasicNack Wird ausgelöst, wenn der Server ein Nack sendet (im Publisher-Confirm-Modus). Server → Client

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack-Ereignis

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Exchange-zu-Exchange-Bindings

Exchange-zu-Exchange-Bindings (Klasse 40, Methoden 30/31 und 40/51) ermöglichen das Routing von Nachrichten zwischen Exchanges ohne eine zwischengeschaltete Queue. Das ist eine RabbitMQ-Erweiterung, die mächtige Topologie-Muster wie Fan-out-Hierarchien und Topic-Partitionierung ermöglicht.

Methode Beschreibung
BindExchange / BindExchangeEx Bindet einen Ziel-Exchange an einen Quell-Exchange mit einem Routing-Schlüssel. Die Ex-Variante wartet synchron auf die Broker-Antwort.
UnbindExchange / UnbindExchangeEx Entfernt ein Exchange-zu-Exchange-Binding.
OnAMQPExchangeBind Wird ausgelöst, wenn der Broker einen Exchange-Bind bestätigt.
OnAMQPExchangeUnbind Wird ausgelöst, wenn der Broker einen Exchange-Unbind bestätigt.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm-Klasse — Publisher Confirms

Publisher Confirms (Klasse 85, Methoden 10/11) ermöglichen dem Broker, den Empfang veröffentlichter Nachrichten zu bestätigen. Sobald ein Channel über Confirm.Select in den Confirm-Modus versetzt wird, sendet der Broker Basic.Ack oder Basic.Nack für jede veröffentlichte Nachricht, was zuverlässiges Publishing ohne Transaktionen ermöglicht.

Methode / Ereignis Beschreibung
SelectConfirm / SelectConfirmEx Aktiviert den Publisher-Confirm-Modus auf einem Channel.
OnAMQPConfirmSelectOk Wird ausgelöst, wenn der Broker bestätigt, dass der Confirm-Modus aktiv ist.
OnAMQPBasicAck Wird ausgelöst, wenn der Broker eine veröffentlichte Nachricht bestätigt.
OnAMQPBasicNack Wird ausgelöst, wenn der Broker eine veröffentlichte Nachricht negativ bestätigt.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Beispiel: Zuverlässiges Publishing mit Confirms

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — Ressourcen-Alarme

Wenn dem Broker die Ressourcen ausgehen (Speicher, Disk), sendet er Connection.Blocked (Klasse 10, Methode 60) mit einem Grund-String. Wenn die Situation behoben ist, sendet er Connection.Unblocked (Methode 61). Das sind reine Server-zu-Client- Benachrichtigungen. Diese Funktion wird in der Basis- Klasse TsgcAMQP behandelt, sodass sie allen AMQP-Komponenten zur Verfügung steht.

Ereignis Beschreibung
OnAMQPConnectionBlocked Wird ausgelöst, wenn der Broker die Verbindung wegen Ressourcenbeschränkungen blockiert. Enthält einen Reason-String (z.B. 'low on memory').
OnAMQPConnectionUnblocked Wird ausgelöst, wenn der Broker die Verbindungsblockade aufhebt.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — Token-Erneuerung

Connection.UpdateSecret (Klasse 10, Methode 70) ermöglicht das Erneuern der Authentifizierungsdaten auf einer aktiven Verbindung, ohne sich neu zu verbinden. Das ist wichtig für OAuth2/JWT-basierte Authentifizierung, bei der Tokens periodisch ablaufen.

Methode / Ereignis Beschreibung
UpdateSecret / UpdateSecretEx Sendet ein neues Secret (Token) an den Broker mit einem optionalen Grund-String.
OnAMQPConnectionUpdateSecretOk Wird ausgelöst, wenn der Broker das neue Secret akzeptiert.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. Geänderte Dateien

Datei Änderungen
sgcAMQP_Const.pas Tippfehler-Korrektur ('Not Allowed').
sgcAMQP_Helpers.pas Neue Funktion sgcGetAMQPMethodValue, Confirm-Klassen-Helper (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), Method-ID-Mappings für alle neuen Methoden.
sgcAMQP_ReadWrite.pas Field-Table-Type-Byte-Fix — jeder Werttyp schreibt jetzt seinen korrekten Typindikator.
sgcAMQP_Classes.pas Neue Enums (amqpClassConfirm, 12 neue Methoden), 13 neue Payload-Klassen, aktualisierte Dispatch-Tabellen, Thread-Sicherheits-Fix, Spec-Compliance-Fixes, neue Request-Speicherfelder.
sgcAMQP.pas 8 neue Ereignistypen, Connection.Blocked/Unblocked-Handling, DoWrite_ChannCloseOk-Channel-ID-Fix, Erhalt verbleibender Bytes, GetChannel-Flag-Fix.
sgcAMQP_Client.pas 6 neue Read-Handler, 5 neue Write-Methoden, 11 neue öffentliche Methoden, 6 neue Ereignisse, aktualisierte Dispatch-Tabelle, Parameter-Reihenfolge-Fix, Request-Daten-Fix.