Aggiornamento client AMQP 0.9.1 per Delphi

· Funzionalità

L'implementazione AMQP 0-9-1 in sgcWebSockets ha ricevuto un aggiornamento completo: 11 correzioni di bug che coprono ordinamento critico dei parametri, incongruenze di tipo, thread safety e perdita di dati, oltre a 6 nuove funzionalità di protocollo tra cui Basic.Nack, binding Exchange-to-Exchange, Publisher Confirms, notifiche Connection.Blocked/Unblocked e Connection.UpdateSecret per il refresh dei token. Questo articolo descrive ogni modifica con esempi di codice.

Indice

  1. Correzioni di bug
  2. Critico: ordine dei parametri di DeclareExchange
  3. Byte di tipo della field table
  4. Correzioni di conformità alla specifica
  5. Altre correzioni di bug
  6. Nuove funzionalità
  7. Basic.Nack — conferme negative
  8. Exchange.Bind/Unbind — binding Exchange-to-Exchange
  9. Classe Confirm — Publisher Confirms
  10. Connection.Blocked/Unblocked — allarmi delle risorse
  11. Connection.UpdateSecret — refresh del token
  12. File modificati

1. Correzioni di bug

In totale sono stati corretti 11 bug nell'implementazione AMQP 0-9-1, da problemi critici di ordinamento dei parametri a correzioni di conformità alla specifica.

Critico: ordine dei parametri di DeclareExchange

I metodi DeclareExchange e DeclareExchangeEx passavano aNoWait, aAutoDelete e aInternal nell'ordine sbagliato a DoWrite_ExchDeclare. Questo faceva sì che il flag auto-delete venisse inviato come no-wait e viceversa, generando comportamenti inattesi degli exchange sul broker.

Prima (errato)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Dopo (corretto)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

File: sgcAMQP_Client.pas


Byte di tipo della field table

La procedura sgcWriteAMQPFieldTable scriveva sempre $53 ('S' = long string) come indicatore di tipo per tutti i valori della field table, indipendentemente dal tipo reale. Di conseguenza valori double, intero, booleano e int64 venivano tutti taggati erroneamente come stringhe nel formato wire.

Prima (errato)

sgcWriteAMQPByte($53, vBytes);  // Scriveva sempre 'S' per TUTTI i tipi
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

Dopo (corretto)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas


Correzioni di conformità alla specifica

Problema Prima Dopo File
BasicGetEmpty.Reserved1 tipo errato UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 lettura errata sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose usavano getter di method value specifici per classe sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

È stata aggiunta una nuova funzione helper generica sgcGetAMQPMethodValue in sgcAMQP_Helpers.pas per risolvere il corretto ID intero del metodo per qualunque classe AMQP, sostituendo i getter specifici per classe che funzionavano solo per la loro classe.


Altre correzioni di bug

Bug Descrizione File
Channel.CloseOk senza channel ID DoWrite_ChannCloseOk non impostava oFrame.Header.Channel, quindi la close-ok veniva inviata sul canale 0 (livello connessione) anziché sul canale di destinazione. Aggiunto il parametro aChannelId: Word. sgcAMQP.pas
Refuso in una costante di errore Cambiato 'Now Allowed' in 'Not Allowed'. sgcAMQP_Const.pas
QueueUnBind senza dati di richiesta DoWrite_QueueUnBind non memorizzava QueueUnBindQueue e QueueUnBindExchange sulla richiesta del canale, causando l'evento OnAMQPQueueUnBind con valori vuoti. sgcAMQP_Client.pas
Byte rimanenti scartati dopo DoRead Quando il ciclo di lettura usciva con 1–7 byte rimanenti (frame parziale), questi venivano silenziosamente persi. Ora vengono salvati in FBytes per il ciclo di lettura successivo. sgcAMQP.pas
GetChannel ignora aRaiseIfNotFound Il parametro aRaiseIfNotFound non veniva mai controllato. Ora solleva eccezione solo quando il flag è True. sgcAMQP.pas

2. Nuove funzionalità

Sono state implementate sei nuove funzionalità del protocollo AMQP 0-9-1, che coprono estensioni RabbitMQ molto usate e metodi aggiuntivi della specifica.


Basic.Nack — conferme negative

Basic.Nack (classe 60, metodo 120) è un'estensione RabbitMQ che permette di rifiutare uno o più messaggi in un'unica volta con requeue opzionale. A differenza di Basic.Reject, supporta un flag multiple per rifiutare tutti i messaggi fino a (incluso) il delivery tag specificato.

Metodo Descrizione Direzione
NackMessage Invia una conferma negativa al broker. Client → Server
OnAMQPBasicNack Si attiva quando il server invia un Nack (in modalità publisher confirm). Server → Client

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Rifiuta un singolo messaggio e rimettilo in coda
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Rifiuta tutti i messaggi non confermati fino a questo tag, scartali
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

Evento OnAMQPBasicNack

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — binding Exchange-to-Exchange

I binding exchange-to-exchange (classe 40, metodi 30/31 e 40/51) consentono di instradare messaggi tra exchange senza una coda intermedia. È un'estensione RabbitMQ che abilita potenti pattern di topologia come gerarchie fan-out e partizionamento di topic.

Metodo Descrizione
BindExchange / BindExchangeEx Associa un exchange di destinazione a un exchange di origine con una routing key. La variante Ex attende in modo sincrono la risposta del broker.
UnbindExchange / UnbindExchangeEx Rimuove un binding exchange-to-exchange.
OnAMQPExchangeBind Si attiva quando il broker conferma un bind di exchange.
OnAMQPExchangeUnbind Si attiva quando il broker conferma un unbind di exchange.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Lega 'downstream-exchange' a 'upstream-exchange' con routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destinazione
  'upstream-exchange',    // origine
  'orders.#',             // routing key
  False)                  // attendi conferma
then
  Log('Exchange binding created successfully');
// Rimuove il binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Classe Confirm — Publisher Confirms

I publisher confirms (classe 85, metodi 10/11) permettono al broker di confermare la ricezione dei messaggi pubblicati. Una volta che un canale viene messo in modalità confirm tramite Confirm.Select, il broker invierà Basic.Ack o Basic.Nack per ogni messaggio pubblicato, abilitando la pubblicazione affidabile senza transazioni.

Metodo / Evento Descrizione
SelectConfirm / SelectConfirmEx Abilita la modalità publisher confirm su un canale.
OnAMQPConfirmSelectOk Si attiva quando il broker conferma che la modalità confirm è attiva.
OnAMQPBasicAck Si attiva quando il broker conferma un messaggio pubblicato.
OnAMQPBasicNack Si attiva quando il broker invia una conferma negativa per un messaggio pubblicato.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Esempio: pubblicazione affidabile con conferme

// 1. Abilita la modalità confirm sul canale
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Pubblica un messaggio - il broker invierà Ack o Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Gestisci le conferme del server
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — allarmi delle risorse

Quando il broker esaurisce le risorse (memoria, disco) invia Connection.Blocked (classe 10, metodo 60) con una stringa di motivo. Quando la condizione si risolve, invia Connection.Unblocked (metodo 61). Queste sono notifiche solo dal server al client. Questa funzionalità è gestita nella classe base TsgcAMQP, rendendola disponibile a tutti i componenti AMQP.

Evento Descrizione
OnAMQPConnectionBlocked Si attiva quando il broker blocca la connessione per vincoli sulle risorse. Include una stringa Reason (ad es. 'low on memory').
OnAMQPConnectionUnblocked Si attiva quando il broker rimuove il blocco della connessione.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Sospendi la pubblicazione per evitare la perdita di messaggi
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — refresh del token

Connection.UpdateSecret (classe 10, metodo 70) consente di aggiornare le credenziali di autenticazione su una connessione attiva senza riconnettersi. È essenziale per l'autenticazione basata su OAuth2/JWT, dove i token scadono periodicamente.

Metodo / Evento Descrizione
UpdateSecret / UpdateSecretEx Invia un nuovo secret (token) al broker con una stringa di motivo opzionale.
OnAMQPConnectionUpdateSecretOk Si attiva quando il broker accetta il nuovo secret.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Aggiorna il token OAuth prima che scada
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. File modificati

File Modifiche
sgcAMQP_Const.pas Correzione di un refuso ('Not Allowed').
sgcAMQP_Helpers.pas Nuova funzione sgcGetAMQPMethodValue, helper della classe Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mapping degli ID dei metodi per tutti i nuovi metodi.
sgcAMQP_ReadWrite.pas Correzione del byte di tipo della field table — ogni tipo di valore scrive ora il proprio indicatore di tipo corretto.
sgcAMQP_Classes.pas Nuove enum (amqpClassConfirm, 12 nuovi metodi), 13 nuove classi payload, tabelle di dispatch aggiornate, correzione di thread safety, correzioni di conformità alla specifica, nuovi campi di storage delle richieste.
sgcAMQP.pas 8 nuovi tipi di evento, gestione di Connection.Blocked/Unblocked, correzione del channel ID in DoWrite_ChannCloseOk, conservazione dei byte rimanenti, correzione del flag in GetChannel.
sgcAMQP_Client.pas 6 nuovi gestori di lettura, 5 nuovi metodi di scrittura, 11 nuovi metodi pubblici, 6 nuovi eventi, tabella di dispatch aggiornata, correzione dell'ordine dei parametri, correzione dei dati di richiesta.