Mise à jour du client Delphi AMQP 0.9.1

· Fonctionnalités

L'implémentation AMQP 0-9-1 dans sgcWebSockets a reçu une mise à jour complète : 11 corrections de bugs couvrant un ordre de paramètres critique, des incompatibilités de types, la sécurité des threads et des problèmes de perte de données, plus 6 nouvelles fonctionnalités protocolaires dont Basic.Nack, les liaisons Exchange-to-Exchange, les Publisher Confirms, les notifications Connection.Blocked/Unblocked et Connection.UpdateSecret pour le rafraîchissement de jetons. Cet article détaille chaque changement avec des exemples de code.

Table des matières

  1. Corrections de bugs
  2. Critique : ordre des paramètres de DeclareExchange
  3. Octet de type de table de champs
  4. Corrections de conformité à la spécification
  5. Autres corrections de bugs
  6. Nouvelles fonctionnalités
  7. Basic.Nack — accusés de réception négatifs
  8. Exchange.Bind/Unbind — liaisons Exchange-to-Exchange
  9. Classe Confirm — Publisher Confirms
  10. Connection.Blocked/Unblocked — alarmes de ressources
  11. Connection.UpdateSecret — rafraîchissement de jetons
  12. Fichiers modifiés

1. Corrections de bugs

Au total, 11 bugs ont été corrigés dans l'implémentation AMQP 0-9-1, allant d'un ordre de paramètres critique à des corrections de conformité à la spécification.

Critique : ordre des paramètres de DeclareExchange

Les méthodes DeclareExchange et DeclareExchangeEx passaient aNoWait, aAutoDelete et aInternal dans le mauvais ordre à DoWrite_ExchDeclare. Cela faisait envoyer le drapeau auto-delete en tant que no-wait et vice versa, entraînant un comportement d'exchange inattendu côté broker.

Avant (incorrect)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Après (corrigé)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

Fichier : sgcAMQP_Client.pas


Octet de type de table de champs

La procédure sgcWriteAMQPFieldTable écrivait toujours $53 ('S' = long string) comme indicateur de type pour toutes les valeurs de la table de champs, indépendamment du type réel. Cela signifiait que les doubles, entiers, booléens et valeurs int64 étaient tous incorrectement étiquetés comme des chaînes dans le format wire.

Before (Incorrect)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

After (Fixed)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas


Corrections de conformité à la spécification

Problème Avant Après Fichier
Type incorrect pour BasicGetEmpty.Reserved1 UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
Lecture incorrecte pour ChannelOpenOk.Reserved1 sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose utilisaient des getters de valeur de méthode spécifiques à la classe sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

Une nouvelle fonction utilitaire générique sgcGetAMQPMethodValue a été ajoutée à sgcAMQP_Helpers.pas pour résoudre le bon ID entier de méthode pour toute classe AMQP, remplaçant les getters spécifiques à la classe qui ne fonctionnaient que pour leur propre classe.


Autres corrections de bugs

Bug Description File
ID de channel manquant pour Channel.CloseOk DoWrite_ChannCloseOk ne définissait pas oFrame.Header.Channel, donc le close-ok était envoyé sur le canal 0 (niveau connexion) au lieu du canal cible. Paramètre aChannelId: Word ajouté. sgcAMQP.pas
Faute de frappe dans une constante d'erreur Remplacement de 'Now Allowed' par 'Not Allowed'. sgcAMQP_Const.pas
Données de requête manquantes pour QueueUnBind DoWrite_QueueUnBind ne stockait pas QueueUnBindQueue et QueueUnBindExchange sur la requête du canal, faisant remonter des valeurs vides à l'événement OnAMQPQueueUnBind. sgcAMQP_Client.pas
Octets restants ignorés après DoRead Lorsque la boucle de lecture sortait avec 1–7 octets restants (trame partielle), ils étaient silencieusement perdus. Maintenant sauvegardés dans FBytes pour le cycle de lecture suivant. sgcAMQP.pas
GetChannel ignore aRaiseIfNotFound Le paramètre aRaiseIfNotFound n'était jamais vérifié. Maintenant, ne lève une exception que lorsque le drapeau est True. sgcAMQP.pas

2. Nouvelles fonctionnalités

Six nouvelles fonctionnalités protocolaires AMQP 0-9-1 ont été implémentées, couvrant des extensions RabbitMQ largement utilisées et des méthodes additionnelles de la spécification.


Basic.Nack — accusés de réception négatifs

Basic.Nack (classe 60, méthode 120) est une extension RabbitMQ qui permet de rejeter un ou plusieurs messages en une seule fois avec une remise en file optionnelle. Contrairement à Basic.Reject, elle prend en charge un drapeau multiple pour rejeter tous les messages jusqu'au delivery tag spécifié inclus.

Méthode Description Direction
NackMessage Envoyer un accusé de réception négatif au broker. Client → serveur
OnAMQPBasicNack Déclenché quand le serveur envoie un Nack (en mode publisher confirm). Serveur → client

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Rejeter un seul message et le remettre en file
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Rejeter tous les messages non acquittés jusqu'à ce tag, les supprimer
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack Event

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — liaisons Exchange-to-Exchange

Les liaisons Exchange-to-Exchange (classe 40, méthodes 30/31 et 40/51) permettent de router des messages entre exchanges sans file intermédiaire. C'est une extension RabbitMQ qui permet des modèles de topologie puissants comme les hiérarchies fan-out et le partitionnement par topic.

Method Description
BindExchange / BindExchangeEx Lier un exchange de destination à un exchange source avec une routing key. La variante Ex attend la réponse du broker de manière synchrone.
UnbindExchange / UnbindExchangeEx Supprimer une liaison Exchange-to-Exchange.
OnAMQPExchangeBind Déclenché quand le broker confirme un exchange bind.
OnAMQPExchangeUnbind Déclenché quand le broker confirme un exchange unbind.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Lier 'downstream-exchange' à 'upstream-exchange' avec une routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // attendre la confirmation
then
  Log('Liaison d''exchange créée avec succès');
// Supprimer la liaison
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Classe Confirm — Publisher Confirms

Les publisher confirms (classe 85, méthodes 10/11) permettent au broker d'acquitter la réception des messages publiés. Une fois un canal mis en mode confirm via Confirm.Select, le broker enverra Basic.Ack ou Basic.Nack pour chaque message publié, permettant une publication fiable sans transactions.

Méthode / événement Description
SelectConfirm / SelectConfirmEx Activer le mode publisher confirm sur un canal.
OnAMQPConfirmSelectOk Déclenché quand le broker confirme que le mode confirm est actif.
OnAMQPBasicAck Déclenché quand le broker acquitte un message publié.
OnAMQPBasicNack Déclenché quand le broker rejette un message publié.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Exemple : publication fiable avec confirms

// 1. Activer le mode confirm sur le canal
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Mode confirm activé');
// 2. Publier un message - le broker enverra Ack ou Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Gérer les confirmations du serveur
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — alarmes de ressources

Quand le broker manque de ressources (mémoire, disque), il envoie Connection.Blocked (classe 10, méthode 60) avec une chaîne de raison. Quand la condition se résout, il envoie Connection.Unblocked (méthode 61). Ce sont des notifications uniquement serveur-vers-client. Cette fonctionnalité est gérée dans la classe de base TsgcAMQP, ce qui la rend disponible pour tous les composants AMQP.

Événement Description
OnAMQPConnectionBlocked Déclenché quand le broker bloque la connexion pour cause de contraintes de ressources. Inclut une chaîne Reason (par ex. 'low on memory').
OnAMQPConnectionUnblocked Déclenché quand le broker lève le blocage de la connexion.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connexion BLOQUÉE : ' + aReason);
  // Mettre en pause la publication pour éviter la perte de messages
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connexion débloquée - reprise');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — rafraîchissement de jetons

Connection.UpdateSecret (classe 10, méthode 70) permet de rafraîchir les identifiants d'authentification sur une connexion active sans se reconnecter. C'est essentiel pour l'authentification basée sur OAuth2/JWT où les jetons expirent périodiquement.

Method / Event Description
UpdateSecret / UpdateSecretEx Envoyer un nouveau secret (jeton) au broker avec une chaîne de raison optionnelle.
OnAMQPConnectionUpdateSecretOk Déclenché quand le broker accepte le nouveau secret.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Rafraîchir le jeton OAuth avant qu'il n'expire
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Jeton rafraîchi avec succès')
  else
    Log('Échec du rafraîchissement du jeton - reconnexion');
end;

3. Fichiers modifiés

File Changements
sgcAMQP_Const.pas Correction de faute ('Not Allowed').
sgcAMQP_Helpers.pas Nouvelle fonction sgcGetAMQPMethodValue, helpers pour la classe Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mappings d'ID de méthode pour toutes les nouvelles méthodes.
sgcAMQP_ReadWrite.pas Correction de l'octet de type de table de champs — chaque type de valeur écrit maintenant son indicateur de type correct.
sgcAMQP_Classes.pas Nouvelles énumérations (amqpClassConfirm, 12 nouvelles méthodes), 13 nouvelles classes de payload, tables de dispatch mises à jour, correction de sécurité des threads, corrections de conformité, nouveaux champs de stockage de requête.
sgcAMQP.pas 8 nouveaux types d'événement, gestion de Connection.Blocked/Unblocked, correction de l'ID de channel pour DoWrite_ChannCloseOk, préservation des octets restants, correction du drapeau GetChannel.
sgcAMQP_Client.pas 6 nouveaux gestionnaires de lecture, 5 nouvelles méthodes d'écriture, 11 nouvelles méthodes publiques, 6 nouveaux événements, table de dispatch mise à jour, correction de l'ordre des paramètres, correction des données de requête.