AMQP 0.9.1 Delphi İstemcisi Güncellemesi

· Özellikler

sgcWebSockets'teki AMQP 0-9-1 uygulaması kapsamlı bir güncelleme aldı: kritik parametre sıralaması, tür uyuşmazlıkları, iş parçacığı güvenliği ve veri kaybı sorunlarını kapsayan 11 hata düzeltmesinin yanı sıra Basic.Nack, Exchange'den Exchange'e bağlamalar, Publisher Confirms, Connection.Blocked/Unblocked bildirimleri ve token yenileme için Connection.UpdateSecret dahil 6 yeni protokol özelliği. Bu makale, her değişikliği kod örnekleriyle ayrıntılı olarak açıklar.

İçindekiler

  1. Hata Düzeltmeleri
  2. Kritik: DeclareExchange Parametre Sırası
  3. Field Table Tür Baytı
  4. Spesifikasyon Uyumluluğu Düzeltmeleri
  5. Diğer Hata Düzeltmeleri
  6. Yeni Özellikler
  7. Basic.Nack — Negatif Onaylar
  8. Exchange.Bind/Unbind — Exchange'den Exchange'e Bağlamalar
  9. Confirm Sınıfı — Publisher Confirms
  10. Connection.Blocked/Unblocked — Kaynak Alarmları
  11. Connection.UpdateSecret — Token Yenileme
  12. Değiştirilen Dosyalar

1. Hata Düzeltmeleri

AMQP 0-9-1 uygulamasında, kritik parametre sıralaması sorunlarından spesifikasyon uyumluluğu düzeltmelerine kadar toplam 11 hata düzeltildi.

Kritik: DeclareExchange Parametre Sırası

DeclareExchange ve DeclareExchangeEx metotları, aNoWait, aAutoDelete ve aInternal'i DoWrite_ExchDeclare'a yanlış sırada geçiriyordu. Bu, auto-delete bayrağının no-wait olarak gönderilmesine ve tersine neden oluyor, broker'da beklenmedik exchange davranışına yol açıyordu.

Önce (Hatalı)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Sonra (Düzeltilmiş)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

Dosya: sgcAMQP_Client.pas


Field Table Tür Baytı

sgcWriteAMQPFieldTable prosedürü, gerçek değer türünden bağımsız olarak tüm field table değerleri için tür göstergesi olarak her zaman $53 ('S' = uzun dize) yazıyordu. Bu, double, integer, boolean ve int64 değerlerinin tümünün wire biçiminde yanlışlıkla dize olarak etiketlendiği anlamına geliyordu.

Önce (Hatalı)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

Sonra (Düzeltilmiş)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

Dosya: sgcAMQP_ReadWrite.pas


Spesifikasyon Uyumluluğu Düzeltmeleri

Sorun Önce Sonra Dosya
BasicGetEmpty.Reserved1 yanlış tür UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 yanlış okuma sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose sınıfa özel metot değeri getter'ları kullanıyordu sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

Herhangi bir AMQP sınıfı için doğru metot integer kimliğini çözmek üzere sgcAMQP_Helpers.pas'a yeni bir genel yardımcı fonksiyon sgcGetAMQPMethodValue eklendi; bu, yalnızca kendi sınıfları için çalışan sınıfa özel getter'ların yerini aldı.


Diğer Hata Düzeltmeleri

Hata Açıklama Dosya
Channel.CloseOk eksik kanal kimliği DoWrite_ChannCloseOk, oFrame.Header.Channel'ı ayarlamıyordu, bu nedenle close-ok hedef kanal yerine kanal 0'da (bağlantı düzeyinde) gönderiliyordu. aChannelId: Word parametresi eklendi. sgcAMQP.pas
Hata sabitinde yazım hatası 'Now Allowed', 'Not Allowed' olarak değiştirildi. sgcAMQP_Const.pas
QueueUnBind eksik istek verisi DoWrite_QueueUnBind, QueueUnBindQueue ve QueueUnBindExchange'i kanal isteğinde depolamıyordu, bu da OnAMQPQueueUnBind olayının boş değerler bildirmesine neden oluyordu. sgcAMQP_Client.pas
DoRead'den sonra kalan baytlar atılıyordu Okuma döngüsü 1–7 kalan baytla (kısmi çerçeve) çıktığında, bunlar sessizce kayboluyordu. Artık sonraki okuma döngüsü için FBytes'a kaydediliyor. sgcAMQP.pas
GetChannel, aRaiseIfNotFound'u yok sayıyor aRaiseIfNotFound parametresi hiç kontrol edilmiyordu. Artık yalnızca bayrak True olduğunda istisna oluşturur. sgcAMQP.pas

2. Yeni Özellikler

Yaygın olarak kullanılan RabbitMQ uzantılarını ve ek spesifikasyon metotlarını kapsayan altı yeni AMQP 0-9-1 protokol özelliği uygulandı.


Basic.Nack — Negatif Onaylar

Basic.Nack (class 60, method 120), bir veya daha fazla mesajı isteğe bağlı yeniden kuyruğa almayla bir kerede reddetmeye olanak tanıyan bir RabbitMQ uzantısıdır. Basic.Reject'in aksine, belirtilen teslimat etiketine kadar (dahil) tüm mesajları reddetmek için bir multiple bayrağını destekler.

Metot Açıklama Yön
NackMessage Broker'a bir negatif onay gönderir. İstemci → Sunucu
OnAMQPBasicNack Sunucu bir Nack gönderdiğinde (publisher confirm modunda) tetiklenir. Sunucu → İstemci

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack Event

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Exchange'den Exchange'e Bağlamalar

Exchange'den exchange'e bağlamalar (class 40, methods 30/31 ve 40/51), ara bir kuyruk olmadan exchange'ler arasında mesaj yönlendirmeye olanak tanır. Bu, fan-out hiyerarşileri ve konu bölümlemesi gibi güçlü topoloji kalıplarını mümkün kılan bir RabbitMQ uzantısıdır.

Metot Açıklama
BindExchange / BindExchangeEx Bir hedef exchange'i bir yönlendirme anahtarıyla bir kaynak exchange'e bağlar. Ex varyantı, broker yanıtını senkron olarak bekler.
UnbindExchange / UnbindExchangeEx Bir exchange'den exchange'e bağlamayı kaldırır.
OnAMQPExchangeBind Broker bir exchange bağlamayı onayladığında tetiklenir.
OnAMQPExchangeUnbind Broker bir exchange bağlamayı kaldırmayı onayladığında tetiklenir.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm Sınıfı — Publisher Confirms

Publisher confirms (class 85, methods 10/11), broker'ın yayınlanan mesajların alındığını onaylamasına olanak tanır. Bir kanal Confirm.Select aracılığıyla confirm moduna alındığında, broker yayınlanan her mesaj için Basic.Ack veya Basic.Nack gönderir ve işlemler olmadan güvenilir yayınlamayı mümkün kılar.

Metot / Olay Açıklama
SelectConfirm / SelectConfirmEx Bir kanalda publisher confirm modunu etkinleştirir.
OnAMQPConfirmSelectOk Broker confirm modunun etkin olduğunu onayladığında tetiklenir.
OnAMQPBasicAck Broker yayınlanan bir mesajı onayladığında tetiklenir.
OnAMQPBasicNack Broker yayınlanan bir mesajı negatif olarak onayladığında tetiklenir.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Örnek: Confirms ile Güvenilir Yayınlama

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — Kaynak Alarmları

Broker'ın kaynakları (bellek, disk) azaldığında, bir neden dizesiyle Connection.Blocked (class 10, method 60) gönderir. Koşul ortadan kalktığında, Connection.Unblocked (method 61) gönderir. Bunlar yalnızca sunucudan istemciye bildirimlerdir. Bu özellik temel TsgcAMQP sınıfında işlenir ve tüm AMQP bileşenleri için kullanılabilir hale getirilir.

Olay Açıklama
OnAMQPConnectionBlocked Broker, kaynak kısıtlamaları nedeniyle bağlantıyı engellediğinde tetiklenir. Bir Reason dizesi içerir (örneğin, 'low on memory').
OnAMQPConnectionUnblocked Broker bağlantı engelini kaldırdığında tetiklenir.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — Token Yenileme

Connection.UpdateSecret (class 10, method 70), yeniden bağlanmadan etkin bir bağlantıdaki kimlik doğrulama bilgilerinin yenilenmesine olanak tanır. Bu, tokenların periyodik olarak süresi dolan OAuth2/JWT tabanlı kimlik doğrulaması için gereklidir.

Metot / Olay Açıklama
UpdateSecret / UpdateSecretEx Broker'a isteğe bağlı bir neden dizesiyle yeni bir secret (token) gönderir.
OnAMQPConnectionUpdateSecretOk Broker yeni secret'ı kabul ettiğinde tetiklenir.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. Değiştirilen Dosyalar

Dosya Değişiklikler
sgcAMQP_Const.pas Yazım hatası düzeltmesi ('Not Allowed').
sgcAMQP_Helpers.pas Yeni sgcGetAMQPMethodValue fonksiyonu, Confirm sınıfı yardımcıları (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), tüm yeni metotlar için metot kimliği eşlemeleri.
sgcAMQP_ReadWrite.pas Field table tür baytı düzeltmesi — her değer türü artık kendi doğru tür göstergesini yazar.
sgcAMQP_Classes.pas Yeni enumlar (amqpClassConfirm, 12 yeni metot), 13 yeni payload sınıfı, güncellenmiş dispatch tabloları, iş parçacığı güvenliği düzeltmesi, spesifikasyon uyumluluğu düzeltmeleri, yeni istek depolama alanları.
sgcAMQP.pas 8 yeni olay türü, Connection.Blocked/Unblocked işleme, DoWrite_ChannCloseOk kanal kimliği düzeltmesi, kalan baytların korunması, GetChannel bayrak düzeltmesi.
sgcAMQP_Client.pas 6 yeni okuma işleyicisi, 5 yeni yazma metodu, 11 yeni genel metot, 6 yeni olay, güncellenmiş dispatch tablosu, parametre sırası düzeltmesi, istek verisi düzeltmesi.