sgcWebSockets 2022.1부터 AMQP 0.9.1 프로토콜을 지원해요. Advanced Message Queuing Protocol(AMQP)은 메시지 지향 미들웨어를 위한 개방형 표준 애플리케이션 계층 프로토콜이에요. AMQP의 핵심 특징은 메시지 지향성, 큐잉, 라우팅(점대점 및 발행-구독 포함), 안정성, 보안이에요.
AMQP는 다양한 메시징 애플리케이션과 통신 패턴을 효율적으로 지원하도록 설계된 이진 애플리케이션 계층 프로토콜이에요. 흐름 제어가 가능한 메시지 지향 통신을 제공하며, at-most-once(각 메시지가 한 번만 전달되거나 전달되지 않음), at-least-once(각 메시지가 반드시 전달되지만 여러 번 전달될 수 있음), exactly-once(메시지가 반드시 한 번만 도착함) 같은 메시지 전달 보장 기능과 SASL 또는 TLS 기반의 인증 및 암호화를 제공해요. 기본 신뢰성 있는 전송 계층 프로토콜(예: TCP)을 전제로 해요.
채널
AMQP는 다중 채널 프로토콜이에요. 채널은 무거운 TCP/IP 연결을 여러 개의 가벼운 연결로 다중화하는 방법을 제공해요. 포트 사용이 예측 가능하므로 프로토콜이 "방화벽 친화적"이에요. 또한 트래픽 셰이핑이나 다른 네트워크 QoS 기능을 쉽게 활용할 수 있어요.
모든 채널은 자체 스레드에서 실행되므로, 새 메시지가 수신될 때마다 클라이언트는 먼저 채널을 식별하고 메시지를 채널 스레드에서 처리하는 큐에 넣어요.
채널의 수명 주기는 다음과 같아요.
1. 클라이언트가 새 채널을 열어요(Open).
2. 서버가 새 채널이 준비되었음을 확인해요(Open-Ok).
3. 클라이언트와 서버가 원하는 대로 채널을 사용해요.
4. 한쪽 피어(클라이언트 또는 서버)가 채널을 닫아요(Close).
5. 다른 피어가 채널 닫기를 핸드셰이크해요(Close-Ok).
새 채널을 만들려면 OpenChannel 메서드를 호출하고 채널 이름을 인자로 전달하기만 하면 돼요. 채널이 열렸음을 서버가 확인하면 OnAMQPChannelOpen 이벤트가 발생해요.
AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_channel_open: ' + aChannel);
end;
익스체인지
익스체인지 클래스는 애플리케이션이 서버의 익스체인지를 관리할 수 있게 해줘요. 이 클래스를 사용하면 (설정 인터페이스에 의존하지 않고) 애플리케이션이 자체적으로 와이어링을 스크립트화할 수 있어요. 참고: 대부분의 애플리케이션은 이 수준의 정교함이 필요하지 않으며, 레거시 미들웨어는 이런 의미론을 지원하지 못할 가능성이 높아요.
익스체인지 수명 주기는 다음과 같아요.
1. 클라이언트가 서버에 익스체인지가 존재하는지 확인하도록 요청해요(Declare). 클라이언트는 이를 "익스체인지가 없으면 생성", 또는 "없으면 경고만 하고 생성하지 않음"으로 세분화할 수 있어요.
2. 클라이언트가 익스체인지에 메시지를 발행해요.
3. 클라이언트는 익스체인지를 삭제하도록 선택할 수 있어요(Delete).
DeclareExchange 메서드는 새 익스체인지를 생성하거나 익스체인지가 이미 존재하는지 확인해요. 메서드는 다음 인자를 가져요.
- ChannelName: 채널 이름(이 메서드를 호출하기 전에 열려 있어야 해요).
- ExchangeName: 익스체인지 이름. 255자를 초과할 수 없고 "amq."로 시작할 수 없어요(passive 매개변수가 true인 경우 제외).
- ExchangeType: 익스체인지 유형. 모든 AMQP 서버는 "direct"와 "fanout" 익스체인지를 지원해요. 어떤 익스체인지 유형이 지원되는지 서버 문서를 확인하세요.
- Passive: passive가 true이면 서버는 익스체인지가 이미 선언되었는지만 확인해요. passive가 false이고 익스체인지가 존재하지 않으면 서버가 새로 생성해요.
- Durable: true이면 서버가 시작될 때 익스체인지가 다시 생성돼요. false이면 서버가 중지될 때 익스체인지가 삭제돼요.
- AutoDelete: true이면 모든 큐가 언바인딩될 때 익스체인지가 삭제돼요.
- Internal: 항상 false.
- NoWait: true이면 서버가 클라이언트에 확인을 보내지 않아요.
새 익스체인지를 선언하려면 DeclareExchange 메서드를 호출하고 채널 이름, 익스체인지 이름, 익스체인지 유형을 인자로 전달하면 돼요. 익스체인지가 선언되었음을 서버가 확인하면 OnAMQPExchangeDeclare 이벤트가 발생해요.
AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end;
큐
큐 클래스는 애플리케이션이 서버의 메시지 큐를 관리할 수 있게 해줘요. 메시지를 소비하는 거의 모든 애플리케이션에서 기본적인 단계로, 최소한 예상되는 메시지 큐가 실제로 존재하는지 확인하기 위한 단계예요.
지속성 메시지 큐의 수명 주기는 비교적 단순해요.
1. 클라이언트가 메시지 큐가 존재한다고 주장해요("passive" 인자를 사용한 Declare).
2. 서버가 메시지 큐가 존재함을 확인해요(Declare-Ok).
3. 클라이언트가 메시지 큐에서 메시지를 읽어요.
임시 메시지 큐의 수명 주기는 더 흥미로워요.
1. 클라이언트가 메시지 큐를 생성해요(Declare, 보통 메시지 큐 이름 없이 서버가 이름을 할당하도록 함). 서버가 확인해요(Declare-Ok).
2. 클라이언트가 메시지 큐에 컨슈머를 시작해요. 컨슈머의 정확한 기능은 Basic 클래스에 정의되어 있어요.
3. 클라이언트가 명시적으로 또는 채널/연결을 닫아서 컨슈머를 취소해요.
4. 마지막 컨슈머가 메시지 큐에서 사라지고 적절한 타임아웃이 지나면 서버가 메시지 큐를 삭제해요.
AMQP는 토픽 구독을 위한 전달 메커니즘을 메시지 큐로 구현해요. 이를 통해 협력하는 구독 애플리케이션 풀에서 구독의 부하 분산을 할 수 있는 흥미로운 구조가 가능해져요.
구독의 수명 주기에는 추가적인 바인딩 단계가 포함돼요.
1. 클라이언트가 메시지 큐를 생성하고(Declare), 서버가 확인해요(Declare-Ok).
2. 클라이언트가 메시지 큐를 토픽 익스체인지에 바인딩하고(Bind), 서버가 확인해요(Bind-Ok).
3. 클라이언트가 이전 예제와 같이 메시지 큐를 사용해요.
DeclareQueue 메서드는 새 큐를 생성하거나 큐가 이미 존재하는지 확인해요. 메서드는 다음 인자를 가져요.
- ChannelName: 채널 이름(이 메서드를 호출하기 전에 열려 있어야 해요).
- QueueName: 큐 이름. 255자를 초과할 수 없고 "amq."로 시작할 수 없어요(passive 매개변수가 true인 경우 제외).
- Passive: passive가 true이면 서버는 큐가 이미 선언되었는지만 확인해요. passive가 false이고 큐가 존재하지 않으면 서버가 새로 생성해요.
- Durable: true이면 서버가 시작될 때 큐가 다시 생성돼요. false이면 서버가 중지될 때 큐가 삭제돼요.
- Exclusive: true이면 큐는 현재 연결에서만 접근할 수 있어요.
- AutoDelete: true이면 모든 컨슈머가 더 이상 큐를 사용하지 않을 때 큐가 삭제돼요.
- NoWait: true이면 서버가 클라이언트에 확인을 보내지 않아요.
새 큐를 선언하려면 DeclareQueue 메서드를 호출하고 채널 이름과 큐 이름을 인자로 전달하면 돼요. 익스체인지가 선언되었음을 서버가 확인하면 OnAMQPQueueDeclare 이벤트가 발생해요.
AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;
