sgcWebSockets의 AMQP 0-9-1 구현이 대규모 업데이트를 받았어요. 중요한 매개변수 순서, 타입 불일치, 스레드 안전성, 데이터 손실 문제를 다룬 11개의 버그 수정과 Basic.Nack, Exchange-to-Exchange 바인딩, Publisher Confirms, Connection.Blocked/Unblocked 알림, 토큰 갱신을 위한 Connection.UpdateSecret 등 6개의 새 프로토콜 기능을 포함해요. 이 글에서는 모든 변경 사항을 코드 예제와 함께 상세히 설명해요.
목차
- 버그 수정
- 중요: DeclareExchange 매개변수 순서
- 필드 테이블 타입 바이트
- 스펙 준수 수정
- 기타 버그 수정
- 새 기능
- Basic.Nack — 부정적 확인
- Exchange.Bind/Unbind — Exchange-to-Exchange 바인딩
- Confirm 클래스 — Publisher Confirms
- Connection.Blocked/Unblocked — 리소스 알람
- Connection.UpdateSecret — 토큰 갱신
- 수정된 파일
1. 버그 수정
AMQP 0-9-1 구현 전반에 걸쳐 총 11개의 버그가 수정되었어요. 중요한 매개변수 순서 문제부터 스펙 준수 수정까지 다양한 내용을 포함해요.
중요: DeclareExchange 매개변수 순서
DeclareExchange 및 DeclareExchangeEx 메서드가 aNoWait, aAutoDelete, aInternal을 잘못된 순서로 DoWrite_ExchDeclare에 전달했어요. 이로 인해 auto-delete 플래그가 no-wait으로 전송되거나 그 반대가 되어 브로커에서 예상치 못한 exchange 동작이 발생했어요.
수정 전 (잘못됨)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
수정 후 (올바름)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
파일: sgcAMQP_Client.pas
필드 테이블 타입 바이트
sgcWriteAMQPFieldTable 프로시저가 실제 값 타입과 관계없이 항상 $53('S' = 긴 문자열)을 모든 필드 테이블 값의 타입 표시자로 작성했어요. 이로 인해 double, integer, boolean, int64 값이 모두 와이어 형식에서 문자열로 잘못 태그되었어요.
수정 전 (잘못됨)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
수정 후 (올바름)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
파일: sgcAMQP_ReadWrite.pas
스펙 준수 수정
| 문제 | 수정 전 | 수정 후 | 파일 |
|---|---|---|---|
| BasicGetEmpty.Reserved1 잘못된 타입 | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 잘못된 읽기 | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose에서 클래스별 메서드 값 게터 사용 | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
sgcAMQP_Helpers.pas에 새로운 범용 헬퍼 함수 sgcGetAMQPMethodValue가 추가되어 모든 AMQP 클래스에 대한 올바른 메서드 정수 ID를 확인할 수 있어요. 자체 클래스에서만 작동하던 클래스별 게터를 대체해요.
기타 버그 수정
| 버그 | 설명 | 파일 |
|---|---|---|
| Channel.CloseOk에 채널 ID 누락 | DoWrite_ChannCloseOk가 oFrame.Header.Channel을 설정하지 않아 close-ok이 대상 채널 대신 채널 0(연결 수준)에서 전송되었어요. aChannelId: Word 매개변수가 추가되었어요. |
sgcAMQP.pas |
| 오류 상수의 오타 | 'Now Allowed'를 'Not Allowed'로 수정했어요. |
sgcAMQP_Const.pas |
| QueueUnBind에 요청 데이터 누락 | DoWrite_QueueUnBind가 채널 요청에 QueueUnBindQueue와 QueueUnBindExchange를 저장하지 않아 OnAMQPQueueUnBind 이벤트가 빈 값을 보고했어요. |
sgcAMQP_Client.pas |
| DoRead 후 남은 바이트 삭제 | 읽기 루프가 1~7개의 남은 바이트(부분 프레임)와 함께 종료되면 자동으로 손실되었어요. 이제 다음 읽기 주기를 위해 FBytes에 저장해요. |
sgcAMQP.pas |
| GetChannel이 aRaiseIfNotFound를 무시함 | aRaiseIfNotFound 매개변수가 확인되지 않았어요. 이제 플래그가 True일 때만 예외를 발생시켜요. |
sgcAMQP.pas |
2. 새 기능
6개의 새로운 AMQP 0-9-1 프로토콜 기능이 구현되었어요. 널리 사용되는 RabbitMQ 확장과 추가 스펙 메서드를 포함해요.
Basic.Nack — 부정적 확인
Basic.Nack(클래스 60, 메서드 120)은 선택적 재대기열과 함께 하나 이상의 메시지를 한 번에 거부할 수 있는 RabbitMQ 확장이에요. Basic.Reject와 달리 지정된 전달 태그까지의 모든 메시지를 거부하는 multiple 플래그를 지원해요.
| 메서드 | 설명 | 방향 |
|---|---|---|
NackMessage |
브로커에 부정적 확인을 보내요. | 클라이언트 → 서버 |
OnAMQPBasicNack |
서버가 Nack을 보낼 때 발생해요(publisher confirm 모드에서). | 서버 → 클라이언트 |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack 이벤트
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange-to-Exchange 바인딩
Exchange-to-exchange 바인딩(클래스 40, 메서드 30/31 및 40/51)은 중간 대기열 없이 exchange 간에 메시지를 라우팅할 수 있어요. 팬아웃 계층 구조와 토픽 파티셔닝 같은 강력한 토폴로지 패턴을 가능하게 하는 RabbitMQ 확장이에요.
| 메서드 | 설명 |
|---|---|
BindExchange / BindExchangeEx |
라우팅 키를 사용하여 대상 exchange를 소스 exchange에 바인딩해요. Ex 변형은 브로커 응답을 동기적으로 기다려요. |
UnbindExchange / UnbindExchangeEx |
Exchange-to-exchange 바인딩을 제거해요. |
OnAMQPExchangeBind |
브로커가 exchange 바인딩을 확인할 때 발생해요. |
OnAMQPExchangeUnbind |
브로커가 exchange 언바인딩을 확인할 때 발생해요. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm 클래스 — Publisher Confirms
Publisher confirms(클래스 85, 메서드 10/11)를 통해 브로커가 게시된 메시지의 수신을 확인할 수 있어요. Confirm.Select를 통해 채널이 confirm 모드로 설정되면 브로커가 각 게시된 메시지에 대해 Basic.Ack 또는 Basic.Nack을 보내어 트랜잭션 없이 안정적인 게시를 가능하게 해요.
| 메서드 / 이벤트 | 설명 |
|---|---|
SelectConfirm / SelectConfirmEx |
채널에서 publisher confirm 모드를 활성화해요. |
OnAMQPConfirmSelectOk |
브로커가 confirm 모드가 활성화되었음을 확인할 때 발생해요. |
OnAMQPBasicAck |
브로커가 게시된 메시지를 확인할 때 발생해요. |
OnAMQPBasicNack |
브로커가 게시된 메시지를 부정적으로 확인할 때 발생해요. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
예제: Confirms를 사용한 안정적인 게시
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — 리소스 알람
브로커의 리소스(메모리, 디스크)가 부족하면 이유 문자열과 함께 Connection.Blocked(클래스 10, 메서드 60)를 보내요. 상태가 해소되면 Connection.Unblocked(메서드 61)를 보내요. 이는 서버에서 클라이언트로의 알림만 해당해요. 이 기능은 기본 TsgcAMQP 클래스에서 처리되어 모든 AMQP 컴포넌트에서 사용할 수 있어요.
| 이벤트 | 설명 |
|---|---|
OnAMQPConnectionBlocked |
브로커가 리소스 제약으로 연결을 차단할 때 발생해요. Reason 문자열(예: 'low on memory')을 포함해요. |
OnAMQPConnectionUnblocked |
브로커가 연결 차단을 해제할 때 발생해요. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — 토큰 갱신
Connection.UpdateSecret(클래스 10, 메서드 70)를 사용하면 재연결 없이 활성 연결에서 인증 자격 증명을 갱신할 수 있어요. 토큰이 주기적으로 만료되는 OAuth2/JWT 기반 인증에 필수적이에요.
| 메서드 / 이벤트 | 설명 |
|---|---|
UpdateSecret / UpdateSecretEx |
선택적 이유 문자열과 함께 새 시크릿(토큰)을 브로커에 보내요. |
OnAMQPConnectionUpdateSecretOk |
브로커가 새 시크릿을 수락할 때 발생해요. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. 수정된 파일
| 파일 | 변경 사항 |
|---|---|
sgcAMQP_Const.pas |
오타 수정('Not Allowed'). |
sgcAMQP_Helpers.pas |
새 sgcGetAMQPMethodValue 함수, Confirm 클래스 헬퍼(sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), 모든 새 메서드에 대한 메서드 ID 매핑. |
sgcAMQP_ReadWrite.pas |
필드 테이블 타입 바이트 수정 — 각 값 타입이 이제 올바른 타입 표시자를 작성해요. |
sgcAMQP_Classes.pas |
새 열거형(amqpClassConfirm, 12개의 새 메서드), 13개의 새 페이로드 클래스, 업데이트된 디스패치 테이블, 스레드 안전성 수정, 스펙 준수 수정, 새 요청 저장 필드. |
sgcAMQP.pas |
8개의 새 이벤트 타입, Connection.Blocked/Unblocked 처리, DoWrite_ChannCloseOk 채널 ID 수정, 남은 바이트 보존, GetChannel 플래그 수정. |
sgcAMQP_Client.pas |
6개의 새 읽기 핸들러, 5개의 새 쓰기 메서드, 11개의 새 공개 메서드, 6개의 새 이벤트, 업데이트된 디스패치 테이블, 매개변수 순서 수정, 요청 데이터 수정. |
