반응형
    
    
    
  Kafka에서 메시지를 수신하고 해당 메시지를 Slack으로 보내는 파이썬 코드를 작성하기
Kafka 메시지 소비 : kafka-python 라이브러리를 사용할 수 있습니다.
Slack으로 메시지 전송 : Slack의 Webhook을 사용하여 메시지를 보냅니다.
Python 설치
기존 Python 제거
sudo apt-get remove --purge python3Python 설치
sudo apt-get updatesudo apt-get install -y python3 python3-pip python3-venv가상 환경 생성 및 패키지 설치
가상 환경을 생성하고 필요한 패키지를 설치할 수 있습니다.
가상 환경 생성
python3 -m venv myenv가상 환경 활성화 (Linux/Mac)
source myenv/bin/activate필요한 패키지 설치
pip install kafka-python requests six코드 작성
vim python_kafka_consumer.pyimport json
import requests
from kafka import KafkaConsumer
def send_slack_message(webhook_url, message):
    """슬랙으로 메시지를 전송합니다."""
    headers = {
        'Content-Type': 'application/json',
    }
    data = {
        "text": message
    }
    
    response = requests.post(webhook_url, headers=headers, data=json.dumps(data))
    if response.status_code == 200:
        print("슬랙으로 메시지가 성공적으로 전송되었습니다!")
    else:
        print(f"슬랙으로 메시지 전송 실패: {response.status_code}, {response.text}")
def consume_kafka_messages(kafka_servers, topic, slack_webhook_url):
    """Kafka에서 메시지를 소비하고 슬랙으로 전송합니다."""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_servers,
        auto_offset_reset='earliest',  # 'latest'로 변경하면 마지막 오프셋부터 시작
        enable_auto_commit=True,
        group_id='slack-consumer-group',
        value_deserializer=lambda x: x.decode('utf-8')
    )
    print(f"Kafka 토픽 '{topic}'에서 메시지를 소비 중...")
    
    try:
        # Kafka에서 메시지 수신 및 Slack 전송
        for message in consumer:
            print(f"수신된 메시지: {message.value}")
            send_slack_message(slack_webhook_url, message.value)
    except KeyboardInterrupt:
        print("\n종료 중입니다...")
    finally:
        consumer.close()
        print("Kafka 소비자가 종료되었습니다.")
if __name__ == "__main__":
    # 설정 값
    kafka_bootstrap_servers = ['node3:9092']  # Kafka 서버 주소
    topic = 'test-topic'  # Kafka 토픽 이름
    slack_webhook_url = 'https://hooks.slack.com/services/your/webhook/url'  # Slack Webhook URL
    # Kafka 메시지를 소비하고 Slack으로 전송
    consume_kafka_messages(kafka_bootstrap_servers, topic, slack_webhook_url)코드 실행
python python_kafka_consumer.py$ python python_kafka_consumer.py 
Kafka 토픽 'test-topic'에서 메시지를 소비 중...
수신된 메시지: hi
슬랙으로 메시지가 성공적으로 전송되었습니다!
^C
종료 중입니다...
Kafka 소비자가 종료되었습니다.Ctrl+C로 코드를 종료합니다.
가상 환경 비활성화 (Linux/Mac)
deactivate
728x90
    
    
  반응형
    
    
    
  '리눅스' 카테고리의 다른 글
| 우분투에서 고정 IP 주소를 설정하는 방법 (3) | 2024.10.15 | 
|---|---|
| 리눅스에서 LISTEN 포트를 확인하는 방법 (0) | 2024.10.15 | 
| 우분투에서 PHP 8.3으로 업그레이드하고 기존의 PHP 8.1을 삭제하는 방법 (0) | 2024.10.14 | 
| Shell 스크립트에서 CRLF와 LF 문제를 해결하는 방법 (0) | 2024.10.11 | 
| Python으로 Kafka Consumer 생성 (0) | 2024.10.11 | 
 
                  
                 
                  
                 
                  
                