본문 바로가기

기술이야기

Elasticsearch + Logstash + Kibana 구축하기 (3) - Kafka와 연동

 

구성도


ELK + Springboot + Kafka

 

  • 갑작스러운 트래픽 증가로 Logstash와 Elasticsearch의 부하를 막기 위한 메시지 브로커 Kafka를 도입
  • Kafka는 데이터를 토픽으로 그룹화하고 발행-소비 체계로 운영되는 분산 큐 시스템. 도입 시 탄력적인 운영 가능.
  • Elastic사는 어플리케이션과 Kafka 사이에 Shipper 역할을 하는 beats를 추가하는 것을 권장(이 글에서는 생략)
  • 일반적으로 데이터 소스별로 토픽을 분리하고 토픽 별 파티션 수와 Logstash 인스턴스 규모를 조정(이 글에서는 토픽 별 파티션 수는 1개로 설정)
  • 기본적으로 Logstash(shipper) 사용 시 데이터는 라운드 로빈 방식으로 파티션에 할당. message_key를 지정하면 파티션에 데이터를 할당한 방법 설정 가능
  • Kafka를 읽어들이는 Logstash(Indexer) 인스턴스는 디폴트로 logstash라는 cumsumer group id를 형성. 새로운 Logstash가 group id에 포함되면 kafka는 리밸런싱 작업이 트리거 된다.
  • 따라서 한 Logstash 인스턴스가 다운되면 kafka는 리밸런싱을 통해 남아있는 Logstash 인스턴스에 할당을 분산시킨다. (이 글에서는 indexer 역할을 하는 Logstash는 1개만 생성)
  • Logstash의 consumer_threads를 설정하면 kafka 파티션에서 소비되는 스레드 수를 제어. 보통 파티션 수와 같은 스레드 수를 유지하는 것이 이상적
  • 스레드가 많다면 idle 쓰레드가 발생하며 적다면 하나 이상의 파티션을 소비하므로 부담이 가중된다. (이 글에서는 파티션이 1개이므로 consumer_threads도 1로 설정)

 

 

Kafka


 

설치

sudo apt-get update
# kafka의 노드를 관리하는 zookeeper 설치. 항상 kafka 보다 zookeeper가 먼저 구동되어야 함.
sudo apt-get install zookeeperd
 
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -zxf kafka_2.12-2.5.0.tgz
ln -s kafka_2.12-2.5.0/ ~/kafka
ls -l ~

 

config 파일 수정

  • kafka/config/service.properties
  • 이 글에서는 listeners와 advertised.listeners 값만 수정
# The address the socket server listens on
listeners=PLAINTEXT://0.0.0.0:9092
 
# kafka 브로커를 가르키는 주소 목록. 초기 연길 시 이를 client(producers and consumers)에게 보냄, 기본 값은 listeners
advertised.listeners=PLAINTEXT://{client가 접근 시 사용할 주소}:{client가 접근 시 사용할 포트}
 
# lisener에 custom protocol 사용 시 반드시 설정
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
 
# the brokers.
num.partitions=1
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
zookeeper.connect=localhost:2181

 

kafka 실행(중지)

sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sudo bin/kafka-server-start.sh -daemon config/server.properties
 
sudo bin/zookeeper-server-stop.sh config/zookeeper.properties
sudo bin/kafka-server-stop.sh config/server.properties
 
# zookeeper, kafka 포트 확인
netstat -lntp

 

테스트

# topic 리스트 출력
kafka-topics.sh --zookeeper zookeeper:2181 --list
 
# 또 다른 console을 이용해 kafka consumer를 실행하여 메시지 수신 상태로 대기
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
 
# console을 이용해 kafka producer를 실행하여 메시지 생산 상태로 대기시킵니다. 위 명령을 실행하면 > 이 출력되면서 입력 대기 상태로 전환
# --broker-list: kafka가 실행 중인 호스트.
kafka-console-producer.sh --topic test --broker-list localhost:9092

 

  • producer 콘솔에 test 입력

  • consumer 콘솔에 test 출력 확인

 

 

만약 Docker로 Kafka를 설치한다면?

  • docker-compose.yaml 파일 예시
version: '2'
 
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181"
 
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      # kafka 브로커를 가르키는 주소 목록. 초기 연길 시 이를 client에게 보냄
      # INSIDE: 브로커 내에서 사용할 주소, OUTSIDE: Client가 접속할 주소
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://{client가 접근 시 사용할 주소}:{client가 접근 시 사용할 포트}
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
      # Zookeeper 연결 문자열. ,로 구분
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # 생성할 Topic명:Partition개수:Replica개수
      KAFKA_CREATE_TOPICS: "springboot-kafka-elk:1:1"
      #KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper # 항상 zookeeper가 먼저 실행

 

 

 

Spring boot 


  • 이전 포스트에서 만들었던 springboot-elk-01, springboot-elk-02, springboot-elk-03 애플리케이션 사용
  • 의존성 수정 및 log4j2.xml 파일 생성

 

pom.xml 수정

  • spring-boot-starter-log4j2, kafka-log4j-appender 의존성 추가
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter</artifactId>
     <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
 </dependency>
  
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
 
<!-- for JSONLayout -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
 
 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
 </dependency>

 

 

log4j2.xml 파일 생성

  • 이전 포스트에서 만들었던 logback.xml 파일 삭제
  • src/main/resource에 위치

 

 

 

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="spring-boot-log-to-kafka-example" packages="io.woolford">
 
    <Appenders>
        <!--topic 명시, 없으면 자동으로 생성-->
        <Kafka name="Kafka" topic="springboot-kafka-elk">
            <JSONLayout />
            <Property name="bootstrap.servers">{kafka 서버 주소}:{kafka 서버 포트}</Property>
        </Kafka>
 
        <Console name="stdout" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5p [%-7t] %F:%L - %m%n"/>
        </Console>
 
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Kafka"/>
            <AppenderRef ref="stdout"/>
        </Root>
        <Logger name="org.apache.kafka" level="info" />
    </Loggers>
</Configuration>

 

 

테스트

  • springboot-elk-01, springboot-elk-02, springboot-elk-03 구동
  • kafka consumer 콘솔 확인

애플리케이션과 Kafka 연동 결과

 

 

 

Logstash


 

logstash.conf 수정

  • docker-elk/logstash/pipeline/logstash.conf
  • input
    • kafka 서버 주소 및 topic 설정, 복수 값 설정 가능
    • consumer_threads는 파티션 수와 동일한 1로 설정
    • 자동으로 Consumer 그룹은 logstash로 설정됨, 
  • output
    • Elasticsearch Index를 topic과 동일한 springboot-kafka-elk로 설정
input {
    kafka {
            bootstrap_servers => "{kafka 서버 주소}:{kafka 서버 포트}"
            topics => "springboot-kafka-elk"
            consumer_threads => 1
    }
}
 
## Add your filters / logstash plugins configuration here
 
output {
    elasticsearch {
        hosts => "elasticsearch:9200"
        index => "springboot-kafka-elk"
        workers => 1
        user => "elastic"
        password => "changeme"
    }
}

 

  • 완료 시 docker-compose build & docker-compose up

 

 

 

Kibana


  • Springboot + Kafka + ELK 가 제대로 연동이 됐는지 확인
  • kibana 로그인 > Index Patterns > Create index Pattern > logstash.conf 에서 명시한 index 추가

create index

 

  • kibana > discovery에서 로그 확인

kibana discovery