Celery Worker 방식에 대하여

반응형

Celery Worker 방식비동기 작업(Asynchronous Task) 을 처리하기 위한 분산 작업 큐(Distributed Task Queue) 시스템입니다.
즉, 메인 애플리케이션이 바로 처리하기엔 오래 걸리는 일을 백그라운드에서 병렬로 처리하도록 도와주는 구조예요.
하나씩 단계별로 설명드릴게요.


🧩 1. Celery의 기본 개념

Celery는 Python에서 많이 사용하는 비동기 작업 처리 프레임워크로,
Flask, Django, FastAPI 등과 함께 자주 사용됩니다.

구성 요소는 보통 다음과 같습니다:

  • Producer (또는 Client): 작업을 생성해서 큐에 넣는 주체 (보통 웹서버)
  • Broker: 작업을 임시로 저장하는 큐 (예: Redis, RabbitMQ)
  • Worker: 큐에 있는 작업을 꺼내 실제로 처리하는 프로세스
  • Result Backend (선택): 작업 결과를 저장하는 곳 (예: Redis, DB 등)

⚙️ 2. Worker 방식의 동작 흐름

Celery Worker는 “작업 처리 전담 프로세스”입니다.
흐름은 다음과 같습니다:

  1. 앱이 작업 요청→ send_email이라는 Celery task가 큐에 들어갑니다.
  2. send_email.delay(user_id)
  3. Broker(예: Redis)에 Task 저장
    • Celery는 Redis에 “해야 할 일”을 메시지 형태로 남깁니다.
  4. Worker가 Broker 감시
    • Worker 프로세스(celery -A app worker -l info)가 계속 Redis를 감시합니다.
    • 새로운 작업이 들어오면 꺼내서 실행합니다.
  5. 작업 수행
    • Worker가 실제 이메일 전송, 파일 변환, AI 분석 등 CPU나 I/O가 무거운 일을 수행합니다.
  6. 결과 저장 (선택)
    • 성공/실패 여부나 결과를 result backend에 저장할 수 있습니다.

🧠 3. Worker 방식의 장점

  • 메인 서버 부하 감소
    웹 요청과 무거운 연산을 분리할 수 있습니다.
  • 비동기 처리
    사용자는 즉시 응답을 받고, 작업은 백그라운드에서 진행됩니다.
  • 🧮 병렬 처리 가능
    여러 Worker 프로세스를 띄워 동시에 많은 작업을 병렬로 처리할 수 있습니다.
  • 🧱 확장성 높음
    서버를 늘려서 Worker만 추가하면 처리 속도 향상 가능.

📦 4. 예시 구조

[Flask App] → [Redis Broker] → [Celery Worker] → [Result Backend]

예를 들어, Flask 앱에서 환자 데이터를 분석하는 AI 기능을 Celery로 처리한다면:

from celery import Celery

app = Celery('ai_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def analyze_dicom(dicom_id):
    # 오래 걸리는 AI 분석 로직
    result = heavy_ai_analysis(dicom_id)
    return result

이후 Flask 서버에서는:

@app.route('/analyze/<int:id>')
def start_analysis(id):
    analyze_dicom.delay(id)
    return "분석이 시작되었습니다."

이러면 Flask는 바로 응답을 주고, Worker가 백그라운드에서 AI 분석을 진행하게 됩니다.


🚀 5. 실제 운영 시

운영 환경에서는 보통 이렇게 구성합니다:

  • celery -A app worker -l info → Worker 실행
  • celery -A app beat -l info → 주기적인 작업 예약용 (예: 매일 리포트 생성)
  • Flower → Celery 작업 모니터링 웹 UI

그럼 Flask + Celery + Redis 기반의 Worker 구조 예제를 단계별로 보여드릴게요.


🧱 1. 프로젝트 구조

flask_celery_app/
├── app.py               # Flask 메인 서버
├── celery_worker.py     # Celery 설정 및 Worker 엔트리포인트
├── tasks.py             # 실제 수행할 작업 정의
├── requirements.txt
└── config.py            # 설정 파일 (Redis URL 등)

⚙️ 2. config.py

# config.py
import os

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = os.getenv("RESULT_BACKEND", "redis://localhost:6379/1")

🚀 3. celery_worker.py

# celery_worker.py
from celery import Celery
from config import REDIS_URL, CELERY_RESULT_BACKEND

# Celery 객체 생성
celery = Celery(
    "flask_celery_app",
    broker=REDIS_URL,
    backend=CELERY_RESULT_BACKEND,
)

# Flask와 분리된 Celery 설정
celery.conf.update(
    task_routes={
        "tasks.send_email": {"queue": "email"},
        "tasks.analyze_dicom": {"queue": "ai"},
    },
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)

🧠 4. tasks.py

# tasks.py
from celery_worker import celery
import time

@celery.task
def send_email(email):
    print(f"📧 Sending email to {email} ...")
    time.sleep(3)
    print("✅ Email sent successfully!")
    return f"Email sent to {email}"

@celery.task
def analyze_dicom(dicom_id):
    print(f"🧠 Analyzing DICOM ID: {dicom_id}")
    time.sleep(5)
    print(f"✅ Analysis complete for DICOM ID {dicom_id}")
    return {"dicom_id": dicom_id, "result": "Normal"}

🌐 5. app.py (Flask 메인 서버)

# app.py
from flask import Flask, jsonify
from tasks import send_email, analyze_dicom

app = Flask(__name__)

@app.route("/send-email/<email>")
def send_email_task(email):
    result = send_email.delay(email)  # 비동기 실행
    return jsonify({"task_id": result.id, "status": "email task started"})

@app.route("/analyze/<dicom_id>")
def analyze_task(dicom_id):
    result = analyze_dicom.delay(dicom_id)
    return jsonify({"task_id": result.id, "status": "dicom analysis started"})

if __name__ == "__main__":
    app.run(debug=True)

🧩 6. requirements.txt

Flask==3.0.0
Celery==5.3.4
redis==5.0.1

▶️ 7. 실행 순서

1️⃣ Redis 실행

Docker를 사용하면 간단합니다:

docker run -d -p 6379:6379 redis

2️⃣ Flask 서버 실행

python app.py

3️⃣ Celery Worker 실행

celery -A celery_worker.celery worker --loglevel=info

(만약 큐별로 분리하고 싶다면)

celery -A celery_worker.celery worker -Q email --loglevel=info
celery -A celery_worker.celery worker -Q ai --loglevel=info

🧭 8. 동작 예시

  1. Flask 서버에 요청:→ 응답: { "task_id": "...", "status": "email task started" }
  2. GET http://localhost:5000/send-email/test@mai.com
  3. Celery Worker 콘솔에서:
  4. [INFO/MainProcess] Task tasks.send_email[test@mai.com] succeeded ✅ Email sent successfully!

🧰 9. (선택) 결과 확인하기

Worker 결과를 Flask에서 조회하려면:

from celery.result import AsyncResult
from celery_worker import celery

@app.route("/result/<task_id>")
def get_result(task_id):
    result = AsyncResult(task_id, app=celery)
    return jsonify({"status": result.status, "result": result.result})

💡 확장 아이디어

  • Celery Beat를 이용해서 주기적인 작업 예약 (예: 매일 보고서 자동 생성)
  • Flower 웹 UI로 작업 상태 모니터링:→ 브라우저에서 http://localhost:5555 접속
  • pip install flower celery -A celery_worker.celery flower

 

반응형