Actualización del cliente AMQP 1 para Delphi

· Características

La implementación del protocolo AMQP 1.0 en sgcWebSockets ha sido sometida a una revisión exhaustiva frente a la especificación OASIS AMQP 1.0. Este artículo documenta las 30 correcciones aplicadas en 8 archivos fuente, abarcando errores críticos, fugas de memoria, cumplimiento de la especificación, corrección de la máquina de estados, gestión de heartbeat y mejoras de seguridad de hilos.

Índice de contenidos

  1. Visión general
  2. Correcciones de errores críticos
  3. Correcciones de fugas de memoria
  4. Correcciones de cumplimiento de la especificación
  5. Correcciones de máquina de estados y conexión
  6. Correcciones de heartbeat e idle timeout
  7. Correcciones de seguridad de hilos
  8. Archivos modificados

1. Visión general

Se han aplicado un total de 30 correcciones en 6 archivos fuente y 2 archivos de interfaz de la implementación AMQP 1.0. Las correcciones se categorizan como sigue:

Categoría Número Gravedad
Correcciones de errores críticos 6 Crítica
Correcciones de fugas de memoria 4 Crítica
Cumplimiento de la especificación 10 Alta
Máquina de estados y conexión 5 Alta
Heartbeat e idle timeout 3 Alta
Seguridad de hilos 2 Alta

2. Correcciones de errores críticos

Estas correcciones abordan errores que provocarían fallos inmediatos en ejecución, corrupción de protocolo o comportamiento incorrecto durante la comunicación AMQP 1.0.

2.1 Palabra clave raise faltante en una excepción

Archivo: sgcAMQP1_Classes.pas
Se creaba un objeto de excepción pero nunca se lanzaba cuando se encontraba un tipo de frame inválido. La excepción se descartaba silenciosamente, permitiendo procesar frames corruptos sin detectarlos.

Antes (roto)

TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

Después (corregido)

raise TsgcAMQP1Exception.CreateFmt(S_AMQP1_INVALID_FRAME_TYPE, [vByte]);

2.2 WriteMap Map32 con datos faltantes y tamaño erróneo

Archivo: sgcAMQP1_Classes.pas
La ruta de codificación Map32 carecía de la llamada a WriteBytes para los datos reales del mapa, y el campo size usaba un offset incorrecto. Map32 utiliza un campo de count de 4 bytes (a diferencia de Map8 que usa 1 byte), por lo que el tamaño debe incluir 4 bytes adicionales.

Antes (roto)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 1);
  _WriteUInt32(oJSON.Count * 2);
  // Missing: WriteBytes(oArray.Bytes);
end;

Después (corregido)

else
begin
  WriteUByte(Ord(amqp1DataMap32));
  _WriteUInt32(vSize + 4);
  _WriteUInt32(oJSON.Count * 2);
  WriteBytes(oArray.Bytes);
end;

2.3 Lógica de ContainsError invertida

Archivo: sgcAMQP1_Frames.pas
El método ContainsError tanto en TsgcAMQP1FrameRejected como en TsgcAMQP1DescribedListError devolvía True cuando no había error y False cuando lo había. Esto provocaba que la información de error se descartara silenciosamente y que se escribieran bytes nulos cuando los errores reales deberían haberse serializado. Las ramas DoWrite y DoWriteError también se intercambiaron para ajustarse a la lógica corregida.

Antes (roto)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := True              // Wrong: True when no error
  else
    Result := (Error.Condition = '') and (Error.Description = '') and
      (Error.Info = '');     // Wrong: True when all empty
end;

Después (corregido)

function TsgcAMQP1FrameRejected.ContainsError: Boolean;
begin
  if not Assigned(FError) then
    Result := False             // Correct: False when no error
  else
    Result := (Error.Condition <> '') or (Error.Description <> '') or
      (Error.Info <> '');    // Correct: True when any field set
end;

2.4 Separadores de byte nulo en SASL PLAIN

Archivo: sgcAMQP1_Frames.pas
El mecanismo SASL PLAIN requiere el formato \0username\0password con separadores de byte nulo ($00). La implementación no estaba inicializando a cero el array de bytes, por lo que las posiciones de los separadores contenían basura. La autenticación fallaría contra cualquier broker AMQP 1.0 conforme al estándar.

Después (corregido)

SetLength(FInitialResponse, 2 + Length(oUser) + Length(oPassword));
FillChar(FInitialResponse[0], Length(FInitialResponse), 0);  // Zero-fill for null separators

2.5 Falta inherited Create en TsgcAMQP1Message

Archivo: sgcAMQP1_Message.pas
El constructor parametrizado de TsgcAMQP1Message no llamaba a inherited Create, lo que significa que la clase base nunca se inicializaba. Esto provocaría access violations o estado corrupto al usar el constructor de conveniencia.

Después (corregido)

constructor TsgcAMQP1Message.Create(const aValue: string);
begin
  inherited Create;
  ApplicationData.ValueType := amqp1adtAmqpValue;
  ApplicationData.AMQPValue.Value := aValue;
end;

2.6 Falta punto y coma en AmqpValue.DoRead

Archivo: sgcAMQP1_Frames.pas
Un punto y coma faltante en TsgcAMQP1FrameAmqpValue.DoRead impediría la compilación.


3. Correcciones de fugas de memoria

Estas correcciones abordan problemas de gestión del ciclo de vida de objetos que provocarían que la memoria se acumulara con el tiempo, especialmente durante conexiones AMQP 1.0 de larga duración con muchos intercambios de mensajes.

Corrección Archivo Descripción
3.1 sgcAMQP1_Frames.pas FDefaultOutcome no se liberaba antes de la reasignación al leer el descriptor Source. Cada vez que se recibía un nuevo default outcome, el objeto anterior quedaba huérfano.
3.2 sgcAMQP1_Session.pas Una llamada duplicada a sgcFree(FCreditConsumed) en el destructor provocaba un posible doble-free. Se eliminó la línea duplicada.
3.3 sgcAMQP1_Session.pas FOutgoingDeliveries faltaba en el destructor de la sesión. La lista de seguimiento de deliveries nunca se liberaba al destruirse una sesión.
3.4 sgcAMQP1_Message.pas SetMessage y SetMessageAndFreeOnDestroy no liberaban el mensaje anterior cuando FFreeMessageOnDestroy estaba habilitado. Asignaciones repetidas de mensaje provocaban fugas de memoria.

Corrección 3.1 – FDefaultOutcome liberado antes de la reasignación

sgcFree(FDefaultOutcome);  // Free previous instance before reassignment
if oDescriptor.Code = amqp1dcptReleased then
  FDefaultOutcome := TsgcAMQP1FrameReleased.Create
else if oDescriptor.Code = amqp1dcptAccepted then
  FDefaultOutcome := TsgcAMQP1FrameAccepted.Create
else if oDescriptor.Code = amqp1dcptRejected then
  FDefaultOutcome := TsgcAMQP1FrameRejected.Create

Corrección 3.4 – SetMessage libera el mensaje anterior

procedure TsgcAMQP1Delivery.SetMessage(const aMessage: TsgcAMQP1Message);
begin
  if FFreeMessageOnDestroy and Assigned(F_Message) and (F_Message <> aMessage) then
    sgcFree(F_Message);
  FFreeMessageOnDestroy := False;
  F_Message := aMessage;
end;

4. Correcciones de cumplimiento de la especificación

Estas correcciones corrigen desviaciones respecto a las especificaciones AMQP 1.0 Transport, Types y Messaging.

4.1 Campo 7 del Begin Frame leído en propiedad equivocada

Archivo: sgcAMQP1_Frames.pas
El índice de campo 7 de la performative begin se leía en DesiredCapabilities en lugar de Properties. Según la especificación, los campos de begin son: remote-channel(0), next-outgoing-id(1), incoming-window(2), outgoing-window(3), handle-max(4), offered-capabilities(5), desired-capabilities(6), properties(7).

4.2 Source y Target con campos faltantes en DoWrite

Archivo: sgcAMQP1_Frames.pas
El método DoWrite del descriptor Source no serializaba los campos default-outcome, outcomes y capabilities. Al descriptor Target le faltaba capabilities. Esto provocaba que el broker usara valores por defecto en lugar de los negociados, lo que podría llevar a una gestión incorrecta del estado de los deliveries.

4.3 AmqpSequence leído en propiedad equivocada

Archivo: sgcAMQP1_Message.pas
Al leer el cuerpo del mensaje, los datos amqp-sequence se leían en ApplicationData.AMQPValue en lugar de ApplicationData.AMQPSequence. Esto corrompía el cuerpo del mensaje para cualquier mensaje que usara la codificación amqp-sequence.

4.4 TransactionalState Outcome no escrito

Archivo: sgcAMQP1_Frames.pas
El delivery state transactional-state no serializaba su campo outcome, que es obligatorio al confirmar deliveries dentro de una transacción.

4.5 El campo Last de Disposition no podía distinguir cero de no asignado

Archivos: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf, sgcAMQP1_Session.pas
La performative disposition tiene un campo opcional last (delivery-id). Al ser un Cardinal, el valor 0 es válido y no puede usarse como centinela para "no asignado". Se añadió un nuevo flag booleano FLastAssigned y un setter SetLast para rastrear correctamente si el campo se asignó explícitamente.

procedure TsgcAMQP1FrameDisposition.SetLast(const Value: Cardinal);
begin
  FLast := Value;
  FLastAssigned := True;
end;

4.6 AmqpSequence sin propiedad Value ni Read/Write

Archivos: sgcAMQP1_Frames.pas, sgcAMQP1_Frames.intf
La clase TsgcAMQP1FrameAmqpSequence carecía de propiedad Value y tenía métodos DoRead/DoWrite vacíos. El tipo de cuerpo amqp-sequence era completamente no funcional.

4.7 Campo Info de Error leído como String en lugar de Map

Archivo: sgcAMQP1_Frames.pas
La especificación AMQP 1.0 define el campo info del tipo error como un map. Se leía con ReadString en lugar de ReadMap, provocando fallos de parseo cuando los brokers envían información estructurada de error.

4.8 Capabilities y Locales escritos como String en lugar de Symbol

Archivo: sgcAMQP1_Frames.pas
La especificación AMQP 1.0 define offered-capabilities, desired-capabilities, outgoing-locales e incoming-locales como arrays de symbol. Se escribían con WriteString en lugar de WriteSymbol en las performatives open, begin y attach. Un broker conforme al estándar rechazaría estos frames por tener tipos de campo incorrectos.

4.9 Descriptor Accepted faltante en el handler DefaultOutcome

Archivo: sgcAMQP1_Frames.pas
El lector del default-outcome del descriptor Source solo manejaba released y rejected. El outcome más común, accepted, no estaba contemplado. Cuando un broker enviaba accepted como default outcome, se ignoraba silenciosamente.


5. Correcciones de máquina de estados y conexión

Estas correcciones abordan la máquina de estados de conexión AMQP 1.0 y la lógica de procesamiento de frames.

Corrección Archivo Descripción
5.1 sgcAMQP1.pas A la transición del estado amqp1csOpenReceived le faltaba el else DoRaiseInvalidState que tenían todos los demás estados. Las transiciones inválidas se ignoraban silenciosamente en lugar de lanzar un error.
5.2 sgcAMQP1.pas El mensaje de error de validación del tamaño de frame mostraba RemoteMaxFrameSize pero debería haber mostrado LocalMaxFrameSize, ya que el límite local es lo que se comprueba.
5.3 sgcAMQP1.pas FLastTimeRead se inicializaba a 0 (epoch de Delphi: 1899-12-30) en lugar de a Now. Esto provocaba una detección inmediata y falsa de idle timeout al arrancar.
5.4 sgcAMQP1.pas La sobrecarga TBytes de Read no actualizaba FLastTimeRead := Now, a diferencia de la sobrecarga TMemoryStream. Esto provocaba un seguimiento de heartbeat inconsistente.
5.5 sgcAMQP1.pas La transición de estado al recibir el header era condicional cuando debería dispararse siempre. La máquina de estados debe transicionar en cada intercambio de header válido según la especificación AMQP 1.0.

Corrección 5.1 – A OpenReceived le faltaba la rama de error

amqp1csOpenReceived:
  begin
    if aState = amqp1csOpenSent then
      FConnectionState := amqp1csOpened
    else
      DoRaiseInvalidState;  // Added: was missing
  end;

6. Correcciones de heartbeat e idle timeout

La especificación AMQP 1.0 requiere que cuando un peer anuncia un idle-timeout en la performative open, el otro peer debe enviar frames de heartbeat a la mitad del intervalo anunciado. Estas correcciones aseguran que el mecanismo de heartbeat funcione realmente.

Corrección Archivo Descripción
6.1 sgcAMQP1_Client.pas HeartBeat nunca se habilitaba. Ambas ramas de la comprobación del idle timeout establecían HeartBeat.Enabled := False. Cambiado a True cuando IdleTimeout > 0.
6.2 sgcAMQP1_Client.pas Disconnect no deshabilitaba el heartbeat ni establecía FConnected := False lo suficientemente pronto. Reordenado para evitar disparos de heartbeat durante el cierre.
6.3 sgcAMQP1.pas FLastTimeRead no se actualizaba en la sobrecarga TBytes de Read (también listado en la sección de Máquina de estados).

Corrección 6.1 – HeartBeat habilitado

if oOpen.IdleTimeout > 0 then
begin
  HeartBeat.Interval := Trunc(oOpen.IdleTimeout / 2);
  HeartBeat.Enabled := True;   // Was: False (heartbeat never started)
end
else
  HeartBeat.Enabled := False;

Corrección 6.2 – Disconnect reordenado

procedure TsgcAMQP1_Client.Disconnect;
begin
  FConnected := False;           // Moved first: prevents heartbeat race
  DoStopIdleTimeout;
  HeartBeat.Enabled := False;    // Added: stop heartbeat during teardown
  Clear;
  DoConnectionState(amqp1csEnd);
end;

7. Correcciones de seguridad de hilos

Estas correcciones abordan condiciones de carrera en el acceso concurrente a estructuras de datos compartidas.

7.1 Comprobación de límites y locking en TsgcAMQP1Deliveries.First()

Archivo: sgcAMQP1_Message.pas
El método First() accedía a Items[0] sin comprobar si la lista estaba vacía y sin adquirir el lock de seguridad de hilos. En un entorno multihilo, otro hilo podría eliminar todos los elementos entre la comprobación de count y el acceso, provocando una excepción de índice fuera de rango.

Después (corregido)

function TsgcAMQP1Deliveries.First: TsgcAMQP1Delivery;
var
  oList: TList;
begin
  result := nil;
  oList := LockList;
  Try
    if oList.Count > 0 then
      result := TsgcAMQP1Delivery(oList[0]);
  Finally
    UnLockList;
  End;
end;

7.2 Reemplazo seguro de objeto en SetMessage

Archivo: sgcAMQP1_Message.pas
El método SetMessage ahora comprueba que el nuevo mensaje sea distinto del actual antes de liberar, evitando use-after-free al asignar el mismo objeto mensaje.


8. Archivos modificados

Archivo Correcciones Categorías
Source\sgcAMQP1_Classes.pas 2 Errores críticos
Source\sgcAMQP1_Frames.pas 16 Errores críticos, fugas de memoria, cumplimiento de la especificación
Interfaces\sgcAMQP1_Frames.intf 2 Cumplimiento de la especificación (Disposition LastAssigned, AmqpSequence Value)
Source\sgcAMQP1_Message.pas 4 Errores críticos, fugas de memoria, seguridad de hilos
Source\sgcAMQP1_Session.pas 3 Fugas de memoria, cumplimiento de la especificación
Source\sgcAMQP1.pas 5 Máquina de estados, conexión, heartbeat
Source\sgcAMQP1_Client.pas 3 Heartbeat, seguridad de Disconnect

Total: 30 correcciones en 8 archivos, mejorando la corrección de protocolo, la seguridad de memoria y la fiabilidad de la implementación AMQP 1.0 respecto a la especificación OASIS.