TsgcWSPClient_Kafka

Apache Kafka client.

Introduction

The TsgcWSPClient_Kafka client implements the Apache Kafka protocol, talking to the broker directly over the native Kafka binary protocol on a plain TCP connection. It can produce and consume records, manage consumer groups, administer topics and query offsets, with no REST proxy or external library.

Connection

The Kafka client is a protocol component that runs on top of a TsgcWebSocketClient. Because Kafka is not a WebSocket protocol, set Specifications.RFC6455 to false so the client connects as a plain TCP socket. Then assign the client to the Kafka component and set the broker Host and Port (default 9092).

The behaviour of the producer and the consumer is configured through the KafkaOptions property:


    // Create the Kafka protocol component and hook up its events
    oKafka := TsgcWSPClient_Kafka.Create(nil);
    oKafka.KafkaOptions.ClientId := 'my-delphi-app';
    oKafka.OnKafkaConnect := OnKafkaConnect;
    oKafka.OnKafkaMessage := OnKafkaMessage;
    oKafka.OnKafkaProduce := OnKafkaProduce;

    // Create the transport client and attach it to the Kafka component
    oClient := TsgcWebSocketClient.Create(nil);
    oKafka.Client := oClient;
    // Kafka is not a WebSocket protocol: connect as a plain TCP socket
    oClient.Specifications.RFC6455 := False;
    oClient.Host := '127.0.0.1';
    oClient.Port := 9092;
    oClient.Active := True;

Produce Messages

Call the Produce method to publish a record. The result of every publish is reported through the OnKafkaProduce event, including the offset at which the record was stored.


    oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
    oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
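
To publish several records in a row, Produce can be called in a loop. The following is a minimal sketch, not a definitive implementation: the helper name ProduceBatch is hypothetical, and the argument order (topic, value, key) is assumed from the single call shown above. Format comes from the standard SysUtils unit.

    // Hypothetical helper: publishes ten records, each with its own key.
    // Requires SysUtils (for Format) and the unit declaring TsgcWSPClient_Kafka.
    procedure ProduceBatch(const aKafka: TsgcWSPClient_Kafka);
    var
      I: Integer;
    begin
      // Wait for the partition leader only, as in the example above
      aKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
      for I := 1 to 10 do
        // Assumed argument order: topic, value, key
        aKafka.Produce('my-topic', Format('message %d', [I]),
          Format('key-%d', [I]));
    end;

The outcome of each of the ten publishes is still delivered one by one through OnKafkaProduce.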

Consume Messages

Set a GroupId and an OffsetReset, subscribe to one or more topics, and call Poll to fetch the records that are available. Poll returns a TsgcKafkaMessages list and also raises the OnKafkaMessage event for each record. Call Poll repeatedly (for example, from a timer) to receive records continuously.


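    // Join a consumer group and start from the earliest offset when none is committed
    oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
    oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
    oKafka.Subscribe(['my-topic']);

    // Fetch the records that are currently available
    oMessages := oKafka.Poll(1000);
    try
      for I := 0 to oMessages.Count - 1 do
        DoLog(oMessages[I].GetValueString);
    finally
      // The returned list is owned by the caller
      oMessages.Free;
    end;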
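
Polling from a timer could look like the sketch below. It assumes a VCL form TForm1 with a TTimer named tmrPoll (both hypothetical), the oKafka component already connected and subscribed, and that the argument to Poll is a wait time in milliseconds, which this page does not state explicitly.

    // Hypothetical OnTimer handler for a TTimer named tmrPoll
    procedure TForm1.tmrPollTimer(Sender: TObject);
    var
      oMessages: TsgcKafkaMessages;
      I: Integer;
    begin
      // Pause the timer defensively so a slow Poll cannot overlap the next tick
      tmrPoll.Enabled := False;
      try
        // Assumption: the argument is a timeout in ms; keep it short on the UI thread
        oMessages := oKafka.Poll(100);
        try
          for I := 0 to oMessages.Count - 1 do
            DoLog(oMessages[I].GetValueString);
        finally
          oMessages.Free;
        end;
      finally
        tmrPoll.Enabled := True;
      end;
    end;

Because Poll also raises OnKafkaMessage for each record, handle records either in the loop or in the event, not in both, so that each record is processed only once.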

Reference