1. Producer 가 메시지를 Kafka 로 전송하는 전체 구조
위 전체구조에서 빨간 파트를 A, 주황 파트를 B, 파란 파트를 C라고 했을 때
A. 메시지 생성
A -1. 프로듀서가 메시지를 생성한다.
(Topic : "my_topic", Key : "A", Value : "Hello Kafka!")
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "A", "Hello Kafka!");
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", 2, "A", "Hello Kafka!");
//위 코드는 메시지를 Partition 2에 강제적으로 저장한다
A -2. 메시지 안의 Key 값과 Value 값을 직렬화한다.
(String을 직렬화하려면 StringSerializer 사용, Integer를 직렬화하려면 IntegerSerializer 사용)
"A" → StringSerializer → [01101011 01100101 01111001 00110001]
"Hello Kafka!" → StringSerializer → [01001000 01100101 01101100]
A -3. Key, Value 값을 직렬화하여 Producer 객체를 생성하는 코드
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Kafka가 실행되고 있는 서버 호스트명 지정
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //Key 값 직렬화
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //Value 값 직렬화
KafkaProducer<String, String> producer = new KafkaProducer<>(props); //Producer 객체 생성
B. 배치에 메시지 저장
▶메시지의 Key 값은 Partition 을 결정한다.
✅ Key 값이 있으면 Hashing을 사용하여 특정 Partition에 저장된다.
✅ Key 값이 없으면 Round Robin 방식으로 Partition에 균등 분배한다.
✅ 따라서 같은 Key 값을 가진 메시지는 같은 Partition으로 가기 때문에 순서가 보장된다.
B -1. 배치 사이즈, 배치 전송대기 시간, 버퍼 메모리 크기 설정하는 코드
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024); // 버퍼 메모리 크기 설정 (32MB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 배치 사이즈 설정 (16KB)
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 배치 전송대기 시간 설정 (10ms 대기 후 전송)
B-2. 배치에 메시지를 저장하는 방식
✅메시지에서 Topic 이 같고 Key 값이 같으면 같은 배치에 저장된다.
✅메시지에서 Topic 이 같고 Key 값이 다르면 다른 배치에 저장될 수 있다.
✅메시지에서 Topic 이 다르면 서로 다른 배치로 저장된다.
C. 배치를 Kafka에 저장
▶배치 안의 메시지들은 같은 토픽의 같은 파티션으로 전송된다.
C-1. 메시지를 Kafka에 저장하는 방식
✅ Topic이 같고 Key 값이 같으면, 메시지들은 같은 파티션에 저장된다.
✅ Topic이 같고 Key 값이 다르면, 메시지들은 다른 파티션에 저장된다.
C-2. 배치 안의 메시지를 kafka로 전송하는 코드
// 메시지를 배치에 저장하고 Kafka로 전송
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.out.println("Error sending message: " + exception.getMessage());
} else {
System.out.println("Message sent successfully to partition " + metadata.partition() + " with offset " + metadata.offset());
}
});
'Kafka' 카테고리의 다른 글
[Window] kafka 설치 및 실행 (0) | 2025.02.10 |
---|---|
[개념정리] Kafka 전체구조 (0) | 2025.02.06 |