Protocol
AMQP 0.9.1
Advanced Message Queuing Protocol 0.9.1

Message Queue

A queue is a buffer that stores messages until they are consumed. A queue can be declared with a number of attributes at creation time. For instance, it can be marked as durable, auto-delete, or exclusive; an exclusive queue can be used by only one connection and is deleted when that connection closes.

Exchanges and Exchange Types

An exchange routes messages to queues according to its exchange type and the bindings between the exchange and each queue. For a queue to receive messages, it must be bound to at least one exchange. AMQP 0.9.1 brokers should provide four exchange types: direct, fanout, topic, and headers. An exchange can be declared with a number of attributes at creation time. For instance, it can be marked as durable so that it survives a broker restart, or as auto-delete so that it is automatically deleted when the last queue is unbound from it.

Binding

A binding is a relation between a queue and an exchange. It carries the rules, such as a routing key, that the exchange uses together with its exchange type to decide which queues receive a message.
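To make the routing rules concrete, the following is a minimal, self-contained sketch of how a binding key is compared with a message's routing key for the direct, fanout, and topic exchange types. It does not use the sgcWebSockets component; every name in it (TExchangeKind, Routes, TopicMatches, and so on) is hypothetical and exists only for illustration. It assumes the usual topic conventions, where words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. Headers exchanges, which match on message headers instead of the routing key, are left out.

```pascal
program ExchangeRoutingSketch;

{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils, Classes;

type
  // Hypothetical enumeration; headers exchanges are omitted from this sketch
  TExchangeKind = (ekDirect, ekFanout, ekTopic);

// Split a dot-separated key such as 'orders.eu.new' into its words
function SplitWords(const S: string): TStringList;
begin
  Result := TStringList.Create;
  Result.StrictDelimiter := True;
  Result.Delimiter := '.';
  Result.DelimitedText := S;
end;

// Recursively match pattern words P[PIdx..] against key words K[KIdx..]
function MatchWords(P, K: TStringList; PIdx, KIdx: Integer): Boolean;
begin
  if PIdx = P.Count then
    Result := (KIdx = K.Count)          // pattern used up: key must be too
  else if P[PIdx] = '#' then
    // '#' matches zero words (skip it) or consumes one more key word
    Result := MatchWords(P, K, PIdx + 1, KIdx) or
      ((KIdx < K.Count) and MatchWords(P, K, PIdx, KIdx + 1))
  else if KIdx = K.Count then
    Result := False                     // key used up but pattern is not
  else if (P[PIdx] = '*') or (P[PIdx] = K[KIdx]) then
    Result := MatchWords(P, K, PIdx + 1, KIdx + 1)
  else
    Result := False;
end;

function TopicMatches(const Pattern, RoutingKey: string): Boolean;
var
  P, K: TStringList;
begin
  P := SplitWords(Pattern);
  K := SplitWords(RoutingKey);
  try
    Result := MatchWords(P, K, 0, 0);
  finally
    P.Free;
    K.Free;
  end;
end;

// Decide whether a message with RoutingKey reaches a queue bound with BindingKey
function Routes(Kind: TExchangeKind; const BindingKey, RoutingKey: string): Boolean;
begin
  case Kind of
    ekDirect: Result := BindingKey = RoutingKey;  // exact match only
    ekFanout: Result := True;                     // routing key is ignored
    ekTopic:  Result := TopicMatches(BindingKey, RoutingKey);
  else
    Result := False;
  end;
end;

procedure Show(const Title: string; Routed: Boolean);
begin
  WriteLn(Title, ': ', BoolToStr(Routed, True));
end;

begin
  Show('direct  orders    vs orders       ', Routes(ekDirect, 'orders', 'orders'));
  Show('direct  orders    vs orders.eu    ', Routes(ekDirect, 'orders', 'orders.eu'));
  Show('fanout  (any)     vs orders.eu    ', Routes(ekFanout, '', 'orders.eu'));
  Show('topic   orders.*  vs orders.eu    ', Routes(ekTopic, 'orders.*', 'orders.eu'));
  Show('topic   orders.*  vs orders.eu.new', Routes(ekTopic, 'orders.*', 'orders.eu.new'));
  Show('topic   orders.#  vs orders.eu.new', Routes(ekTopic, 'orders.#', 'orders.eu.new'));
end.
```

In a real broker this decision is made on the server side; the client only supplies the exchange name, the binding key, and the routing key.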

Message and Content

A message is an entity that a publisher sends to an exchange, which routes it to one or more queues, from which it is finally delivered to a consumer. Each message carries a set of properties such as expiration, durability, and priority. AMQP 0.9.1 also has a built-in feature called message acknowledgment, which is used to confirm that a message was delivered and/or processed.
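As an illustration of message acknowledgment, the sketch below models the bookkeeping behind the protocol's basic.ack method, which takes a delivery tag and a "multiple" flag: with the flag set, every outstanding delivery up to and including that tag is acknowledged. This is a simplified in-memory model, not the sgcWebSockets API; the names Unacked, Ack, PendingCount, and MaxTag are hypothetical.

```pascal
program AckSketch;

{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

const
  MaxTag = 5; // hypothetical number of deliveries in this sketch

var
  // Unacked[t] is True while delivery tag t still awaits acknowledgment
  Unacked: array[1..MaxTag] of Boolean;

// Mirrors the semantics of basic.ack(delivery-tag, multiple)
procedure Ack(DeliveryTag: Integer; Multiple: Boolean);
var
  t: Integer;
begin
  if Multiple then
  begin
    // Acknowledge every delivery up to and including DeliveryTag
    for t := 1 to DeliveryTag do
      Unacked[t] := False;
  end
  else
    Unacked[DeliveryTag] := False;
end;

// Count the deliveries that have not been acknowledged yet
function PendingCount: Integer;
var
  t: Integer;
begin
  Result := 0;
  for t := 1 to MaxTag do
    if Unacked[t] then
      Inc(Result);
end;

var
  i: Integer;
begin
  // Five messages are delivered; all start out unacknowledged
  for i := 1 to MaxTag do
    Unacked[i] := True;
  WriteLn('pending after delivery: ', PendingCount);

  Ack(2, False);  // acknowledge delivery 2 only
  WriteLn('pending after ack(2): ', PendingCount);

  Ack(4, True);   // acknowledge everything up to and including 4
  WriteLn('pending after ack(4, multiple): ', PendingCount);
end.
```

The pending counts go from 5 to 4 to 1: the single ack clears only tag 2, while the multiple ack clears tags 1 through 4 and leaves tag 5 outstanding.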

Channel

A channel is a virtual connection inside a connection between two AMQP peers. Publishing to an exchange or consuming from a queue is performed over a channel. Channels are multiplexed: a single connection can carry multiple channels.
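Multiplexing works because every AMQP 0.9.1 frame names the channel it belongs to. A frame consists of a 1-byte type, a 2-byte channel number, a 4-byte payload size (both big-endian), the payload, and a closing frame-end byte of $CE. The sketch below builds such frames by hand to show that two channels on the same connection differ only in the channel field. It is independent of the sgcWebSockets component; BuildFrame, ToHex, and TFrameBytes are hypothetical names, and the payload bytes are placeholders rather than a real AMQP method.

```pascal
program FrameHeaderSketch;

{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

type
  TFrameBytes = array of Byte;

const
  FRAME_METHOD = 1;    // frame type for method frames
  FRAME_END    = $CE;  // every frame ends with this byte

// Wrap a payload in an AMQP 0.9.1 general frame for the given channel
function BuildFrame(FrameType: Byte; Channel: Word;
  const Payload: TFrameBytes): TFrameBytes;
var
  Size: Cardinal;
  i: Integer;
begin
  Size := Length(Payload);
  SetLength(Result, 7 + Length(Payload) + 1);
  Result[0] := FrameType;
  // Channel number, 2 bytes, big-endian
  Result[1] := Byte(Channel shr 8);
  Result[2] := Byte(Channel and $FF);
  // Payload size, 4 bytes, big-endian
  Result[3] := Byte(Size shr 24);
  Result[4] := Byte((Size shr 16) and $FF);
  Result[5] := Byte((Size shr 8) and $FF);
  Result[6] := Byte(Size and $FF);
  for i := 0 to High(Payload) do
    Result[7 + i] := Payload[i];
  Result[High(Result)] := FRAME_END;
end;

// Render bytes as space-separated hexadecimal pairs
function ToHex(const Data: TFrameBytes): string;
var
  i: Integer;
begin
  Result := '';
  for i := 0 to High(Data) do
    Result := Result + IntToHex(Data[i], 2) + ' ';
end;

var
  Payload: TFrameBytes;
begin
  // Placeholder payload, not a real AMQP method
  SetLength(Payload, 2);
  Payload[0] := $AA;
  Payload[1] := $BB;

  // Same connection, same payload, two different channels
  WriteLn('channel 1: ', ToHex(BuildFrame(FRAME_METHOD, 1, Payload)));
  WriteLn('channel 2: ', ToHex(BuildFrame(FRAME_METHOD, 2, Payload)));
end.
```

The two printed frames are identical except for the second and third bytes, which hold the channel number. This is how a peer can interleave traffic for many channels over one TCP connection.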

Create Connection

// Create websocket client and set server options
oClient := TsgcWebSocketClient.Create(nil);
oClient.Host := 'www.esegece.com';
oClient.Port := 5672;

// Create AMQP protocol and assign to websocket client
oAMQP := TsgcWSPClient_AMQP.Create(nil);
oAMQP.Client := oClient;

// AMQP authentication
procedure OnAMQPAuthentication(Sender: TObject; aMechanisms: TsgcAMQPAuthentications; var Mechanism: TsgcAMQPAuthentication; var User, Password: string);
begin
  User := 'sgc';
  Password := 'sgc';
end;

// Handle AMQP methods
oAMQP.OnAMQPConnect := OnAMQPConnectHandler;
oAMQP.OnAMQPDisconnect := OnAMQPDisconnectHandler;

// connect to server
oClient.Active := True;

Consume

// Messages are consumed from a queue, so the second argument names the queue
AMQP.Consume('channel_name', 'queue_name', 'consumer_tag');

procedure OnAMQPBasicGetOk(Sender: TObject; const aChannel: string; const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent);
begin
  DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;

Publish Message

AMQP.PublishMessage('channel_name', 'exchange_name', 'routing_key', 'Hello from sgcWebSockets!!!');

procedure OnAMQPBasicReturn(Sender: TObject; const aChannel: string; const aReturn: TsgcAMQPFramePayload_Method_BasicReturn; const aContent: TsgcAMQPMessageContent);
begin
  DoLog('#AMQP_basic_return: ' + aChannel + ' ' + IntToStr(aReturn.ReplyCode) + ' ' + aReturn.ReplyText + ' ' + aContent.Body.AsString);
end;

QoS

AMQP.SetQoS('channel_name', 1024000, 100, False);

procedure OnAMQPBasicQoS(Sender: TObject; const aChannel: string; const aQoS: TsgcAMQPFramePayload_Method_BasicQoS);
begin
  DoLog('#AMQP_basic_qos: ' + aChannel + ' ' + IntToStr(aQoS.PrefetchSize) + ' ' + IntToStr(aQoS.PrefetchCount) + ' ' + BoolToStr(aQoS.Global));
end;

Supported Platforms

The component supports the following platforms:

  • Delphi
  • C++Builder
  • Lazarus