AMQP 0.9.1 Delphi 클라이언트 업데이트

· 기능

sgcWebSockets의 AMQP 0-9-1 구현이 대규모 업데이트를 받았어요. 중요한 매개변수 순서, 타입 불일치, 스레드 안전성, 데이터 손실 문제를 다룬 11개의 버그 수정과 Basic.Nack, Exchange-to-Exchange 바인딩, Publisher Confirms, Connection.Blocked/Unblocked 알림, 토큰 갱신을 위한 Connection.UpdateSecret 등 6개의 새 프로토콜 기능을 포함해요. 이 글에서는 모든 변경 사항을 코드 예제와 함께 상세히 설명해요.

목차

  1. 버그 수정
  2. 중요: DeclareExchange 매개변수 순서
  3. 필드 테이블 타입 바이트
  4. 스펙 준수 수정
  5. 기타 버그 수정
  6. 새 기능
  7. Basic.Nack — 부정적 확인
  8. Exchange.Bind/Unbind — Exchange-to-Exchange 바인딩
  9. Confirm 클래스 — Publisher Confirms
  10. Connection.Blocked/Unblocked — 리소스 알람
  11. Connection.UpdateSecret — 토큰 갱신
  12. 수정된 파일

1. 버그 수정

AMQP 0-9-1 구현 전반에 걸쳐 총 11개의 버그가 수정되었어요. 중요한 매개변수 순서 문제부터 스펙 준수 수정까지 다양한 내용을 포함해요.

중요: DeclareExchange 매개변수 순서

DeclareExchangeDeclareExchangeEx 메서드가 aNoWait, aAutoDelete, aInternal을 잘못된 순서로 DoWrite_ExchDeclare에 전달했어요. 이로 인해 auto-delete 플래그가 no-wait으로 전송되거나 그 반대가 되어 브로커에서 예상치 못한 exchange 동작이 발생했어요.

수정 전 (잘못됨)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

수정 후 (올바름)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

파일: sgcAMQP_Client.pas


필드 테이블 타입 바이트

sgcWriteAMQPFieldTable 프로시저가 실제 값 타입과 관계없이 항상 $53('S' = 긴 문자열)을 모든 필드 테이블 값의 타입 표시자로 작성했어요. 이로 인해 double, integer, boolean, int64 값이 모두 와이어 형식에서 문자열로 잘못 태그되었어요.

수정 전 (잘못됨)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

수정 후 (올바름)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

파일: sgcAMQP_ReadWrite.pas


스펙 준수 수정

문제 수정 전 수정 후 파일
BasicGetEmpty.Reserved1 잘못된 타입 UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 잘못된 읽기 sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose에서 클래스별 메서드 값 게터 사용 sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

sgcAMQP_Helpers.pas에 새로운 범용 헬퍼 함수 sgcGetAMQPMethodValue가 추가되어 모든 AMQP 클래스에 대한 올바른 메서드 정수 ID를 확인할 수 있어요. 자체 클래스에서만 작동하던 클래스별 게터를 대체해요.


기타 버그 수정

버그 설명 파일
Channel.CloseOk에 채널 ID 누락 DoWrite_ChannCloseOkoFrame.Header.Channel을 설정하지 않아 close-ok이 대상 채널 대신 채널 0(연결 수준)에서 전송되었어요. aChannelId: Word 매개변수가 추가되었어요. sgcAMQP.pas
오류 상수의 오타 'Now Allowed''Not Allowed'로 수정했어요. sgcAMQP_Const.pas
QueueUnBind에 요청 데이터 누락 DoWrite_QueueUnBind가 채널 요청에 QueueUnBindQueueQueueUnBindExchange를 저장하지 않아 OnAMQPQueueUnBind 이벤트가 빈 값을 보고했어요. sgcAMQP_Client.pas
DoRead 후 남은 바이트 삭제 읽기 루프가 1~7개의 남은 바이트(부분 프레임)와 함께 종료되면 자동으로 손실되었어요. 이제 다음 읽기 주기를 위해 FBytes에 저장해요. sgcAMQP.pas
GetChannel이 aRaiseIfNotFound를 무시함 aRaiseIfNotFound 매개변수가 확인되지 않았어요. 이제 플래그가 True일 때만 예외를 발생시켜요. sgcAMQP.pas

2. 새 기능

6개의 새로운 AMQP 0-9-1 프로토콜 기능이 구현되었어요. 널리 사용되는 RabbitMQ 확장과 추가 스펙 메서드를 포함해요.


Basic.Nack — 부정적 확인

Basic.Nack(클래스 60, 메서드 120)은 선택적 재대기열과 함께 하나 이상의 메시지를 한 번에 거부할 수 있는 RabbitMQ 확장이에요. Basic.Reject와 달리 지정된 전달 태그까지의 모든 메시지를 거부하는 multiple 플래그를 지원해요.

메서드 설명 방향
NackMessage 브로커에 부정적 확인을 보내요. 클라이언트 → 서버
OnAMQPBasicNack 서버가 Nack을 보낼 때 발생해요(publisher confirm 모드에서). 서버 → 클라이언트

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack 이벤트

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Exchange-to-Exchange 바인딩

Exchange-to-exchange 바인딩(클래스 40, 메서드 30/31 및 40/51)은 중간 대기열 없이 exchange 간에 메시지를 라우팅할 수 있어요. 팬아웃 계층 구조와 토픽 파티셔닝 같은 강력한 토폴로지 패턴을 가능하게 하는 RabbitMQ 확장이에요.

메서드 설명
BindExchange / BindExchangeEx 라우팅 키를 사용하여 대상 exchange를 소스 exchange에 바인딩해요. Ex 변형은 브로커 응답을 동기적으로 기다려요.
UnbindExchange / UnbindExchangeEx Exchange-to-exchange 바인딩을 제거해요.
OnAMQPExchangeBind 브로커가 exchange 바인딩을 확인할 때 발생해요.
OnAMQPExchangeUnbind 브로커가 exchange 언바인딩을 확인할 때 발생해요.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm 클래스 — Publisher Confirms

Publisher confirms(클래스 85, 메서드 10/11)를 통해 브로커가 게시된 메시지의 수신을 확인할 수 있어요. Confirm.Select를 통해 채널이 confirm 모드로 설정되면 브로커가 각 게시된 메시지에 대해 Basic.Ack 또는 Basic.Nack을 보내어 트랜잭션 없이 안정적인 게시를 가능하게 해요.

메서드 / 이벤트 설명
SelectConfirm / SelectConfirmEx 채널에서 publisher confirm 모드를 활성화해요.
OnAMQPConfirmSelectOk 브로커가 confirm 모드가 활성화되었음을 확인할 때 발생해요.
OnAMQPBasicAck 브로커가 게시된 메시지를 확인할 때 발생해요.
OnAMQPBasicNack 브로커가 게시된 메시지를 부정적으로 확인할 때 발생해요.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

예제: Confirms를 사용한 안정적인 게시

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — 리소스 알람

브로커의 리소스(메모리, 디스크)가 부족하면 이유 문자열과 함께 Connection.Blocked(클래스 10, 메서드 60)를 보내요. 상태가 해소되면 Connection.Unblocked(메서드 61)를 보내요. 이는 서버에서 클라이언트로의 알림만 해당해요. 이 기능은 기본 TsgcAMQP 클래스에서 처리되어 모든 AMQP 컴포넌트에서 사용할 수 있어요.

이벤트 설명
OnAMQPConnectionBlocked 브로커가 리소스 제약으로 연결을 차단할 때 발생해요. Reason 문자열(예: 'low on memory')을 포함해요.
OnAMQPConnectionUnblocked 브로커가 연결 차단을 해제할 때 발생해요.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — 토큰 갱신

Connection.UpdateSecret(클래스 10, 메서드 70)를 사용하면 재연결 없이 활성 연결에서 인증 자격 증명을 갱신할 수 있어요. 토큰이 주기적으로 만료되는 OAuth2/JWT 기반 인증에 필수적이에요.

메서드 / 이벤트 설명
UpdateSecret / UpdateSecretEx 선택적 이유 문자열과 함께 새 시크릿(토큰)을 브로커에 보내요.
OnAMQPConnectionUpdateSecretOk 브로커가 새 시크릿을 수락할 때 발생해요.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. 수정된 파일

파일 변경 사항
sgcAMQP_Const.pas 오타 수정('Not Allowed').
sgcAMQP_Helpers.pas sgcGetAMQPMethodValue 함수, Confirm 클래스 헬퍼(sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), 모든 새 메서드에 대한 메서드 ID 매핑.
sgcAMQP_ReadWrite.pas 필드 테이블 타입 바이트 수정 — 각 값 타입이 이제 올바른 타입 표시자를 작성해요.
sgcAMQP_Classes.pas 새 열거형(amqpClassConfirm, 12개의 새 메서드), 13개의 새 페이로드 클래스, 업데이트된 디스패치 테이블, 스레드 안전성 수정, 스펙 준수 수정, 새 요청 저장 필드.
sgcAMQP.pas 8개의 새 이벤트 타입, Connection.Blocked/Unblocked 처리, DoWrite_ChannCloseOk 채널 ID 수정, 남은 바이트 보존, GetChannel 플래그 수정.
sgcAMQP_Client.pas 6개의 새 읽기 핸들러, 5개의 새 쓰기 메서드, 11개의 새 공개 메서드, 6개의 새 이벤트, 업데이트된 디스패치 테이블, 매개변수 순서 수정, 요청 데이터 수정.