아파치 카프카 애플리케이션 프로그래밍 with 자바
아파치 카프카로 새로운 개발 트렌드를 준비하는 분들을 위해 집대성한 아파치 카프카 최종 솔루션이다. 국내 서적 중 최초로 카프카의 핵심 기능인 미러메이커2(MirrorMaker2)에 대한 설명과 스프
www.aladin.co.kr
[ ] : 나중에 체크하려고 하는 부분
Chapter 1. 들어가기
탄생
- 링크드인에서 파편화된 데이터 수집 및 분배 아키텍처 운영을 위한 문제를 해결하기 위해 개발
- 소스 애플리케이션과 데이터가 최종 적재되는 타킷 애플리케이션의 연결
- 각각의 애플리케이션끼리 연결하여 데이터를 처리하는 것이 아니라 데이터를 한 곳에 모아 처리하도록 중앙집중화
- 데이터 포맷은 사실상 제한이 없다. 직렬화, 역직렬화를 통해 ByteArray로 통신하기 때문에 자바에서 선언 가능한 모든 객체를 지원한다. ([ ] 직렬화?) 만약 필요할 경우 카프카에서 제공하는 Serializer<T>, Deserializer<T>를 상속받아 사용할 수도 있다.
- 상용 환경에서 카프카는 최소 3대 이상의 서버(브로커)에서 분산 운영한다.
- 데이터를 묶음 단위로 처리한다.
- 2011년 오픈소스화 www.github.com/apache/kafka
데이터 파이프라인에서 카프카의 역할
- 빅데이터에 적재되는 데이터의 종류는 다양하다. 정형, 비정형 데이터.
- 빅데이터를 저장하고 활용하기 위해선 일단 생성되는 데이터를 모두 모은다. : 데이터 레이크 Data Lake
- 필터링되거나 패키지화 되지 않은 데이터가 저장된다. : 데이터 웨어하우스 Data Warehouse
- 서비스가 작으면 end-to-end 방식으로 넣겠지만, 서비스가 비대해지면 데이터 흐름이 파편화되고 복잡해진다.
- 카프카가 데이터 파이프라인에 적합한 이유
- 높은 처리량: 데이터를 배치로 처리한다. 데이터를 병렬 처리한다(파티션).
- 확장성: 카프카 클러스터의 서버(브로커)를 무중단 scale-in, out 한다.
- 영속성: 데이터를 파일시스템에 저장한다. 페이지 캐시 영역을 사용해 데이터를 메모리에 캐싱하기에 느리지 않다.
- 고가용성: 카프카 클러스터는 3개 이상의 서버(브로커)로 운영된다. 데이터는 복제(replication)된다.
- 사실 브로커 개수의 제한은 없다. 하지만 3대 이상으로 구성할 것을 추천한다. 2대의 경우 장애 시 나머지 한 대가 살아 있긴 하지만 브로커 간에 데이터가 복제되는 시간 차이로 인해 데이터 유실의 우려가 있다.
- `min.insync.replicas` 옵션을 설정하면 최소 2개 이상의 브로커에 데이터가 완전히 복제됨을 보장한다. 이 옵션을 2로 설정하면 브로커를 3대 이상으로 운영해야만 한다. 옵션값보다 적은 수의 브로커가 존재할 때는 토픽에 더는 데이터를 넣을 수 없다.
데이터 레이크 아키텍처
- 레거시 데이터 플랫폼 아키텍처
- 초기 빅데이터 플랫폼은 end-to-end로 각 서비스 애플리케이션으로부터 데이터를 배치로 모았다.
- 유연하지 못한 구조였다. 실시간 데이터 인사이트를 빠르게 전달하지 못했다. 원천 데이터의 히스토리를 파악하기 어려웠다. 계속된 데이터 가공으로 데이터가 파편화 되면서 데이터 가버넌스(데이터 표준 및 정책)를 지키기 어려웠다.
- 람다 아키텍처 Lambda Architecture
- 기존의 배치 데이터를 처리하는 부분 외에 스피드 레이어라고 불리는 실시간 데이터 ETL 작업 영역을 정의했다.
- 3가지 레이어: 배치 레이어, 서빙 레이어, 스피드 레이어
- 배치 레이어: 배치 데이터를 모아서 특정 타이밍마다 일괄 처리한다.
- 서빙 레이어: 가공된 데이터를 데이터 사용자(데이터 사이언티스트, 데이터 엔지니어 등), 서비스 애플리케이션이 사용할 수 있도록 데이터를 저장한다. (하둡파일시스템HDFS, 오브젝트스토리지S3 등)
- 스피드 레이어: 서비스에서 생성되는 원천 데이터를 실시간으로 분석한다. 배치 데이터에 비해 낮은 지연으로 분석이 필요한 경우. (카프카, 카프카 스트림즈 등)
- 배치, 스피드 레이어가 각각 따로 존재 - 데이터 처리 로직의 파편화, 디버깅/배포/운영 분리에 대한 이슈, 배치+실시간데이터 처리 시 유연하지 못함
- 카파 아키텍처 Kappa Architecture
- 배치 레이어를 제거하고 모든 데이터를 스피드 레이어에 넣어서 처리한다. 서빙 레이어, 스피드 레이어.
- 배치 데이터도 스트림 처리해야 한다. 모든 데이터를 log(데이터의 집합, 각 데이터는 일정한 번호 또는 타임스탬프를 가진다)로 바라본다. (원래라면 배치 데이터를 'snapshot'이라고 보았을텐데, 'change log'로 보는 것이다.)
- 미래
- 2020년에 카파 아키텍처의 제시자인 제이 크랩스가 카파 아키텍처에서 서빙 레이어를 제거한 아키텍처인 데이터 레이크Data Lake를 제안했다. 스피드 레이어만 존재. '프로세싱한 데이터를 다시 서빙 레이어에 저장할 필요가 있나? 스피드 에리어에 데이터를 오랫동안 저장하고 사용한다면?' - 데이터 중복, 비정합성 문제 해결
- 카프카를 데이터 레이크로 사용하려면 개선할 점:
- 자주 접근하지 않는 데이터는 비싼 자원(디스크, 메모리)에 저장하지 말고 오브젝트스토리지(S3 등)에 저장 필요
- 카프카의 데이터를 query할 수 있는 주변 데이터 플랫폼 필요
Chapter 3. 기본 개념
카프카 브로커
- 하나의 서버에는 한 개의 카프카 브로커 프로세스가 실행된다.
- 세 대이상을 묶어서 클러스터로 운영한다.
- 데이터를 전달받아 토픽의 파티션에 데이터를 저장하고, 전달한다.
- 데이터는 파일 시스템에 저장된다.
데이터 저장
- 'config/server.properties'의 log.dir 옵션에 정의한 디렉토리에 데이터가 저장된다.
- 토픽 이름과 파티션 번호의 조합으로하위 디렉토리를 생성하여 데이터를 저장한다.
- log에는 메시지와 메타데이터를 저장한다.
- 카프카는 데이터를 메모리나 데이터베이스가 아니라 파일에 저장한다. 페이지 캐시(page cache)를 사용해 디스크 입출력 속도를 높였다. 한 번 읽은 파일의 내용은 메모리의 페이지 캐시 영역에 저장된다. JVM 상에서 동작하는 카프카가 직접 캐시를 구현하는 게 아니다. (그랬더라면 gc가 자주 일어났을 것이다.) 그래서 카프카 힙 사이즈를 크게 설정하지 않아도 된다.
데이터 복제 replication
- 복제는 파티션 단위로 이루어진다.
- 토픽 생성 시 파티션의 복제 개수도 같이 설정한다. replicaiton factor. 설정하지않으면 브로커 설정을 따라간다.
- 데이터가 유실되어도 되고 데이터 처리 속도가 중요하다면 개수를 1 또는 2로 설정하기도 한다.
- 데이터 종류마다 다른 복제 개수를 설정하고 상황에 따라서는 토픽마다 복제 개수를 다르게 설정하기도 한다.
- 복제된 파티션은 leader와 follower로 구성된다.
- 리더: 프로듀서 또는 컨슈머와 직접 통신하는 파티션
- 팔로워: 복제 데이터를 가지고 있는 파티션. 리더의 오프셋을 확인해서 자신의 오프셋과 차이 나는 경우 데이터를 가져온다.
컨트롤러
- 클러스터의 여러 브로커 중 한 대가 컨트롤러 역할을 한다.
- 컨트롤러는 다른 브로커들의 상태를 체크하고 브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재분배한다.
데이터 삭제
- 카프카는 다른 메시징 플랫폼과 다르게 컨슈머가 데이터를 가져가도 토픽의 데이터가 삭제되지 않는다. 컨슈머나 프로듀서가 데이터 삭제를 요청할 수도 없다. 오직 브로커만이 데이터를 삭제할 수 있다.
- 데이터 삭제는 파일 단위, '로그 세그먼트' 단위로 이뤄진다. 세그먼트는 일반 데이터베이스처럼 특정 데이터만 선별해 삭제할 수 없다.
- 세그먼트는 데이터가 쌓이는 동안 파일 시스템으로 열려있으며, 카프카 브로커에 log.segment.bytes 또는 log.segment.ms 옵션에 값이 설정되면 세그먼트 파일이 닫힌다. 기본값은 1GB이다. 너무 적은 용량으로 설정하면 데이터를 저장하는 동안 세그먼트 파일을 자주 여닫아서 부하가 생길 수 있다.
- 데이터를 삭제하지 않고 메시지 키를 기준으로 오래된 데이터를 압축하는 정책을 선택할 수도 있다.
컨슈머 오프셋 저장
- 컨슈머 그룹은 토픽이 특정 파티션으로부터 데이터를 가져가서 처리하고 이 파티션의 어느 레코드까지 가져갔는지 확인을 위해 오프셋을 커밋한다. 커밋한 오프셋은 '__consumer_offsets' 토픽에 저장된다.
코디네이터 coordinator
- 클러스터의 여러 브로커 중 한 대가 코디네이터 역할을 한다.
- 코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배한다. 컨슈머가 컨슈머 그룹에서 빠지면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당하여 끊임없이 데이터가 처리되도록 도와준다. (=리밸런스 rebalance)
- 주키퍼 zeekeeper
- 카프카의 메타데이터를 관리한다. (브로커 정보(어느 보안 규칙으로 통신하는지, jmx port 상태 정보, host 정보 등), 컨트롤러 정보, 토픽 정보 등)
- 카프카 서버에서 직접 주키퍼에 붙으려면 카프카 서버에서 실행되고 있는 주키퍼에 연결해야 한다.
토픽
- 데이터를 구분하기 위해 사용하는 단위
- 토픽은 1개 이상의 파티션을 소유한다.
- 파티션에는 프로듀서가 보낸 데이터들이 들어가 저장되는데, 이 데이터를 '레코드 record'라고 부른다.
- 토픽의 레코드는 다양한 목적을 가진 여러 컨슈머 그룹들이 데이터를 여러 번 가져갈 수 있다.
파티션
- 카프카 병렬 처리의 핵심
- 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리할 수 있도록 매칭된다.
- 컨슈머의 처리량이 한정된 상황에서 많은 레코드를 병렬로 처리하기 위한 가장 좋은 방법은 컨슈머의 개수를 늘리는 것이다. 컨슈머 개수를 늘리면서 동시에 파티션 개수도 늘리면 처리량을 증가시킬 수 있다.
토픽 이름
- 제약 조건
- 의미 있는 토픽 이름을 짓는다.
- 예시)
- <환경>.<팀명>.<애플리케이션명>.<메시지타입>
prod.marketing.sms-platform.json - <프로젝트명>.<서비스명>.<환경>.<이벤트명>
commerece.payment.prod.notification - <환경>.<서비스명>.<JIRA번호>.<메시지타입>
dev.email-sender.jira-1234.email-vo-custom - <카프카클러스터명>.<환경>.<서비스명>.<메시지타입>
aws-kafka.live.marketing-platform.json
- <환경>.<팀명>.<애플리케이션명>.<메시지타입>
- 토픽 이름은 변경할 수 없다. 삭제 후 다시 생성해야 한다.
레코드
- 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성된다.
- 프로듀서가 생성한 레코드가 브로커에 전송되면 오프셋과 타임스탬프가 지정되어 저장된다.
- 브로커에 한 번 적재된 레코드는 수정할 수 없고 로그 리텐션 기간 또는 용량에 따라서만 삭제된다.
- 타임스탬프 - 카프카 브로커에 저장될 때 브로커 시간을 기준으로 설정되지만 필요에 따라 레코드 생성 시간 또는 그 이전/이후로 설정할 수도 있다.
- 메시지 키 - 메시지 값을 순서대로 처리하거나 메시지 값의 종류를 나타내기 위해 사용한다. 메시지 키를 사용하면 프로듀서가 토픽에 레코드를 전송할 때 메시지 키의 해시값을 토대로 파티션을 지정하게 된다. (= 동일 메시지 키라면 동일 파티션에 들어간다. 다만, 어느 파티션에 지정될 지 알 수 없고 파티션 개수가 변하면 메시지 키와 파티션 매칭이 달라지게 되므로 주의해야 한다.) 메시지 키를 설정하지 않으면 null로 설정되고, 메시지 키가 null인 레코드는 기본 설정 파티셔너에 따라 파티션에 분배된다.
- 메시지 키와 메시지 값은 직렬화되어 브로커로 전송된다. 컨슈머는 직렬화와 동일한 형태로 역직렬화를 수행해야 한다.
- 컨슈머 그룹은 오프셋을 사용해 파티션의 데이터를 어디까지 가져갔는지 확인한다.
- 헤더는 키/값 형태의 메타데이터를 포함한다. 레코드의 속성(스키마 버전 등)을 저장해 컨슈머에서 참조할 수 있다.
카프카 클라이언트
- 카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공하는 카프카 클라이언트를 사용해 애플리케이션을 개발한다.
- 카프카 클라이언트는 라이브러리이다. 자체 라이프사이클을 가진 프레임워크나 애플리케이션 위에서 구현하고 실행해야 한다.
- https://kafka.apache.org/documentation/#api
프로듀서 API
package com.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
- 데이터의 시작점. 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다.
- 프로듀서는 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신한다.
- 프로듀서를 구현하는 가장 기초적인 방법은 카프카 클라이언트를 라이브러리로 추가하여 자바 기본 애플리케이션을 만드는 것이다.
- 프로듀서는 데이터를 직렬화하여 카프카 브로커로 보낸다. 자바에서 선언 가능한 모든 형태(기본형, 참조형, 동영상 같은 바이너리 데이터도)를 브로커로 전송할 수 있다.
- 프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.
- ProducerRecord 클래스를 사용해 전송할 데이터의 인스턴스를 만든다.
- 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 위해 accumulator에 데이터를 버퍼로 쌓아놓고 발송한다. 버퍼에 쌓인 데이터를 배치로 묶어서 발송하여 처리량을 향상했다.
- UniformStickyPartitioner와 RounderRobinPartitioner 2개 파티션이 있다. RoundRobinPartitioner는 레코드가 들어오는대로 순회하며 전송하기 때문에 배치로 묶이는 빈도가 적어서 카프카 2.4.0부터 UniformStickyPartitioner가 기본 파티셔너로 설정되었다.
- sender 스레드는 accumulator에 쌓인 배치 데이터를 가져가 카프카 브로커로 전송한다.
- 압축 옵션을 통해 브로커로 전송 시 압축 방식을 정할 수 있다. 압축 옵션을 정하지 않으면 압축 안 된 상태로 전송된다. 압축 옵션으로는 gzip, snappy, lz4, zstd를 지원한다. 압축을 하면 네트워크 처리량에서 이득을 보지만, 압축 하는 데 CPU 또는 메모리 리소스를 사용하므로 적절한 압축 옵션을 선택하는 게 중요하다. (프로듀서뿐만 아니라 컨슈머 측에서도 리소스가 사용됨을 기억하자.)
- 프로듀서 설정 필수 옵션
- bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 '호스트 이름:포트'를 1개 이상 작성한다. 2개 이상 입력하여 일부 브로커에 이슈가 발생하더라도 접속 이슈가 없게 설정 가능하다.
- key.serializer: 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.
- value.serializer: 레코드의 메시지 값을 직렬화하는 클래스를 지정한다.
- 프로듀서 설정 선택 옵션
- acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는 데 사용한다. 0(프로듀서가 전송한 즉시 성공), 1(리더 파티션에 저장되면 성공), -1(토픽의 min.insync.replicase 개수에 해당하는 리더 파티션과 팔로워 파티션에 저장되면 성공) 중 하나로 설정할 수 있다. 기본값은 1.
- buffer.memory: 배치로 모으기 위한 버퍼 메모리양. 기본값은 32MB. (파티션별 버퍼를 따로 가진다.)
- retries: 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수. 기본값은 2147483647. 사실상 무한 재시도다. delivery.timeout.ms 옵션, retry.backoff.ms 옵션, acks 옵션 설정 조정도 같이 고려해본다. + 커스텀 에러 핸들링, DLQ, Circuit Breaker, 모니터링 및 알람(e.g. Prometheus))
- batch.size: 배치로 전송할 레코드 최대 용량. 기본값은 16KB.
- linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간. 기본값은 0.
- partitioner.class: 파티셔너 클래스. 기본값은 org.apache.kafka.clients.producer.internals.DefaultPartititoner.
- enable.idempotence: 멱등성 프로듀서로 동작할지 여부. 기본값은 false.
- transactional.id: 레코드를 트랜잭션 단위로 묶을지 여부. 프로듀서의 트랜잭션 아이디를 설정할 수 있다. 이 값을 설정하면 트랜잭션 프로듀서로 동작한다. 기본값은 null.
- 메시지 키를 가진 데이터 전송
- 레코드 생성 시 지정해주기 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue);
- 파티션을 직접 지정
- 레코드 생성 시 지정해주기 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionN, messageKey, messageValue);
- 커스텀 파티셔너 지정 (특정 키를 특정 파티션으로 보내기)
- Partitioner 구현하기 그리고 ProducerConfig의 PARTITIONER_CLASS_CONFIG 옵션으로 지정해주기
- 브로커 정상 전송 여부를 확인하기
- send() 메서드가 반환하는 Future 결과로 RecordMetada 받기 (that includes 토픽 이름, 파티션 번호, 오프셋 번호)
- 하지만 Future get()은 blocking이고, 비동기로 결과 확인하고 싶다면 Callback 인터페이스를 구현하여 send()의 인자로 준다. 데이터 처리를 빠르게 할 수 있지만 전송하는 데이터의 순서가 중요한 경우 사용하면 안 된다.
컨슈머 API
package com.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
logger.info("{}", record);
}
}
} catch (WakeupException e) {
logger.warn("Wakeup consumer");
// 리소스 종료
} finally {
consumer.close();
}
}
private static class RebalanceListener implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.warn("Partitions are assigned");
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.warn("Partitions are revoked");
consumer.commitSync(currentOffsets);
}
}
static class ShutdownThread extends Thread {
public void run() {
logger.info("Shutdown hook");
consumer.wakeup();
}
}
}
- 브로커에 적재된 데이터를 가져와서 필요한 처리를 한다.
- 컨슈머를 운영하는 방법은 크게 2가지가 있다. 컨슈머 그룹을 운영하기 / 토픽의 특정 파티션만 구독하는 컨슈머를 운영하기
- 컨슈머 그룹의 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다. 1개 컨슈머는 여러 개의 파티션에 할당될 수 있다. 그래서 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.
e.g. 3개 파티션 가진 토픽을 효과적으로 처리하기 위해서는 3개 이하의 컨슈머로 이뤄진 컨슈머 그룹으로 운영해야 한다. 만약 4개의 컨슈머라면 1개의 컨슈머는 파티션을 할당받지 못하고 유휴 상태로 남게 된다. - 컨슈머 그룹의 컨슈머에 장애가 발생하면, 그 컨슈머에 할당된 파티션은 다른 컨슈머에 소유권이 넘어간다. (rebalancing)
- 리밸런싱은 컨슈머가 추가되는 상황에, 컨슈머가 제외되는 상황에 일어난다.
- 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 commit을 통해 기록한다. 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽(__consumer_offsets)에 기록된다.
- 오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있다. 기본 옵션은 poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정되어 있다.
- 비명시적 오프셋 커밋: 일정 간격마다 자동 커밋 (auto.commit.interval.ms 조정)
poll() 메서드를 호출할 때 커밋을 수행하므로 코드상에서 따로 커밋 관련 코드를 작성할 필요가 없다.
poll() 메서드 호출 이후에 리밸런싱 또는 컨슈머 강제종료 발생 시 데이터 중복 또는 유실될 수 있는 가능성이 있다. 중복이나 유실을 허용하지 않는 서비스라면 자동 커밋을 사용해선 안 된다. - 명시적 오프셋 커밋: poll() 메서드 호출 이후에 데이터 처리가 완료되고 commitSync() 메서드 호출
commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다.
commitSync() 메서드는 브로커에 커밋 요청을 하고 응답 받기까지 blocking이다. 이 때문에 commitAsync() 메서드를 사용할 수 있는데, 커밋 요청이 실패했을 경우 현재 처리 중인 데이터의 순서를 보장하지 않으며 중복 처리가 발생할 수 있다.
- 비명시적 오프셋 커밋: 일정 간격마다 자동 커밋 (auto.commit.interval.ms 조정)
- 컨슈머는 poll() 메서드를 통해 레코드를 받는다. 하지만 poll() 메서드 호출 시점에 데이터를 가져오는 게 아니다. 컨슈머 애플리케이션을 실행하면 내부에서 Fetcher 인스턴스가 생성되어 poll()을 호출하기 전에 미리 레코드들을 내부 큐로 가져온다. 이후에 사용자가 명시적으로 poll()을 호출하면 컨슈머는 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.
- 컨슈머 설정 필수 옵션
- bootstrap.servers: 브로커 호스트 이름:포트 1개 이상. 2개 이상 입력해 일부 브로커에 이슈가 발생하더라도 접속에 이슈가 없도록 설정 가능하다.
- key.deserialize
- value.deserializer
- 컨슈머 설정 선택 옵션
- group.id: 컨슈머 그룹 아이디. 기본값은 null.
- auto.offset.reset: 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택. 이미 컨슈머 오프셋이 있으면 무시됨. [latest(가장 최근), earlist(가장 오래 전), none(컨슈머 그룹이 커밋한 기록을 찾아봐서 없으면 오류, 있으면 기존 커밋 기록 이후 오프셋부터 읽음)] 중 1개. 기본값은 latest.
- enable.auto.commit: 자동 커밋으로 할지 수동 커밋으로 할지 선택. 기본값은 true.
- auto.commit.interval.ms: 자동 커밋(enable.auto.commit=true)일 경우 오프셋 커밋 간격 지정. 기본값은 5000(5s).
- max.poll.records: poll() 메서드를 통해 반환되는 레코드 개수 지정. 기본값은 500.
- session.timeout.ms: 이 시간 내에 heartbeat를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱 한다. 보통 heartbeat 시간 간격의 3개로 설정한다. 기본값은 10000(10s).
- heartbeat.interval.ms: heartbeat를 전송하는 시간 간격. 기본값은 3000(3s).
- max.poll.interval.ms: poll() 메서드를호출하는 간격의 최대 시간. poll() 호출 후 데이터 처리에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱 한다. 기본값은 300000(5m).
- isolation.level: 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용. [read_committed, read_uncommitted] 중 1개. 기본값은 read_uncommitted.
- 데이터를 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다. 중복 처리하지 않기 위해선 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 한다.
ConsumerRebalanceListener 인터페이스의 onPartitionAssigned()는 리밸런스가 끝난 뒤 파티션이 할당 완료되면 호출되고, onPartitionRevoked()는 리밸런스가 시작되기 직전에 호출된다. 그래서 마지막으로 처리한 레코드를 기준으로 커밋을 하기 위해서 onPartitionRevoked()에 커밋을 구현할 수 있다. - 컨슈머는 subscribe() 외에도 assign()를 사용해 직접 파티션을 명시적으로 할당 받을 수 있다. subscribe()를 사용할 때와 다르게 직접 컨슈머가 특정 토픽, 특정 파티션에 할당되므로 리밸런싱 과정이 없다.
- 컨슈머 애플리케이션은 안전하게 종료되어야 한다. 정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게 된다. 이로 인해 실제로는 종료되었지만 더는 동작을 하지 않는 컨슈머가 존재하기 때문에 파티션의 데이터는 소모되지 못하고 컨슈머 랙이 늘어나게 된다. 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생한다.
KafkaConsumer의 wakeup()을 사용하여 컨슈머를 안전하게 종료할 수 있다. poll()을 통해 지속적으로 레코드들을 받아 처리하다가 wakeup()가 호출되면 다음 poll()이 호출되었을 때 WakeupException 예외가 발생한다. 예외를 받은 뒤에는 데이터 처리를 위해 사용한 자원들을 해제하면 된다. 마지막에는 consumer.close()를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료하였음을 명시적으로 알려주면 종료가 완료되었다고 볼 수 있다.
그러면 wakeup()는 어디서 호출되어야 할까? 자바 애플리케이션의 경우 운영체제로부터 종료 요청을 받으면 실행하는 스레드인 shutdown hook을 구현하여 여기서 wakeup()를 호출하면 된다.
어드민 API
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9002");
AdminClient admin = AdminClient.create(configs);
- 실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는 것만큼 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 중요하다. 이를 위해 AdminClient 클래스가 있다. AdminClient 클래스를 사용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다. such as...
- 카프카 컨슈머를 멀티 스레드로 생성할 때, 구독하는 토픽의 파티션 개수만큼 스레드를 생성하고 싶을 때, 스레드 생성 전에 해당 토픽을 파티션 개수를 가져올 수 있다.
- AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한 규칙 추가를 할 수 있다.
- 특정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.
- https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html
카프카 스트림즈
- 카프카 스트림즈는 토픽에 적재된 데이터를 stateful 또는 stateless으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리이다.
- 카프카 스트림 데이터 처리를 위해 Apache Spark, Apache Flink, Apache Storm, Fluentd와 같은 다양한 오픈소스 애플리케이션이 존재한다. 스트림즈는 카프카에서 공식적으로 지원하는 라이브러리이다. 때문에 카프카 클러스터와 완벽하게 호환되면서 스트림 처리에 필요한 편리한 기능들(신규 토픽 생성, 상태 저장, 데이터 조인 등)을 제공한다. 스트림즈 애플리케이션 또는 카프카 브로커의 장애가 발생하더라도 exactly once할 수 있도록 fault tolerant system을 가지고 있어서 데이터 처리 안정성이 매우 뛰어나다.
- 컨슈머와 프로듀서를 조합해 스트림즈가 제공하는 기능과 유사하게 만들 수 있다. 그러나 스트림즈 라이브러리가 제공하는 exactly once 데이터 처리, fault tolerant system 특징들은 컨슈머와 프로듀서의 조합만으로 완벽하게 구현하기 어렵다.
- 스트림즈 애플리케이션은 내부적으로 스레드를 1개 이상 생성할 수 있으며, 스레드는 1개 이상의 태스크를 가진다. 태스크는 데이터 처리 최소 단위이다. 만약 3개의 파티션으로 이뤄진 토픽이라면 스트림즈 애플리케이션 내부에 3개의 태스크가 생긴다. 컨슈머의 병렬 처리를 위해 컨슈머 그룹으로 이뤄진 컨슈머 스레드를 여러 개 실행하는 것과 비슷하다.
- 실제 운영환경에서는 장애에 안정적이도록 2개 이상의 서버로 구성해 스트림즈 애플리케이션을 운영한다.
- 카프카 스트림즈의 구조와 사용 방법을 알기 위해선 우선 topology와 관련된 개념을 익혀야 한다. 토폴로지란, 2개 이상의 노드들과 선으로 이루어진 집합이다. 종류로는 ring, tree, star 등이 있다. 카프카 스트림즈에서는 토폴로지를 이루는 노드를 하나의 processor라고 부르고, 노드와 노드를 이은 선을 stream이라고 부른다.
- 스트림은 토픽의 데이터를 뜻한다.
- 프로세서는 소스 프로세서(데이터 가져오기), 스트림 프로세서(데이터 처리하기), 싱크 프로세서(데이터를 토픽으로 저장하기)가 있다.
- Streams DSL과 Processor API 2가지 방법으로 개발 가능하다.
- 내용이 방대하니 책 <Kafka Streams in Action>을 읽어보는 것을 추천한다.
카프카 커넥트
- 카프카 커넥트는 카프카 오픈소스에 포함된 툴로, 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다. 프로듀서, 컨슈머 애플리케이션을 만드는 것도 좋은 방법이지만 매번 반복적인 작업이 있다. 커넥트는 특정 작업 형태를 템플릿으로 만들어놓아 반복 작업을 줄일 수 있다.
- 소스 커넥터 source connector: 프로듀서 역할
- 싱크 커넥터 sink connector: 컨슈머 역할
- 커넥터 jar 파일을 추가하여 사용할 수 있다. 직접 커넥터 플러그인을 만들거나 이미 존재하는 커넥터 플러그인을 가져다 쓸 수도 있다. https://www.confluent.io/hub/
- 커넥터에 converter와 transform 기능을 옵션으로 추가할 수 있다. converter는 데이터 처리 전에 스키마를 변경하고, transform은 데이터 처리 시 각 메시지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용된다.
- 실행하는 방법은 크게 두 가지가 있다.단일 모드 커넥트(standalone mode kafka connect)와 분산 모드 커넥트(distributed mode kafka connect)이다.
- 단일 모드 커넥트
- 단일 애플리케이션, 1개 프로세스로 실행된다. 단일 프로세스이기 때문에 SPOF가 될 수 있다. 개발환경이나 중요도가 낮은 파이프라인을 운영할 때 사용한다.
- connect-standalone.properties 파일을 수정한다.
- 분산 모드 커넥트
- 2대 이상의 서버에서 클러스터 형태로 운영한다.
- connect-distributed.properties 파일을 수정한다.
- 단일 모드 커넥트
- 오픈소스 커넥터를 사용하거나, 직접 SourceConnector와 SourceTask 클래스를 사용하여 소스 커넥터를 구현할 수 있고, SinkConnector와 SinkTask 클래스를 사용하여 싱크 커넥터를 구현할 수 있다.
카프카 미러메이커2
- 카프카 미러메이커2는 서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션이다. 프로듀서와 컨슈머를 사용해 직접 미러링하는 애플리케이션을 만들면 되지만 굳이 미러메이커2를 사용하는 이유는 토픽의 모든것(레코드의 고유한 메시지키, 메시지 값, 파티션 등)을 복제할 필요성이 있기 때문이다.
- 지리적 복제 (Geo-Replication) - 액티브 클러스터는 한국에, 스탠바이 클러스터는 일본에 두는 식으로 물리적 공간을 분리해서 재해에 대응한다.
- Active-standby 클러스터 운영
- replcation lag(복제가 지연되는 현상)이 발생할 수 있다. = 데이터 중복, 유실 처리가 일어날 수 있다.
- Active-active 클러스터 운영
- 통신 지연을 최소화하기 위해 2개 이상의 클러스터를 두고 서로 데이터를 미러링하면서 사용한다.
- Hub and spoke 클러스터 운영
- 허브 클러스터가 데이터 레이크 역할을 한다.
- 각 팀에서 소규모 카프카 클러스터를 사용하고 있을 때 각 팀의 카프카 클러스터 데이터를 한 개의 카프카 클러스터에 모아 데이터 레이크로 사용하고 싶다면,
- 데이터 레이크 특성 상 서비스에서 생성된 데이터를 ETL하는 격리된 플랫폼이 필요하다. 미러메이커2를 사용하여 각 팀에서 사용하는 카프카 클러스터에서 데이터를 수집하고 데이터 레이크용 클러스터에서 가공, 분석하여 가치 있는 데이터를 찾아낼 수 있다.
- Active-standby 클러스터 운영
'Data engineering' 카테고리의 다른 글
Apache Beam의 sideInput() 이해하기 (0) | 2023.03.08 |
---|