De AMQP 0-9-1-implementatie in sgcWebSockets heeft een uitgebreide update gekregen: 11 bugfixes voor kritieke parametervolgorde, typeverschillen, thread-veiligheid en gegevensverlies, plus 6 nieuwe protocolfuncties waaronder Basic.Nack, Exchange-to-Exchange-bindings, Publisher Confirms, Connection.Blocked/Unblocked- meldingen en Connection.UpdateSecret voor het vernieuwen van tokens. Dit artikel beschrijft elke wijziging met codevoorbeelden.
Inhoudsopgave
- Bugfixes
- Kritiek: DeclareExchange-parametervolgorde
- Field Table-typebyte
- Spec-conformiteitsfixes
- Overige bugfixes
- Nieuwe functies
- Basic.Nack — negatieve bevestigingen
- Exchange.Bind/Unbind — Exchange-to-Exchange-bindings
- Confirm-klasse — Publisher Confirms
- Connection.Blocked/Unblocked — resource-alarmen
- Connection.UpdateSecret — tokenvernieuwing
- Gewijzigde bestanden
1. Bugfixes
In totaal zijn er 11 bugs opgelost in de AMQP 0-9-1-implementatie, variërend van kritieke problemen met parametervolgorde tot spec-conformiteitscorrecties.
Kritiek: DeclareExchange-parametervolgorde
De methoden DeclareExchange en
DeclareExchangeEx gaven
aNoWait,
aAutoDelete en
aInternal in de verkeerde volgorde door aan
DoWrite_ExchDeclare. Hierdoor werd de
auto-delete-vlag verzonden als
no-wait en omgekeerd, wat leidde tot onverwacht exchange-
gedrag op de broker.
Voor (onjuist)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Na (opgelost)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
Bestand: sgcAMQP_Client.pas
Field Table-typebyte
De procedure sgcWriteAMQPFieldTable schreef altijd
$53 ('S' = long string)
als type-indicator voor alle field table-waarden, ongeacht het werkelijke waardetype. Dat betekende dat doubles, integers,
booleans en int64-waarden allemaal ten onrechte als strings werden gemarkeerd in het wire-formaat.
Voor (onjuist)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
Na (opgelost)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
Bestand: sgcAMQP_ReadWrite.pas
Spec-conformiteitsfixes
| Probleem | Voor | Na | Bestand |
|---|---|---|---|
| BasicGetEmpty.Reserved1 verkeerd type | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 verkeerd gelezen | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose gebruikten klassespecifieke method-value-getters | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
Een nieuwe generieke hulpfunctie sgcGetAMQPMethodValue
is toegevoegd aan sgcAMQP_Helpers.pas om de juiste
integer-ID van een method op te lossen voor elke AMQP-klasse, ter vervanging van de klassespecifieke getters die alleen voor hun eigen klasse werkten.
Overige bugfixes
| Bug | Beschrijving | Bestand |
|---|---|---|
| Channel.CloseOk miste kanaal-ID | DoWrite_ChannCloseOk stelde oFrame.Header.Channel niet in, waardoor de close-ok werd verstuurd op kanaal 0 (verbindingsniveau) in plaats van het doelkanaal. Parameter aChannelId: Word toegevoegd. |
sgcAMQP.pas |
| Typefout in foutconstante | 'Now Allowed' aangepast naar 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind miste request-data | DoWrite_QueueUnBind sloeg QueueUnBindQueue en QueueUnBindExchange niet op in de kanaalrequest, waardoor de gebeurtenis OnAMQPQueueUnBind lege waarden rapporteerde. |
sgcAMQP_Client.pas |
| Resterende bytes verloren na DoRead | Wanneer de leeslus stopte met 1–7 resterende bytes (gedeeltelijk frame), gingen die stilletjes verloren. Worden nu opgeslagen in FBytes voor de volgende leescyclus. |
sgcAMQP.pas |
| GetChannel negeert aRaiseIfNotFound | De parameter aRaiseIfNotFound werd nooit gecontroleerd. Nu wordt er alleen een fout opgeworpen als de vlag True is. |
sgcAMQP.pas |
2. Nieuwe functies
Er zijn zes nieuwe AMQP 0-9-1-protocolfuncties geïmplementeerd, die veelgebruikte RabbitMQ-uitbreidingen en aanvullende spec-methoden omvatten.
Basic.Nack — negatieve bevestigingen
Basic.Nack (class 60, method 120) is een RabbitMQ-
uitbreiding waarmee één of meer berichten tegelijk geweigerd kunnen worden, met optionele requeue. In tegenstelling tot
Basic.Reject ondersteunt het een
multiple-vlag om alle berichten tot en met de
opgegeven delivery tag te weigeren.
| Methode | Beschrijving | Richting |
|---|---|---|
NackMessage |
Een negatieve bevestiging naar de broker versturen. | Client → Server |
OnAMQPBasicNack |
Wordt aangeroepen wanneer de server een Nack verstuurt (in publisher confirm-modus). | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack-gebeurtenis
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange-to-Exchange-bindings
Exchange-to-exchange-bindings (class 40, methods 30/31 en 40/51) maken het routeren van berichten tussen exchanges mogelijk zonder tussenliggende queue. Dit is een RabbitMQ-uitbreiding die krachtige topologiepatronen mogelijk maakt zoals fan-out-hiërarchieën en topic-partitionering.
| Methode | Beschrijving |
|---|---|
BindExchange / BindExchangeEx |
Een bestemmings-exchange koppelen aan een bron-exchange met een routing key. De Ex-variant wacht synchroon op het antwoord van de broker. |
UnbindExchange / UnbindExchangeEx |
Een exchange-to-exchange-binding verwijderen. |
OnAMQPExchangeBind |
Wordt aangeroepen wanneer de broker een exchange-binding bevestigt. |
OnAMQPExchangeUnbind |
Wordt aangeroepen wanneer de broker het ontbinden van een exchange bevestigt. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm-klasse — Publisher Confirms
Publisher confirms (class 85, methods 10/11) stellen de broker in staat om de ontvangst van gepubliceerde berichten te bevestigen.
Zodra een kanaal in confirm-modus is gezet via Confirm.Select,
stuurt de broker een Basic.Ack of
Basic.Nack voor elk gepubliceerd bericht, wat
betrouwbaar publiceren zonder transacties mogelijk maakt.
| Methode / gebeurtenis | Beschrijving |
|---|---|
SelectConfirm / SelectConfirmEx |
Publisher confirm-modus inschakelen op een kanaal. |
OnAMQPConfirmSelectOk |
Wordt aangeroepen wanneer de broker bevestigt dat confirm-modus actief is. |
OnAMQPBasicAck |
Wordt aangeroepen wanneer de broker een gepubliceerd bericht bevestigt. |
OnAMQPBasicNack |
Wordt aangeroepen wanneer de broker een gepubliceerd bericht negatief bevestigt. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Voorbeeld: betrouwbaar publiceren met confirms
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — resource-alarmen
Wanneer de broker te weinig resources heeft (geheugen, schijf), stuurt deze
Connection.Blocked (class 10, method 60) met een
reden-string. Wanneer de situatie is opgelost, stuurt deze
Connection.Unblocked (method 61). Dit zijn alleen server-naar-client-
meldingen. Deze functie wordt afgehandeld in de basisklasse
TsgcAMQP, zodat deze beschikbaar is voor alle AMQP-componenten.
| Gebeurtenis | Beschrijving |
|---|---|
OnAMQPConnectionBlocked |
Wordt aangeroepen wanneer de broker de verbinding blokkeert vanwege resource-beperkingen. Bevat een Reason-string (bijv. 'low on memory'). |
OnAMQPConnectionUnblocked |
Wordt aangeroepen wanneer de broker de verbindingsblokkade opheft. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — tokenvernieuwing
Connection.UpdateSecret (class 10, method 70) maakt het mogelijk
de authenticatiegegevens te vernieuwen op een actieve verbinding zonder opnieuw verbinding te maken. Dit is essentieel voor
OAuth2/JWT-gebaseerde authenticatie waarbij tokens periodiek verlopen.
| Methode / gebeurtenis | Beschrijving |
|---|---|
UpdateSecret / UpdateSecretEx |
Een nieuw secret (token) naar de broker versturen met een optionele reden-string. |
OnAMQPConnectionUpdateSecretOk |
Wordt aangeroepen wanneer de broker het nieuwe secret accepteert. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Gewijzigde bestanden
| Bestand | Wijzigingen |
|---|---|
sgcAMQP_Const.pas |
Typefout opgelost ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Nieuwe functie sgcGetAMQPMethodValue, Confirm-klasse-helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method-ID-toewijzingen voor alle nieuwe methoden. |
sgcAMQP_ReadWrite.pas |
Field table-typebyte fix — elk waardetype schrijft nu zijn juiste type-indicator. |
sgcAMQP_Classes.pas |
Nieuwe enums (amqpClassConfirm, 12 nieuwe methoden), 13 nieuwe payload-klassen, bijgewerkte dispatch-tabellen, thread-veiligheidsfix, spec-conformiteitsfixes, nieuwe request-opslagvelden. |
sgcAMQP.pas |
8 nieuwe gebeurtenistypes, afhandeling van Connection.Blocked/Unblocked, kanaal-ID-fix voor DoWrite_ChannCloseOk, behoud van resterende bytes, vlagfix voor GetChannel. |
sgcAMQP_Client.pas |
6 nieuwe read-handlers, 5 nieuwe write-methoden, 11 nieuwe publieke methoden, 6 nieuwe gebeurtenissen, bijgewerkte dispatch-tabel, parametervolgorde-fix, request-datafix. |
