A queue acts as a buffer that stores messages that are consumed later. A queue can also be declared with a number of attributes during creation. For instance, it can be marked as durable, auto-delete and exclusive, where exclusive means that it can be used by only one connection and this queue will be deleted when that connection closes.
A channel routes messages to a queue depending on the exchange type and bindings between the exchange and the queue. For a queue to receive messages, it must be bound to at least one exchange. AMQP 0.9.1 brokers should provide four exchange types - direct exchange, fanout exchange, topic exchange, and header exchange. An exchange can be declared with a number of attributes during creation. For instance, it can be marked as durable so that it survives a broker restart, or it can be marked as auto-delete meaning that it’s automatically deleted when the last queue is unbound.
A binding is a relation between a queue and an exchange consisting of a set of rules that the exchange uses (among other things) to route messages to queues.
A message is an entity sent from the publisher to the queue and finally subscribed to by the consumer. Each message contains a set of headers defining properties such as life duration, durability, and priority. AMQP 0.9.1 also has a built-in feature called message acknowledgment that is used to confirm message delivery and/or processing.
A channel is a virtual connection inside a connection, between two AMQP peers. Message publishing or consuming to or from a queue is performed over a channel (AMQP). A channel is multiplexed, one single connection can have multiple channels.
// Create websocket client and set server options
oClient := TsgcWebSocketClient.Create(nil);
oClient.Host := 'www.esegece.com';
oClient.Port := 5672;
// Create AMQP protocol and assign to websocket client
oAMQP := TsgcWSPClient_AMQP.Create(nil);
oAMQP.Client := oClient;
// AQMP Authentication
procedure OnAMQPAuthentication(Sender: TObject; aMechanisms: TsgcAMQPAuthentications; var Mechanism: TsgcAMQPAuthentication; var User, Password: string);
begin
User := 'sgc';
Password := 'sgc';
end;
// Handle AMQP methods
oAMQP.OnAMQPConnect := OnAMQPConnectHandler;
oAMQP.OnAMQPDisconnect := OnAMQPDisconnectHandler;
// connect to server
oClient.Active := True;
AMQP.Consume('channel_name', 'exchange_name', 'consumer_tag');
procedure OnAMQPBasicGetOk(Sender: TObject; const aChannel: string; const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent);
begin
DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;
AMQP.PublishMessage('channel_name', 'exchange_name', 'routing_key', 'Hello from sgcWebSockets!!!');
procedure OnAMQPBasicReturn(Sender: TObject; const aChannel: string; const aReturn: TsgcAMQPFramePayload_Method_BasicReturn; const aContent: TsgcAMQPMessageContent);
begin
DoLog('#AMQP_basic_return: ' + aChannel + ' ' + IntToStr(aReturn.ReplyCode) + ' ' + aReturn.ReplyText + ' ' + aContent.Body.AsString);
end;
AMQP.SetQoS('channel_name', 1024000, 100, false);
procedure OnAMQPBasicQoS(Sender: TObject; const aChannel: string; const aQoS: TsgcAMQPFramePayload_Method_BasicQoS);
begin
DoLog('#AMQP_basic_qos: ' + aChannel + ' ' + IntToStr(aQoS.PrefetchSize) + ' ' + IntToStr(aQoS.PrefetchCount) + ' ' + BoolToStr(aQoS.Global>;
end;