Commands | Kafka Consume Messages

To consume records, set KafkaOptions.Consumer.GroupId and KafkaOptions.Consumer.OffsetReset (kafkaOffsetEarliest or kafkaOffsetLatest), then call Subscribe([topic]) with one or more topics.

After subscribing, call Poll(timeoutMs) to fetch records. Poll returns a TsgcKafkaMessages list and raises the OnKafkaMessage event once per record. You typically call Poll repeatedly, for example from a timer, to keep consuming new records as they arrive.

Each record in the list is a TsgcKafkaMessage. Iterate the list using Count and read each record with GetKeyString, GetValueString, Topic, Partition and Offset.

Subscribe and Poll


oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

// call repeatedly, e.g. from a TTimer.OnTimer
var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000);
  for i := 0 to oMessages.Count - 1 do
    ShowMessage(oMessages[i].GetKeyString + ' = ' + oMessages[i].GetValueString);
end;

OnKafkaMessage Event

Instead of iterating the list returned by Poll, you can handle each record in the OnKafkaMessage event.


procedure OnKafkaMessage(Sender: TObject; const Message: TsgcKafkaMessage);
begin
  ShowMessage(Format('%s [%d] @ %d: %s = %s',
    [Message.Topic, Message.Partition, Message.Offset, Message.GetKeyString, Message.GetValueString]));
end;