반응형
Celery Worker 방식은 비동기 작업(Asynchronous Task) 을 처리하기 위한 분산 작업 큐(Distributed Task Queue) 시스템입니다.
즉, 메인 애플리케이션이 바로 처리하기엔 오래 걸리는 일을 백그라운드에서 병렬로 처리하도록 도와주는 구조예요.
하나씩 단계별로 설명드릴게요.
🧩 1. Celery의 기본 개념
Celery는 Python에서 많이 사용하는 비동기 작업 처리 프레임워크로,
Flask, Django, FastAPI 등과 함께 자주 사용됩니다.
구성 요소는 보통 다음과 같습니다:
- Producer (또는 Client): 작업을 생성해서 큐에 넣는 주체 (보통 웹서버)
- Broker: 작업을 임시로 저장하는 큐 (예: Redis, RabbitMQ)
- Worker: 큐에 있는 작업을 꺼내 실제로 처리하는 프로세스
- Result Backend (선택): 작업 결과를 저장하는 곳 (예: Redis, DB 등)
⚙️ 2. Worker 방식의 동작 흐름
Celery Worker는 “작업 처리 전담 프로세스”입니다.
흐름은 다음과 같습니다:
- 앱이 작업 요청→ send_email이라는 Celery task가 큐에 들어갑니다.
- send_email.delay(user_id)
- Broker(예: Redis)에 Task 저장
- Celery는 Redis에 “해야 할 일”을 메시지 형태로 남깁니다.
- Worker가 Broker 감시
- Worker 프로세스(celery -A app worker -l info)가 계속 Redis를 감시합니다.
- 새로운 작업이 들어오면 꺼내서 실행합니다.
- 작업 수행
- Worker가 실제 이메일 전송, 파일 변환, AI 분석 등 CPU나 I/O가 무거운 일을 수행합니다.
- 결과 저장 (선택)
- 성공/실패 여부나 결과를 result backend에 저장할 수 있습니다.
🧠 3. Worker 방식의 장점
- ✅ 메인 서버 부하 감소
웹 요청과 무거운 연산을 분리할 수 있습니다. - ⚡ 비동기 처리
사용자는 즉시 응답을 받고, 작업은 백그라운드에서 진행됩니다. - 🧮 병렬 처리 가능
여러 Worker 프로세스를 띄워 동시에 많은 작업을 병렬로 처리할 수 있습니다. - 🧱 확장성 높음
서버를 늘려서 Worker만 추가하면 처리 속도 향상 가능.
📦 4. 예시 구조
[Flask App] → [Redis Broker] → [Celery Worker] → [Result Backend]
예를 들어, Flask 앱에서 환자 데이터를 분석하는 AI 기능을 Celery로 처리한다면:
from celery import Celery
app = Celery('ai_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
@app.task
def analyze_dicom(dicom_id):
# 오래 걸리는 AI 분석 로직
result = heavy_ai_analysis(dicom_id)
return result
이후 Flask 서버에서는:
@app.route('/analyze/<int:id>')
def start_analysis(id):
analyze_dicom.delay(id)
return "분석이 시작되었습니다."
이러면 Flask는 바로 응답을 주고, Worker가 백그라운드에서 AI 분석을 진행하게 됩니다.
🚀 5. 실제 운영 시
운영 환경에서는 보통 이렇게 구성합니다:
- celery -A app worker -l info → Worker 실행
- celery -A app beat -l info → 주기적인 작업 예약용 (예: 매일 리포트 생성)
- Flower → Celery 작업 모니터링 웹 UI
그럼 Flask + Celery + Redis 기반의 Worker 구조 예제를 단계별로 보여드릴게요.
🧱 1. 프로젝트 구조
flask_celery_app/
├── app.py # Flask 메인 서버
├── celery_worker.py # Celery 설정 및 Worker 엔트리포인트
├── tasks.py # 실제 수행할 작업 정의
├── requirements.txt
└── config.py # 설정 파일 (Redis URL 등)
⚙️ 2. config.py
# config.py
import os
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = os.getenv("RESULT_BACKEND", "redis://localhost:6379/1")
🚀 3. celery_worker.py
# celery_worker.py
from celery import Celery
from config import REDIS_URL, CELERY_RESULT_BACKEND
# Celery 객체 생성
celery = Celery(
"flask_celery_app",
broker=REDIS_URL,
backend=CELERY_RESULT_BACKEND,
)
# Flask와 분리된 Celery 설정
celery.conf.update(
task_routes={
"tasks.send_email": {"queue": "email"},
"tasks.analyze_dicom": {"queue": "ai"},
},
task_serializer="json",
result_serializer="json",
accept_content=["json"],
)
🧠 4. tasks.py
# tasks.py
from celery_worker import celery
import time
@celery.task
def send_email(email):
print(f"📧 Sending email to {email} ...")
time.sleep(3)
print("✅ Email sent successfully!")
return f"Email sent to {email}"
@celery.task
def analyze_dicom(dicom_id):
print(f"🧠 Analyzing DICOM ID: {dicom_id}")
time.sleep(5)
print(f"✅ Analysis complete for DICOM ID {dicom_id}")
return {"dicom_id": dicom_id, "result": "Normal"}
🌐 5. app.py (Flask 메인 서버)
# app.py
from flask import Flask, jsonify
from tasks import send_email, analyze_dicom
app = Flask(__name__)
@app.route("/send-email/<email>")
def send_email_task(email):
result = send_email.delay(email) # 비동기 실행
return jsonify({"task_id": result.id, "status": "email task started"})
@app.route("/analyze/<dicom_id>")
def analyze_task(dicom_id):
result = analyze_dicom.delay(dicom_id)
return jsonify({"task_id": result.id, "status": "dicom analysis started"})
if __name__ == "__main__":
app.run(debug=True)
🧩 6. requirements.txt
Flask==3.0.0
Celery==5.3.4
redis==5.0.1
▶️ 7. 실행 순서
1️⃣ Redis 실행
Docker를 사용하면 간단합니다:
docker run -d -p 6379:6379 redis
2️⃣ Flask 서버 실행
python app.py
3️⃣ Celery Worker 실행
celery -A celery_worker.celery worker --loglevel=info
(만약 큐별로 분리하고 싶다면)
celery -A celery_worker.celery worker -Q email --loglevel=info
celery -A celery_worker.celery worker -Q ai --loglevel=info
🧭 8. 동작 예시
- Flask 서버에 요청:→ 응답: { "task_id": "...", "status": "email task started" }
- GET http://localhost:5000/send-email/test@mai.com
- Celery Worker 콘솔에서:
- [INFO/MainProcess] Task tasks.send_email[test@mai.com] succeeded ✅ Email sent successfully!
🧰 9. (선택) 결과 확인하기
Worker 결과를 Flask에서 조회하려면:
from celery.result import AsyncResult
from celery_worker import celery
@app.route("/result/<task_id>")
def get_result(task_id):
result = AsyncResult(task_id, app=celery)
return jsonify({"status": result.status, "result": result.result})
💡 확장 아이디어
- Celery Beat를 이용해서 주기적인 작업 예약 (예: 매일 보고서 자동 생성)
- Flower 웹 UI로 작업 상태 모니터링:→ 브라우저에서 http://localhost:5555 접속
- pip install flower celery -A celery_worker.celery flower
반응형
'[====== Development ======] > Python' 카테고리의 다른 글
| Python 아나콘다(Anaconda) (3) | 2025.07.23 |
|---|---|
| Python - PIP 명령어 모음 (3) | 2025.07.23 |
| 백엔드 API 자동 문서화 (0) | 2025.06.17 |
| Flask-React SSE (Server-Sent Events) 구현 (0) | 2025.04.28 |
| Python - QR Code 이미지 생성 코드 (0) | 2024.12.10 |