Delphi용 Apache Kafka 클라이언트

· 컴포넌트

sgcWebSockets 2026.6.0부터 Delphi 및 C++Builder용 네이티브 Apache Kafka 클라이언트를 사용할 수 있습니다. 새로운 TsgcWSPClient_Kafka 컴포넌트는 중간에 REST 게이트웨이나 외부 라이브러리 없이 네이티브 Kafka 바이너리 프로토콜(raw TCP)을 통해 Kafka broker와 직접 통신합니다.

Apache Kafka는 레코드 스트림을 게시, 저장 및 소비하는 데 사용되는 분산 이벤트 스트리밍 플랫폼입니다. producer는 topic에 메시지를 쓰고, broker는 partition으로 나뉜 추가 전용(append-only) 로그에 이를 영속화하며, consumer는 선택적으로 조율된 컨슈머 그룹의 일부로서 이를 다시 읽습니다. sgcWebSockets 클라이언트는 단일 컴포넌트에서 생산, 소비, 토픽 관리 및 오프셋 관리를 제공합니다.

기능

지원되는 Kafka 버전

이 컴포넌트는 Apache Kafka 0.11에서 도입된 v2 레코드 배치 형식을 사용하며, Apache Kafka 3.x에 대해 테스트되었습니다. 해당 범위의 모든 broker, 또는 동일한 프로토콜 버전을 사용하는 Kafka 호환 broker가 작동합니다. 기본 broker 포트는 9092입니다.

구성

Kafka 클라이언트는 TsgcWebSocketClient 위에서 실행되는 프로토콜 컴포넌트입니다. Specifications.RFC6455 := False를 설정하여 클라이언트가 raw TCP 소켓으로 연결되도록 하고(Kafka 프로토콜은 WebSocket이 아님), 클라이언트를 TsgcWSPClient_Kafka 컴포넌트에 할당한 다음, broker의 HostPort를 설정합니다.

producer와 consumer의 동작은 KafkaOptions 속성을 통해 구성됩니다:

uses
  sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;

var
  oClient: TsgcWebSocketClient;
  oKafka: TsgcWSPClient_Kafka;
begin
  oClient := TsgcWebSocketClient.Create(nil);
  oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
  oClient.Host := '127.0.0.1';
  oClient.Port := 9092;

  oKafka := TsgcWSPClient_Kafka.Create(nil);
  oKafka.Client := oClient;
  oKafka.KafkaOptions.ClientId := 'my-delphi-app';
  oKafka.OnKafkaConnect := OnKafkaConnect;
  oKafka.OnKafkaMessage := OnKafkaMessage;
  oKafka.OnKafkaProduce := OnKafkaProduce;

  oClient.Active := True; // connect to the broker
end;

메시지 생산

연결되면 topic, 값, 그리고 선택적 키와 partition과 함께 Produce를 호출합니다. 모든 게시의 결과는 레코드가 저장된 offset을 포함하여 OnKafkaProduce 이벤트를 통해 보고됩니다.

oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
  aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
  if aErrorCode = 0 then
    Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;

메시지 소비

GroupIdOffsetReset을 설정하고, 하나 이상의 topic을 subscribe한 다음 Poll을 호출하여 사용 가능한 레코드를 가져옵니다. PollTsgcKafkaMessages 목록을 반환하고 각 레코드에 대해 OnKafkaMessage 이벤트도 발생시킵니다. 계속해서 수신하려면 (예를 들어 타이머에서) Poll을 반복적으로 호출합니다.

oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);

var
  oMessages: TsgcKafkaMessages;
  i: Integer;
begin
  oMessages := oKafka.Poll(1000); // timeout in milliseconds
  try
    for i := 0 to oMessages.Count - 1 do
      Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
        oMessages[i].Offset, oMessages[i].GetValueString]));
  finally
    oMessages.Free;
  end;
end;

동일한 데이터가 OnKafkaMessage 이벤트를 통해 전달되며, 이는 백그라운드 스레드나 타이머에서 poll할 때 편리합니다:

procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
  Log(Format('received key="%s" value="%s"',
    [aMessage.GetKeyString, aMessage.GetValueString]));
end;

그룹에 join하지 않고 topic만 읽으려면 GroupId를 비워 두십시오. 그러면 Subscribe가 topic의 모든 partition을 할당하고 PollOffsetReset으로 선택한 위치부터 레코드를 반환합니다.

토픽과 오프셋

클라이언트는 외부 도구 없이 topic을 관리하고 offset을 검사할 수 있습니다.

// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);

// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);

// commit the consumed offsets for the current group
oKafka.CommitSync;

다운로드

Kafka 클라이언트는 sgcWebSockets 2026.6.0에 포함되어 제공됩니다. connect, produce, subscribe, poll, 토픽 관리 및 오프셋 관리를 보여주는 전체 데모가 Demos/02.WebSocket_Protocols/13.Kafka에 포함되어 있습니다. 직접 사용해 보려면 sgcWebSockets 다운로드를 이용하십시오.

질문, 피드백 또는 시작에 도움이 필요하신가요? 문의하기 — 코드를 작성한 사람들로부터 답변을 받으실 수 있습니다.