sgcWebSockets 2026.6.0부터 Delphi 및 C++Builder용 네이티브 Apache Kafka 클라이언트를 사용할 수 있습니다. 새로운 TsgcWSPClient_Kafka 컴포넌트는 중간에 REST 게이트웨이나 외부 라이브러리 없이 네이티브 Kafka 바이너리 프로토콜(raw TCP)을 통해 Kafka broker와 직접 통신합니다.
Apache Kafka는 레코드 스트림을 게시, 저장 및 소비하는 데 사용되는 분산 이벤트 스트리밍 플랫폼입니다. producer는 topic에 메시지를 쓰고, broker는 partition으로 나뉜 추가 전용(append-only) 로그에 이를 영속화하며, consumer는 선택적으로 조율된 컨슈머 그룹의 일부로서 이를 다시 읽습니다. sgcWebSockets 클라이언트는 단일 컴포넌트에서 생산, 소비, 토픽 관리 및 오프셋 관리를 제공합니다.
기능
- 네이티브 프로토콜: TCP를 통한 Kafka 와이어 프로토콜을 사용하여 broker와 통신합니다. Confluent REST 프록시, Java, 타사 DLL이 필요 없습니다.
- Producer: 키와 값, 그리고 구성 가능한 확인 응답(none, leader, all) 및 요청 타임아웃과 함께 임의의 topic 및 partition에 레코드를 게시합니다.
- Consumer: 관리형 컨슈머 그룹(broker가 그룹별로 커밋된 offset을 추적)을 사용하거나 그룹 없이(topic의 모든 partition을 직접 읽기) 레코드를 읽습니다.
- 컨슈머 그룹: 자동 코디네이터 검색, join/sync, partition 할당 및 offset 커밋/페치.
- 오프셋 제어: 가장 이른, 가장 최근, 커밋된 offset을 조회하고 새 그룹이 시작할 위치(가장 이른 또는 가장 최근)를 선택합니다.
- 토픽 관리: 클라이언트에서 직접 topic을 생성 및 삭제하고, 클러스터 메타데이터를 요청하며, 컨슈머 그룹을 나열합니다.
- 최신 레코드 형식: 최신 Kafka broker에서 사용하는 v2 레코드 배치 형식을 읽고 씁니다.
- 이벤트 기반:
OnKafkaConnect,OnKafkaMessage,OnKafkaProduce,OnKafkaError,OnKafkaDisconnect이벤트가 연결에서 발생하는 모든 것을 보고합니다.
지원되는 Kafka 버전
이 컴포넌트는 Apache Kafka 0.11에서 도입된 v2 레코드 배치 형식을 사용하며, Apache Kafka 3.x에 대해 테스트되었습니다. 해당 범위의 모든 broker, 또는 동일한 프로토콜 버전을 사용하는 Kafka 호환 broker가 작동합니다. 기본 broker 포트는 9092입니다.
구성
Kafka 클라이언트는 TsgcWebSocketClient 위에서 실행되는 프로토콜 컴포넌트입니다. Specifications.RFC6455 := False를 설정하여 클라이언트가 raw TCP 소켓으로 연결되도록 하고(Kafka 프로토콜은 WebSocket이 아님), 클라이언트를 TsgcWSPClient_Kafka 컴포넌트에 할당한 다음, broker의 Host와 Port를 설정합니다.
producer와 consumer의 동작은 KafkaOptions 속성을 통해 구성됩니다:
- ClientId: 애플리케이션을 broker에 식별합니다.
- Producer.Acks:
kafkaAcksNone,kafkaAcksLeader또는kafkaAcksAll. - Producer.TimeoutMs: broker가 필요한 확인 응답을 기다리는 시간.
- Consumer.GroupId: 컨슈머 그룹 이름. 그룹 없이 소비하려면 비워 둡니다.
- Consumer.OffsetReset:
kafkaOffsetEarliest또는kafkaOffsetLatest, 완전히 새로운 그룹이 읽기를 시작하는 위치.
uses
sgcWebSocket, sgcWebSocket_Protocols, sgcKafka_Classes;
var
oClient: TsgcWebSocketClient;
oKafka: TsgcWSPClient_Kafka;
begin
oClient := TsgcWebSocketClient.Create(nil);
oClient.Specifications.RFC6455 := False; // raw TCP, native Kafka protocol
oClient.Host := '127.0.0.1';
oClient.Port := 9092;
oKafka := TsgcWSPClient_Kafka.Create(nil);
oKafka.Client := oClient;
oKafka.KafkaOptions.ClientId := 'my-delphi-app';
oKafka.OnKafkaConnect := OnKafkaConnect;
oKafka.OnKafkaMessage := OnKafkaMessage;
oKafka.OnKafkaProduce := OnKafkaProduce;
oClient.Active := True; // connect to the broker
end;
메시지 생산
연결되면 topic, 값, 그리고 선택적 키와 partition과 함께 Produce를 호출합니다. 모든 게시의 결과는 레코드가 저장된 offset을 포함하여 OnKafkaProduce 이벤트를 통해 보고됩니다.
oKafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
oKafka.Produce('my-topic', 'Hello Kafka', 'key-1');
procedure TForm1.OnKafkaProduce(Sender: TObject; const aTopic: string;
aPartition: Integer; aOffset: Int64; aErrorCode: SmallInt);
begin
if aErrorCode = 0 then
Log(Format('stored in %s:%d at offset %d', [aTopic, aPartition, aOffset]));
end;
메시지 소비
GroupId와 OffsetReset을 설정하고, 하나 이상의 topic을 subscribe한 다음 Poll을 호출하여 사용 가능한 레코드를 가져옵니다. Poll은 TsgcKafkaMessages 목록을 반환하고 각 레코드에 대해 OnKafkaMessage 이벤트도 발생시킵니다. 계속해서 수신하려면 (예를 들어 타이머에서) Poll을 반복적으로 호출합니다.
oKafka.KafkaOptions.Consumer.GroupId := 'my-group';
oKafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
oKafka.Subscribe(['my-topic']);
var
oMessages: TsgcKafkaMessages;
i: Integer;
begin
oMessages := oKafka.Poll(1000); // timeout in milliseconds
try
for i := 0 to oMessages.Count - 1 do
Log(Format('[%s:%d@%d] %s', [oMessages[i].Topic, oMessages[i].Partition,
oMessages[i].Offset, oMessages[i].GetValueString]));
finally
oMessages.Free;
end;
end;
동일한 데이터가 OnKafkaMessage 이벤트를 통해 전달되며, 이는 백그라운드 스레드나 타이머에서 poll할 때 편리합니다:
procedure TForm1.OnKafkaMessage(Sender: TObject; const aMessage: TsgcKafkaMessage);
begin
Log(Format('received key="%s" value="%s"',
[aMessage.GetKeyString, aMessage.GetValueString]));
end;
그룹에 join하지 않고 topic만 읽으려면 GroupId를 비워 두십시오. 그러면 Subscribe가 topic의 모든 partition을 할당하고 Poll은 OffsetReset으로 선택한 위치부터 레코드를 반환합니다.
토픽과 오프셋
클라이언트는 외부 도구 없이 topic을 관리하고 offset을 검사할 수 있습니다.
// create a topic with 1 partition and replication factor 1
oKafka.CreateTopic('my-topic', 1, 1);
// inspect the partition offsets
vEarliest := oKafka.GetEarliestOffset('my-topic', 0);
vLatest := oKafka.GetLatestOffset('my-topic', 0);
// commit the consumed offsets for the current group
oKafka.CommitSync;
다운로드
Kafka 클라이언트는 sgcWebSockets 2026.6.0에 포함되어 제공됩니다. connect, produce, subscribe, poll, 토픽 관리 및 오프셋 관리를 보여주는 전체 데모가 Demos/02.WebSocket_Protocols/13.Kafka에 포함되어 있습니다. 직접 사용해 보려면 sgcWebSockets 다운로드를 이용하십시오.
질문, 피드백 또는 시작에 도움이 필요하신가요? 문의하기 — 코드를 작성한 사람들로부터 답변을 받으실 수 있습니다.
