AMQP 0.9.1 Delphi-client update

· Functies

De AMQP 0-9-1-implementatie in sgcWebSockets heeft een uitgebreide update gekregen: 11 bugfixes voor kritieke parametervolgorde, typeverschillen, thread-veiligheid en gegevensverlies, plus 6 nieuwe protocolfuncties waaronder Basic.Nack, Exchange-to-Exchange-bindings, Publisher Confirms, Connection.Blocked/Unblocked- meldingen en Connection.UpdateSecret voor het vernieuwen van tokens. Dit artikel beschrijft elke wijziging met codevoorbeelden.

Inhoudsopgave

  1. Bugfixes
  2. Kritiek: DeclareExchange-parametervolgorde
  3. Field Table-typebyte
  4. Spec-conformiteitsfixes
  5. Overige bugfixes
  6. Nieuwe functies
  7. Basic.Nack — negatieve bevestigingen
  8. Exchange.Bind/Unbind — Exchange-to-Exchange-bindings
  9. Confirm-klasse — Publisher Confirms
  10. Connection.Blocked/Unblocked — resource-alarmen
  11. Connection.UpdateSecret — tokenvernieuwing
  12. Gewijzigde bestanden

1. Bugfixes

In totaal zijn er 11 bugs opgelost in de AMQP 0-9-1-implementatie, variërend van kritieke problemen met parametervolgorde tot spec-conformiteitscorrecties.

Kritiek: DeclareExchange-parametervolgorde

De methoden DeclareExchange en DeclareExchangeEx gaven aNoWait, aAutoDelete en aInternal in de verkeerde volgorde door aan DoWrite_ExchDeclare. Hierdoor werd de auto-delete-vlag verzonden als no-wait en omgekeerd, wat leidde tot onverwacht exchange- gedrag op de broker.

Voor (onjuist)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Na (opgelost)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

Bestand: sgcAMQP_Client.pas


Field Table-typebyte

De procedure sgcWriteAMQPFieldTable schreef altijd $53 ('S' = long string) als type-indicator voor alle field table-waarden, ongeacht het werkelijke waardetype. Dat betekende dat doubles, integers, booleans en int64-waarden allemaal ten onrechte als strings werden gemarkeerd in het wire-formaat.

Voor (onjuist)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

Na (opgelost)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

Bestand: sgcAMQP_ReadWrite.pas


Spec-conformiteitsfixes

Probleem Voor Na Bestand
BasicGetEmpty.Reserved1 verkeerd type UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 verkeerd gelezen sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose gebruikten klassespecifieke method-value-getters sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

Een nieuwe generieke hulpfunctie sgcGetAMQPMethodValue is toegevoegd aan sgcAMQP_Helpers.pas om de juiste integer-ID van een method op te lossen voor elke AMQP-klasse, ter vervanging van de klassespecifieke getters die alleen voor hun eigen klasse werkten.


Overige bugfixes

Bug Beschrijving Bestand
Channel.CloseOk miste kanaal-ID DoWrite_ChannCloseOk stelde oFrame.Header.Channel niet in, waardoor de close-ok werd verstuurd op kanaal 0 (verbindingsniveau) in plaats van het doelkanaal. Parameter aChannelId: Word toegevoegd. sgcAMQP.pas
Typefout in foutconstante 'Now Allowed' aangepast naar 'Not Allowed'. sgcAMQP_Const.pas
QueueUnBind miste request-data DoWrite_QueueUnBind sloeg QueueUnBindQueue en QueueUnBindExchange niet op in de kanaalrequest, waardoor de gebeurtenis OnAMQPQueueUnBind lege waarden rapporteerde. sgcAMQP_Client.pas
Resterende bytes verloren na DoRead Wanneer de leeslus stopte met 1–7 resterende bytes (gedeeltelijk frame), gingen die stilletjes verloren. Worden nu opgeslagen in FBytes voor de volgende leescyclus. sgcAMQP.pas
GetChannel negeert aRaiseIfNotFound De parameter aRaiseIfNotFound werd nooit gecontroleerd. Nu wordt er alleen een fout opgeworpen als de vlag True is. sgcAMQP.pas

2. Nieuwe functies

Er zijn zes nieuwe AMQP 0-9-1-protocolfuncties geïmplementeerd, die veelgebruikte RabbitMQ-uitbreidingen en aanvullende spec-methoden omvatten.


Basic.Nack — negatieve bevestigingen

Basic.Nack (class 60, method 120) is een RabbitMQ- uitbreiding waarmee één of meer berichten tegelijk geweigerd kunnen worden, met optionele requeue. In tegenstelling tot Basic.Reject ondersteunt het een multiple-vlag om alle berichten tot en met de opgegeven delivery tag te weigeren.

Methode Beschrijving Richting
NackMessage Een negatieve bevestiging naar de broker versturen. Client → Server
OnAMQPBasicNack Wordt aangeroepen wanneer de server een Nack verstuurt (in publisher confirm-modus). Server → Client

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack-gebeurtenis

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Exchange-to-Exchange-bindings

Exchange-to-exchange-bindings (class 40, methods 30/31 en 40/51) maken het routeren van berichten tussen exchanges mogelijk zonder tussenliggende queue. Dit is een RabbitMQ-uitbreiding die krachtige topologiepatronen mogelijk maakt zoals fan-out-hiërarchieën en topic-partitionering.

Methode Beschrijving
BindExchange / BindExchangeEx Een bestemmings-exchange koppelen aan een bron-exchange met een routing key. De Ex-variant wacht synchroon op het antwoord van de broker.
UnbindExchange / UnbindExchangeEx Een exchange-to-exchange-binding verwijderen.
OnAMQPExchangeBind Wordt aangeroepen wanneer de broker een exchange-binding bevestigt.
OnAMQPExchangeUnbind Wordt aangeroepen wanneer de broker het ontbinden van een exchange bevestigt.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm-klasse — Publisher Confirms

Publisher confirms (class 85, methods 10/11) stellen de broker in staat om de ontvangst van gepubliceerde berichten te bevestigen. Zodra een kanaal in confirm-modus is gezet via Confirm.Select, stuurt de broker een Basic.Ack of Basic.Nack voor elk gepubliceerd bericht, wat betrouwbaar publiceren zonder transacties mogelijk maakt.

Methode / gebeurtenis Beschrijving
SelectConfirm / SelectConfirmEx Publisher confirm-modus inschakelen op een kanaal.
OnAMQPConfirmSelectOk Wordt aangeroepen wanneer de broker bevestigt dat confirm-modus actief is.
OnAMQPBasicAck Wordt aangeroepen wanneer de broker een gepubliceerd bericht bevestigt.
OnAMQPBasicNack Wordt aangeroepen wanneer de broker een gepubliceerd bericht negatief bevestigt.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Voorbeeld: betrouwbaar publiceren met confirms

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — resource-alarmen

Wanneer de broker te weinig resources heeft (geheugen, schijf), stuurt deze Connection.Blocked (class 10, method 60) met een reden-string. Wanneer de situatie is opgelost, stuurt deze Connection.Unblocked (method 61). Dit zijn alleen server-naar-client- meldingen. Deze functie wordt afgehandeld in de basisklasse TsgcAMQP, zodat deze beschikbaar is voor alle AMQP-componenten.

Gebeurtenis Beschrijving
OnAMQPConnectionBlocked Wordt aangeroepen wanneer de broker de verbinding blokkeert vanwege resource-beperkingen. Bevat een Reason-string (bijv. 'low on memory').
OnAMQPConnectionUnblocked Wordt aangeroepen wanneer de broker de verbindingsblokkade opheft.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — tokenvernieuwing

Connection.UpdateSecret (class 10, method 70) maakt het mogelijk de authenticatiegegevens te vernieuwen op een actieve verbinding zonder opnieuw verbinding te maken. Dit is essentieel voor OAuth2/JWT-gebaseerde authenticatie waarbij tokens periodiek verlopen.

Methode / gebeurtenis Beschrijving
UpdateSecret / UpdateSecretEx Een nieuw secret (token) naar de broker versturen met een optionele reden-string.
OnAMQPConnectionUpdateSecretOk Wordt aangeroepen wanneer de broker het nieuwe secret accepteert.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. Gewijzigde bestanden

Bestand Wijzigingen
sgcAMQP_Const.pas Typefout opgelost ('Not Allowed').
sgcAMQP_Helpers.pas Nieuwe functie sgcGetAMQPMethodValue, Confirm-klasse-helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method-ID-toewijzingen voor alle nieuwe methoden.
sgcAMQP_ReadWrite.pas Field table-typebyte fix — elk waardetype schrijft nu zijn juiste type-indicator.
sgcAMQP_Classes.pas Nieuwe enums (amqpClassConfirm, 12 nieuwe methoden), 13 nieuwe payload-klassen, bijgewerkte dispatch-tabellen, thread-veiligheidsfix, spec-conformiteitsfixes, nieuwe request-opslagvelden.
sgcAMQP.pas 8 nieuwe gebeurtenistypes, afhandeling van Connection.Blocked/Unblocked, kanaal-ID-fix voor DoWrite_ChannCloseOk, behoud van resterende bytes, vlagfix voor GetChannel.
sgcAMQP_Client.pas 6 nieuwe read-handlers, 5 nieuwe write-methoden, 11 nieuwe publieke methoden, 6 nieuwe gebeurtenissen, bijgewerkte dispatch-tabel, parametervolgorde-fix, request-datafix.