sgcWebSockets'teki AMQP 0-9-1 uygulaması kapsamlı bir güncelleme aldı: kritik parametre sıralaması, tür uyuşmazlıkları, iş parçacığı güvenliği ve veri kaybı sorunlarını kapsayan 11 hata düzeltmesinin yanı sıra Basic.Nack, Exchange'den Exchange'e bağlamalar, Publisher Confirms, Connection.Blocked/Unblocked bildirimleri ve token yenileme için Connection.UpdateSecret dahil 6 yeni protokol özelliği. Bu makale, her değişikliği kod örnekleriyle ayrıntılı olarak açıklar.
İçindekiler
- Hata Düzeltmeleri
- Kritik: DeclareExchange Parametre Sırası
- Field Table Tür Baytı
- Spesifikasyon Uyumluluğu Düzeltmeleri
- Diğer Hata Düzeltmeleri
- Yeni Özellikler
- Basic.Nack — Negatif Onaylar
- Exchange.Bind/Unbind — Exchange'den Exchange'e Bağlamalar
- Confirm Sınıfı — Publisher Confirms
- Connection.Blocked/Unblocked — Kaynak Alarmları
- Connection.UpdateSecret — Token Yenileme
- Değiştirilen Dosyalar
1. Hata Düzeltmeleri
AMQP 0-9-1 uygulamasında, kritik parametre sıralaması sorunlarından spesifikasyon uyumluluğu düzeltmelerine kadar toplam 11 hata düzeltildi.
Kritik: DeclareExchange Parametre Sırası
DeclareExchange ve
DeclareExchangeEx metotları,
aNoWait,
aAutoDelete ve
aInternal'i
DoWrite_ExchDeclare'a yanlış sırada geçiriyordu. Bu,
auto-delete bayrağının
no-wait olarak gönderilmesine ve tersine neden oluyor, broker'da beklenmedik exchange
davranışına yol açıyordu.
Önce (Hatalı)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Sonra (Düzeltilmiş)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
Dosya: sgcAMQP_Client.pas
Field Table Tür Baytı
sgcWriteAMQPFieldTable prosedürü, gerçek değer türünden bağımsız olarak tüm field table değerleri için tür göstergesi olarak her zaman
$53 ('S' = uzun dize)
yazıyordu. Bu, double, integer,
boolean ve int64 değerlerinin tümünün wire biçiminde yanlışlıkla dize olarak etiketlendiği anlamına geliyordu.
Önce (Hatalı)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
Sonra (Düzeltilmiş)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
Dosya: sgcAMQP_ReadWrite.pas
Spesifikasyon Uyumluluğu Düzeltmeleri
| Sorun | Önce | Sonra | Dosya |
|---|---|---|---|
| BasicGetEmpty.Reserved1 yanlış tür | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 yanlış okuma | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose sınıfa özel metot değeri getter'ları kullanıyordu | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
Herhangi bir AMQP sınıfı için doğru
metot integer kimliğini çözmek üzere sgcAMQP_Helpers.pas'a yeni bir genel yardımcı fonksiyon sgcGetAMQPMethodValue
eklendi; bu, yalnızca kendi sınıfları için çalışan sınıfa özel getter'ların yerini aldı.
Diğer Hata Düzeltmeleri
| Hata | Açıklama | Dosya |
|---|---|---|
| Channel.CloseOk eksik kanal kimliği | DoWrite_ChannCloseOk, oFrame.Header.Channel'ı ayarlamıyordu, bu nedenle close-ok hedef kanal yerine kanal 0'da (bağlantı düzeyinde) gönderiliyordu. aChannelId: Word parametresi eklendi. |
sgcAMQP.pas |
| Hata sabitinde yazım hatası | 'Now Allowed', 'Not Allowed' olarak değiştirildi. |
sgcAMQP_Const.pas |
| QueueUnBind eksik istek verisi | DoWrite_QueueUnBind, QueueUnBindQueue ve QueueUnBindExchange'i kanal isteğinde depolamıyordu, bu da OnAMQPQueueUnBind olayının boş değerler bildirmesine neden oluyordu. |
sgcAMQP_Client.pas |
| DoRead'den sonra kalan baytlar atılıyordu | Okuma döngüsü 1–7 kalan baytla (kısmi çerçeve) çıktığında, bunlar sessizce kayboluyordu. Artık sonraki okuma döngüsü için FBytes'a kaydediliyor. |
sgcAMQP.pas |
| GetChannel, aRaiseIfNotFound'u yok sayıyor | aRaiseIfNotFound parametresi hiç kontrol edilmiyordu. Artık yalnızca bayrak True olduğunda istisna oluşturur. |
sgcAMQP.pas |
2. Yeni Özellikler
Yaygın olarak kullanılan RabbitMQ uzantılarını ve ek spesifikasyon metotlarını kapsayan altı yeni AMQP 0-9-1 protokol özelliği uygulandı.
Basic.Nack — Negatif Onaylar
Basic.Nack (class 60, method 120), bir veya daha fazla mesajı isteğe bağlı yeniden kuyruğa almayla bir kerede reddetmeye olanak tanıyan bir RabbitMQ
uzantısıdır.
Basic.Reject'in aksine, belirtilen teslimat etiketine kadar (dahil) tüm mesajları reddetmek için bir
multiple bayrağını destekler.
| Metot | Açıklama | Yön |
|---|---|---|
NackMessage |
Broker'a bir negatif onay gönderir. | İstemci → Sunucu |
OnAMQPBasicNack |
Sunucu bir Nack gönderdiğinde (publisher confirm modunda) tetiklenir. | Sunucu → İstemci |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack Event
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange'den Exchange'e Bağlamalar
Exchange'den exchange'e bağlamalar (class 40, methods 30/31 ve 40/51), ara bir kuyruk olmadan exchange'ler arasında mesaj yönlendirmeye olanak tanır. Bu, fan-out hiyerarşileri ve konu bölümlemesi gibi güçlü topoloji kalıplarını mümkün kılan bir RabbitMQ uzantısıdır.
| Metot | Açıklama |
|---|---|
BindExchange / BindExchangeEx |
Bir hedef exchange'i bir yönlendirme anahtarıyla bir kaynak exchange'e bağlar. Ex varyantı, broker yanıtını senkron olarak bekler. |
UnbindExchange / UnbindExchangeEx |
Bir exchange'den exchange'e bağlamayı kaldırır. |
OnAMQPExchangeBind |
Broker bir exchange bağlamayı onayladığında tetiklenir. |
OnAMQPExchangeUnbind |
Broker bir exchange bağlamayı kaldırmayı onayladığında tetiklenir. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm Sınıfı — Publisher Confirms
Publisher confirms (class 85, methods 10/11), broker'ın yayınlanan mesajların alındığını onaylamasına olanak tanır.
Bir kanal Confirm.Select aracılığıyla confirm moduna alındığında,
broker yayınlanan her mesaj için Basic.Ack veya
Basic.Nack gönderir ve
işlemler olmadan güvenilir yayınlamayı mümkün kılar.
| Metot / Olay | Açıklama |
|---|---|
SelectConfirm / SelectConfirmEx |
Bir kanalda publisher confirm modunu etkinleştirir. |
OnAMQPConfirmSelectOk |
Broker confirm modunun etkin olduğunu onayladığında tetiklenir. |
OnAMQPBasicAck |
Broker yayınlanan bir mesajı onayladığında tetiklenir. |
OnAMQPBasicNack |
Broker yayınlanan bir mesajı negatif olarak onayladığında tetiklenir. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Örnek: Confirms ile Güvenilir Yayınlama
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — Kaynak Alarmları
Broker'ın kaynakları (bellek, disk) azaldığında, bir neden dizesiyle
Connection.Blocked (class 10, method 60) gönderir.
Koşul ortadan kalktığında,
Connection.Unblocked (method 61) gönderir. Bunlar yalnızca sunucudan istemciye
bildirimlerdir. Bu özellik temel
TsgcAMQP sınıfında işlenir ve tüm AMQP bileşenleri için kullanılabilir hale getirilir.
| Olay | Açıklama |
|---|---|
OnAMQPConnectionBlocked |
Broker, kaynak kısıtlamaları nedeniyle bağlantıyı engellediğinde tetiklenir. Bir Reason dizesi içerir (örneğin, 'low on memory'). |
OnAMQPConnectionUnblocked |
Broker bağlantı engelini kaldırdığında tetiklenir. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — Token Yenileme
Connection.UpdateSecret (class 10, method 70), yeniden bağlanmadan
etkin bir bağlantıdaki kimlik doğrulama bilgilerinin yenilenmesine olanak tanır. Bu, tokenların periyodik olarak süresi dolan
OAuth2/JWT tabanlı kimlik doğrulaması için gereklidir.
| Metot / Olay | Açıklama |
|---|---|
UpdateSecret / UpdateSecretEx |
Broker'a isteğe bağlı bir neden dizesiyle yeni bir secret (token) gönderir. |
OnAMQPConnectionUpdateSecretOk |
Broker yeni secret'ı kabul ettiğinde tetiklenir. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Değiştirilen Dosyalar
| Dosya | Değişiklikler |
|---|---|
sgcAMQP_Const.pas |
Yazım hatası düzeltmesi ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Yeni sgcGetAMQPMethodValue fonksiyonu, Confirm sınıfı yardımcıları (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), tüm yeni metotlar için metot kimliği eşlemeleri. |
sgcAMQP_ReadWrite.pas |
Field table tür baytı düzeltmesi — her değer türü artık kendi doğru tür göstergesini yazar. |
sgcAMQP_Classes.pas |
Yeni enumlar (amqpClassConfirm, 12 yeni metot), 13 yeni payload sınıfı, güncellenmiş dispatch tabloları, iş parçacığı güvenliği düzeltmesi, spesifikasyon uyumluluğu düzeltmeleri, yeni istek depolama alanları. |
sgcAMQP.pas |
8 yeni olay türü, Connection.Blocked/Unblocked işleme, DoWrite_ChannCloseOk kanal kimliği düzeltmesi, kalan baytların korunması, GetChannel bayrak düzeltmesi. |
sgcAMQP_Client.pas |
6 yeni okuma işleyicisi, 5 yeni yazma metodu, 11 yeni genel metot, 6 yeni olay, güncellenmiş dispatch tablosu, parametre sırası düzeltmesi, istek verisi düzeltmesi. |
