L'implementazione AMQP 0-9-1 in sgcWebSockets ha ricevuto un aggiornamento completo: 11 correzioni di bug che coprono ordinamento critico dei parametri, incongruenze di tipo, thread safety e perdita di dati, oltre a 6 nuove funzionalità di protocollo tra cui Basic.Nack, binding Exchange-to-Exchange, Publisher Confirms, notifiche Connection.Blocked/Unblocked e Connection.UpdateSecret per il refresh dei token. Questo articolo descrive ogni modifica con esempi di codice.
Indice
- Correzioni di bug
- Critico: ordine dei parametri di DeclareExchange
- Byte di tipo della field table
- Correzioni di conformità alla specifica
- Altre correzioni di bug
- Nuove funzionalità
- Basic.Nack — conferme negative
- Exchange.Bind/Unbind — binding Exchange-to-Exchange
- Classe Confirm — Publisher Confirms
- Connection.Blocked/Unblocked — allarmi delle risorse
- Connection.UpdateSecret — refresh del token
- File modificati
1. Correzioni di bug
In totale sono stati corretti 11 bug nell'implementazione AMQP 0-9-1, da problemi critici di ordinamento dei parametri a correzioni di conformità alla specifica.
Critico: ordine dei parametri di DeclareExchange
I metodi DeclareExchange e
DeclareExchangeEx passavano
aNoWait,
aAutoDelete e
aInternal nell'ordine sbagliato a
DoWrite_ExchDeclare. Questo faceva sì che il flag
auto-delete venisse inviato come
no-wait e viceversa, generando comportamenti inattesi degli exchange
sul broker.
Prima (errato)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Dopo (corretto)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
Byte di tipo della field table
La procedura sgcWriteAMQPFieldTable scriveva sempre
$53 ('S' = long string)
come indicatore di tipo per tutti i valori della field table, indipendentemente dal tipo reale. Di conseguenza valori double, intero,
booleano e int64 venivano tutti taggati erroneamente come stringhe nel formato wire.
Prima (errato)
sgcWriteAMQPByte($53, vBytes); // Scriveva sempre 'S' per TUTTI i tipi
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
Dopo (corretto)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
Correzioni di conformità alla specifica
| Problema | Prima | Dopo | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 tipo errato | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 lettura errata | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose usavano getter di method value specifici per classe | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
È stata aggiunta una nuova funzione helper generica sgcGetAMQPMethodValue
in sgcAMQP_Helpers.pas per risolvere il corretto
ID intero del metodo per qualunque classe AMQP, sostituendo i getter specifici per classe che funzionavano solo per la loro classe.
Altre correzioni di bug
| Bug | Descrizione | File |
|---|---|---|
| Channel.CloseOk senza channel ID | DoWrite_ChannCloseOk non impostava oFrame.Header.Channel, quindi la close-ok veniva inviata sul canale 0 (livello connessione) anziché sul canale di destinazione. Aggiunto il parametro aChannelId: Word. |
sgcAMQP.pas |
| Refuso in una costante di errore | Cambiato 'Now Allowed' in 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind senza dati di richiesta | DoWrite_QueueUnBind non memorizzava QueueUnBindQueue e QueueUnBindExchange sulla richiesta del canale, causando l'evento OnAMQPQueueUnBind con valori vuoti. |
sgcAMQP_Client.pas |
| Byte rimanenti scartati dopo DoRead | Quando il ciclo di lettura usciva con 1–7 byte rimanenti (frame parziale), questi venivano silenziosamente persi. Ora vengono salvati in FBytes per il ciclo di lettura successivo. |
sgcAMQP.pas |
| GetChannel ignora aRaiseIfNotFound | Il parametro aRaiseIfNotFound non veniva mai controllato. Ora solleva eccezione solo quando il flag è True. |
sgcAMQP.pas |
2. Nuove funzionalità
Sono state implementate sei nuove funzionalità del protocollo AMQP 0-9-1, che coprono estensioni RabbitMQ molto usate e metodi aggiuntivi della specifica.
Basic.Nack — conferme negative
Basic.Nack (classe 60, metodo 120) è un'estensione RabbitMQ
che permette di rifiutare uno o più messaggi in un'unica volta con requeue opzionale. A differenza di
Basic.Reject, supporta un flag
multiple per rifiutare tutti i messaggi fino a (incluso) il
delivery tag specificato.
| Metodo | Descrizione | Direzione |
|---|---|---|
NackMessage |
Invia una conferma negativa al broker. | Client → Server |
OnAMQPBasicNack |
Si attiva quando il server invia un Nack (in modalità publisher confirm). | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Rifiuta un singolo messaggio e rimettilo in coda
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Rifiuta tutti i messaggi non confermati fino a questo tag, scartali
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
Evento OnAMQPBasicNack
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — binding Exchange-to-Exchange
I binding exchange-to-exchange (classe 40, metodi 30/31 e 40/51) consentono di instradare messaggi tra exchange senza una coda intermedia. È un'estensione RabbitMQ che abilita potenti pattern di topologia come gerarchie fan-out e partizionamento di topic.
| Metodo | Descrizione |
|---|---|
BindExchange / BindExchangeEx |
Associa un exchange di destinazione a un exchange di origine con una routing key. La variante Ex attende in modo sincrono la risposta del broker. |
UnbindExchange / UnbindExchangeEx |
Rimuove un binding exchange-to-exchange. |
OnAMQPExchangeBind |
Si attiva quando il broker conferma un bind di exchange. |
OnAMQPExchangeUnbind |
Si attiva quando il broker conferma un unbind di exchange. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Lega 'downstream-exchange' a 'upstream-exchange' con routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destinazione
'upstream-exchange', // origine
'orders.#', // routing key
False) // attendi conferma
then
Log('Exchange binding created successfully');
// Rimuove il binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Classe Confirm — Publisher Confirms
I publisher confirms (classe 85, metodi 10/11) permettono al broker di confermare la ricezione dei messaggi pubblicati.
Una volta che un canale viene messo in modalità confirm tramite Confirm.Select,
il broker invierà Basic.Ack o
Basic.Nack per ogni messaggio pubblicato, abilitando
la pubblicazione affidabile senza transazioni.
| Metodo / Evento | Descrizione |
|---|---|
SelectConfirm / SelectConfirmEx |
Abilita la modalità publisher confirm su un canale. |
OnAMQPConfirmSelectOk |
Si attiva quando il broker conferma che la modalità confirm è attiva. |
OnAMQPBasicAck |
Si attiva quando il broker conferma un messaggio pubblicato. |
OnAMQPBasicNack |
Si attiva quando il broker invia una conferma negativa per un messaggio pubblicato. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Esempio: pubblicazione affidabile con conferme
// 1. Abilita la modalità confirm sul canale
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Pubblica un messaggio - il broker invierà Ack o Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Gestisci le conferme del server
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — allarmi delle risorse
Quando il broker esaurisce le risorse (memoria, disco) invia
Connection.Blocked (classe 10, metodo 60) con una
stringa di motivo. Quando la condizione si risolve, invia
Connection.Unblocked (metodo 61). Queste sono notifiche solo
dal server al client. Questa funzionalità è gestita nella classe base
TsgcAMQP, rendendola disponibile a tutti i componenti AMQP.
| Evento | Descrizione |
|---|---|
OnAMQPConnectionBlocked |
Si attiva quando il broker blocca la connessione per vincoli sulle risorse. Include una stringa Reason (ad es. 'low on memory'). |
OnAMQPConnectionUnblocked |
Si attiva quando il broker rimuove il blocco della connessione. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Sospendi la pubblicazione per evitare la perdita di messaggi
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — refresh del token
Connection.UpdateSecret (classe 10, metodo 70) consente di
aggiornare le credenziali di autenticazione su una connessione attiva senza riconnettersi. È essenziale per
l'autenticazione basata su OAuth2/JWT, dove i token scadono periodicamente.
| Metodo / Evento | Descrizione |
|---|---|
UpdateSecret / UpdateSecretEx |
Invia un nuovo secret (token) al broker con una stringa di motivo opzionale. |
OnAMQPConnectionUpdateSecretOk |
Si attiva quando il broker accetta il nuovo secret. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Aggiorna il token OAuth prima che scada
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. File modificati
| File | Modifiche |
|---|---|
sgcAMQP_Const.pas |
Correzione di un refuso ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Nuova funzione sgcGetAMQPMethodValue, helper della classe Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mapping degli ID dei metodi per tutti i nuovi metodi. |
sgcAMQP_ReadWrite.pas |
Correzione del byte di tipo della field table — ogni tipo di valore scrive ora il proprio indicatore di tipo corretto. |
sgcAMQP_Classes.pas |
Nuove enum (amqpClassConfirm, 12 nuovi metodi), 13 nuove classi payload, tabelle di dispatch aggiornate, correzione di thread safety, correzioni di conformità alla specifica, nuovi campi di storage delle richieste. |
sgcAMQP.pas |
8 nuovi tipi di evento, gestione di Connection.Blocked/Unblocked, correzione del channel ID in DoWrite_ChannCloseOk, conservazione dei byte rimanenti, correzione del flag in GetChannel. |
sgcAMQP_Client.pas |
6 nuovi gestori di lettura, 5 nuovi metodi di scrittura, 11 nuovi metodi pubblici, 6 nuovi eventi, tabella di dispatch aggiornata, correzione dell'ordine dei parametri, correzione dei dati di richiesta. |
