To consume records, set KafkaOptions.Consumer.GroupId and KafkaOptions.Consumer.OffsetReset (kafkaOffsetEarliest or kafkaOffsetLatest), then call Subscribe([topic]) with one or more topics.
After subscribing, call Poll(timeoutMs) to fetch records. Poll returns a TsgcKafkaMessages list and raises the OnKafkaMessage event once per record. You typically call Poll repeatedly, for example from a timer, to keep consuming new records as they arrive.
Each record in the list is a TsgcKafkaMessage. Iterate the list using Count and read each record with GetKeyString, GetValueString, Topic, Partition and Offset.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
// call repeatedly, e.g. from a TTimer.OnTimer
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000);
for i := 0 to oMessages.Count - 1 do
ShowMessage(oMessages[i].GetKeyString + ' = ' + oMessages[i].GetValueString);
end;
Instead of iterating the list returned by Poll, you can handle each record in the OnKafkaMessage event.
procedure OnKafkaMessage(Sender: TObject; const Message: TsgcKafkaMessage);
begin
ShowMessage(Format('%s [%d] @ %d: %s = %s',
[Message.Topic, Message.Partition, Message.Offset, Message.GetKeyString, Message.GetValueString]));
end;