기술이야기
Elasticsearch + Logstash + Kibana 구축하기 (3) - Kafka와 연동
투자 엔지니어
2020. 11. 19. 17:36
구성도
- 갑작스러운 트래픽 증가로 Logstash와 Elasticsearch의 부하를 막기 위한 메시지 브로커 Kafka를 도입
- Kafka는 데이터를 토픽으로 그룹화하고 발행-소비 체계로 운영되는 분산 큐 시스템. 도입 시 탄력적인 운영 가능.
- Elastic사는 어플리케이션과 Kafka 사이에 Shipper 역할을 하는 beats를 추가하는 것을 권장(이 글에서는 생략)
- 일반적으로 데이터 소스별로 토픽을 분리하고 토픽 별 파티션 수와 Logstash 인스턴스 규모를 조정(이 글에서는 토픽 별 파티션 수는 1개로 설정)
- 기본적으로 Logstash(shipper) 사용 시 데이터는 라운드 로빈 방식으로 파티션에 할당. message_key를 지정하면 파티션에 데이터를 할당한 방법 설정 가능
- Kafka를 읽어들이는 Logstash(Indexer) 인스턴스는 디폴트로 logstash라는 cumsumer group id를 형성. 새로운 Logstash가 group id에 포함되면 kafka는 리밸런싱 작업이 트리거 된다.
- 따라서 한 Logstash 인스턴스가 다운되면 kafka는 리밸런싱을 통해 남아있는 Logstash 인스턴스에 할당을 분산시킨다. (이 글에서는 indexer 역할을 하는 Logstash는 1개만 생성)
- Logstash의 consumer_threads를 설정하면 kafka 파티션에서 소비되는 스레드 수를 제어. 보통 파티션 수와 같은 스레드 수를 유지하는 것이 이상적
- 스레드가 많다면 idle 쓰레드가 발생하며 적다면 하나 이상의 파티션을 소비하므로 부담이 가중된다. (이 글에서는 파티션이 1개이므로 consumer_threads도 1로 설정)
Kafka
설치
sudo apt-get update
# kafka의 노드를 관리하는 zookeeper 설치. 항상 kafka 보다 zookeeper가 먼저 구동되어야 함.
sudo apt-get install zookeeperd
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -zxf kafka_2.12-2.5.0.tgz
ln -s kafka_2.12-2.5.0/ ~/kafka
ls -l ~
config 파일 수정
- kafka/config/service.properties
- 이 글에서는 listeners와 advertised.listeners 값만 수정
# The address the socket server listens on
listeners=PLAINTEXT://0.0.0.0:9092
# kafka 브로커를 가르키는 주소 목록. 초기 연길 시 이를 client(producers and consumers)에게 보냄, 기본 값은 listeners
advertised.listeners=PLAINTEXT://{client가 접근 시 사용할 주소}:{client가 접근 시 사용할 포트}
# lisener에 custom protocol 사용 시 반드시 설정
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# the brokers.
num.partitions=1
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
zookeeper.connect=localhost:2181
kafka 실행(중지)
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sudo bin/kafka-server-start.sh -daemon config/server.properties
sudo bin/zookeeper-server-stop.sh config/zookeeper.properties
sudo bin/kafka-server-stop.sh config/server.properties
# zookeeper, kafka 포트 확인
netstat -lntp
테스트
# topic 리스트 출력
kafka-topics.sh --zookeeper zookeeper:2181 --list
# 또 다른 console을 이용해 kafka consumer를 실행하여 메시지 수신 상태로 대기
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
# console을 이용해 kafka producer를 실행하여 메시지 생산 상태로 대기시킵니다. 위 명령을 실행하면 > 이 출력되면서 입력 대기 상태로 전환
# --broker-list: kafka가 실행 중인 호스트.
kafka-console-producer.sh --topic test --broker-list localhost:9092
- producer 콘솔에 test 입력
- consumer 콘솔에 test 출력 확인
만약 Docker로 Kafka를 설치한다면?
- docker-compose.yaml 파일 예시
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
hostname: kafka
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
# kafka 브로커를 가르키는 주소 목록. 초기 연길 시 이를 client에게 보냄
# INSIDE: 브로커 내에서 사용할 주소, OUTSIDE: Client가 접속할 주소
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://{client가 접근 시 사용할 주소}:{client가 접근 시 사용할 포트}
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
# Zookeeper 연결 문자열. ,로 구분
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# 생성할 Topic명:Partition개수:Replica개수
KAFKA_CREATE_TOPICS: "springboot-kafka-elk:1:1"
#KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper # 항상 zookeeper가 먼저 실행
Spring boot
- 이전 포스트에서 만들었던 springboot-elk-01, springboot-elk-02, springboot-elk-03 애플리케이션 사용
- 의존성 수정 및 log4j2.xml 파일 생성
pom.xml 수정
- spring-boot-starter-log4j2, kafka-log4j-appender 의존성 추가
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- for JSONLayout -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
log4j2.xml 파일 생성
- 이전 포스트에서 만들었던 logback.xml 파일 삭제
- src/main/resource에 위치
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="spring-boot-log-to-kafka-example" packages="io.woolford">
<Appenders>
<!--topic 명시, 없으면 자동으로 생성-->
<Kafka name="Kafka" topic="springboot-kafka-elk">
<JSONLayout />
<Property name="bootstrap.servers">{kafka 서버 주소}:{kafka 서버 포트}</Property>
</Kafka>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5p [%-7t] %F:%L - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Kafka"/>
<AppenderRef ref="stdout"/>
</Root>
<Logger name="org.apache.kafka" level="info" />
</Loggers>
</Configuration>
테스트
- springboot-elk-01, springboot-elk-02, springboot-elk-03 구동
- kafka consumer 콘솔 확인
Logstash
logstash.conf 수정
- docker-elk/logstash/pipeline/logstash.conf
- input
- kafka 서버 주소 및 topic 설정, 복수 값 설정 가능
- consumer_threads는 파티션 수와 동일한 1로 설정
- 자동으로 Consumer 그룹은 logstash로 설정됨,
- output
- Elasticsearch Index를 topic과 동일한 springboot-kafka-elk로 설정
input {
kafka {
bootstrap_servers => "{kafka 서버 주소}:{kafka 서버 포트}"
topics => "springboot-kafka-elk"
consumer_threads => 1
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "springboot-kafka-elk"
workers => 1
user => "elastic"
password => "changeme"
}
}
- 완료 시 docker-compose build & docker-compose up
Kibana
- Springboot + Kafka + ELK 가 제대로 연동이 됐는지 확인
- kibana 로그인 > Index Patterns > Create index Pattern > logstash.conf 에서 명시한 index 추가
- kibana > discovery에서 로그 확인