Apache Kafka client.
The TsgcWSPClient_Kafka client implements the Apache Kafka protocol, talking to the broker directly over the native Kafka binary protocol on a plain TCP connection. It can produce and consume records, manage consumer groups, administer topics and query offsets, with no REST proxy or external library.
The Kafka client is a protocol component that runs on top of a TsgcWebSocketClient. Because Kafka is not a WebSocket protocol, set Specifications.RFC6455 to false so the client connects as a plain TCP socket. Then assign the client to the Kafka component and set the broker Host and Port (default 9092).
The behaviour of the producer and the consumer is configured through the KafkaOptions property:
KafkaOptions.ClientId: identifies your application to the broker.
KafkaOptions.Producer.Acks: required acknowledgements, kafkaAcksNone, kafkaAcksLeader or kafkaAcksAll.
KafkaOptions.Producer.TimeoutMs: how long the broker waits for the required acknowledgements.
KafkaOptions.Consumer.GroupId: the consumer group name. Leave it empty to consume without a group.
KafkaOptions.Consumer.OffsetReset: kafkaOffsetEarliest or kafkaOffsetLatest, where a brand-new group starts reading.
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient := TsgcWebSocketClient.Create(nil);
oKafka.Client := oClient;
oClient.Specifications.RFC6455 := false;
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oClient.Active := True;
Call the method Produce to publish a record. The result of every publish is reported through the OnKafkaProduce event, including the offset the record was stored at.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
Set a GroupId and an OffsetReset, subscribe to one or more topics and call Poll to fetch the records that are available. Poll returns a TsgcKafkaMessages list and also raises the OnKafkaMessage event for each record. Call Poll repeatedly (for example from a timer) to receive continuously.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
oMessages := oKafka.Poll(1000);
try
for I := 0 to oMessages.Count - 1 do
DoLog(oMessages[I].GetValueString);
finally
oMessages.Free;
end;