Deploying a Celery App
What is Celery?
Celery is a distributed task queue library for Python that enables asynchronous job processing and task scheduling. It decouples task execution from request handling, allowing applications to handle long-running operations without blocking user requests. Celery supports multiple message brokers (Redis, RabbitMQ, Amazon SQS), result backends, and workers across multiple machines.
Key capabilities include:
- Asynchronous task execution with worker processes
- Task scheduling and periodic tasks using Celery Beat
- Automatic task retries with exponential backoff
- Task routing and priority queuing
- Real-time monitoring and task introspection
- Distributed processing across multiple workers and machines
- Integration with popular Python frameworks (Django, Flask, FastAPI)
- Task result persistence with multiple backend options
- Chain, chord, and group task primitives for complex workflows
- Graceful shutdown and task cancellation
Celery is ideal for sending emails, processing image uploads, running data analysis, generating reports, and any CPU-intensive or I/O-bound operations that shouldn’t block your application.
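As a quick illustration of the chain, group, and chord primitives mentioned above, here is a standalone sketch; the `add` and `total` tasks and the local broker URL are assumptions for this example only, not part of the project built below.

```python
from celery import Celery, chain, group, chord

# Hypothetical app and tasks, used only to illustrate the canvas primitives
app = Celery('canvas_demo',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

@app.task
def total(numbers):
    return sum(numbers)

# chain: run add(2, 2), then feed its result into add(?, 4)
chain(add.s(2, 2), add.s(4))()

# group: run several add tasks in parallel
group(add.s(i, i) for i in range(10))()

# chord: run a group in parallel, then pass the list of results to total
chord((add.s(i, i) for i in range(10)))(total.s())
```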
Prerequisites
Before deploying a Celery application to Klutch.sh, ensure you have:
- Python 3.9+ installed on your local machine
- pip or conda for dependency management
- Git and a GitHub account
- A Klutch.sh account with dashboard access
- A message broker (Redis or RabbitMQ) - can be deployed separately on Klutch.sh or use managed services
- Basic understanding of Python virtual environments
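A quick way to confirm the local tooling is in place:

```bash
python3 --version   # should report 3.9 or newer
pip --version
git --version
```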
Getting Started with Celery
Step 1: Create Your Project Directory
```bash
mkdir my-celery-app
cd my-celery-app
python3 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```

Step 2: Install Celery and Dependencies
```bash
pip install celery redis flask requests
```

The `redis` package is the Python client for the Redis broker. `flask` and `requests` are optional but useful for creating a simple web interface to submit tasks.
Step 3: Create Your Configuration File
Create a celery_config.py file to centralize Celery configuration:
```python
import os
from kombu import Exchange, Queue

# Broker configuration
CELERY_BROKER_URL = os.getenv(
    'CELERY_BROKER_URL',
    'redis://localhost:6379/0'
)

# Result backend configuration
CELERY_RESULT_BACKEND = os.getenv(
    'CELERY_RESULT_BACKEND',
    'redis://localhost:6379/1'
)

# Task settings
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

# Task execution settings
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60       # 30 minutes hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60  # 25 minutes soft limit
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Worker settings
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000

# Logging
CELERY_WORKER_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
CELERY_WORKER_TASK_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s'

# Queue configuration
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('high'), routing_key='high'),
    Queue('low_priority', Exchange('low'), routing_key='low'),
)
```

Step 4: Create Your Celery Application Instance
Create a celery_app.py file:
```python
from celery import Celery
from celery_config import *

app = Celery(__name__)
app.config_from_object('celery_config')

# Auto-discover tasks from all installed apps
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
```

Step 5: Define Your Tasks
Create a tasks.py file with your background tasks:
```python
from celery_app import app
import time
import requests
from datetime import datetime

@app.task(bind=True, max_retries=3)
def send_email(self, email_address, subject, body):
    """
    Send an email asynchronously with retry logic.
    Retries up to 3 times on failure.
    """
    try:
        # Simulate email sending
        print(f"Sending email to {email_address}")
        # In production, use a service like SendGrid or AWS SES
        time.sleep(2)
        return f"Email sent to {email_address}"
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@app.task
def process_image(image_url, width, height):
    """
    Download and process an image asynchronously.
    """
    try:
        response = requests.get(image_url)
        response.raise_for_status()

        # In production, use PIL/Pillow to resize the image
        print(f"Processing image: {width}x{height}")

        return {
            'status': 'success',
            'image_url': image_url,
            'dimensions': f'{width}x{height}'
        }
    except requests.RequestException as e:
        return {
            'status': 'error',
            'error': str(e)
        }

@app.task
def generate_report(report_type, filters=None):
    """
    Generate a report asynchronously.
    """
    print(f"Generating {report_type} report with filters: {filters}")

    # Simulate long-running report generation
    time.sleep(5)

    return {
        'report_type': report_type,
        'generated_at': datetime.now().isoformat(),
        'filters': filters or {}
    }

@app.task
def cleanup_old_files(days=30):
    """
    Periodic task to clean up old temporary files.
    """
    print(f"Cleaning up files older than {days} days")
    return f"Cleanup completed for files older than {days} days"

# Chord example: process multiple images in parallel then aggregate results
from celery import chord

def process_multiple_images(image_urls, width, height):
    """
    Process multiple images in parallel.
    """
    callback = aggregate_results.s()
    header = [process_image.s(url, width, height) for url in image_urls]
    result = chord(header)(callback)
    return result

@app.task
def aggregate_results(results):
    """
    Aggregate results from multiple image processing tasks.
    """
    successful = sum(1 for r in results if r.get('status') == 'success')
    failed = sum(1 for r in results if r.get('status') == 'error')

    return {
        'total_processed': len(results),
        'successful': successful,
        'failed': failed
    }
```

Step 6: Create a Simple Flask Web Interface
Create an app.py file to submit tasks via HTTP:
```python
from flask import Flask, request, jsonify
from celery_app import app as celery_app
from tasks import send_email, process_image, generate_report, process_multiple_images
import os

app = Flask(__name__)

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint for monitoring."""
    return jsonify({
        'status': 'healthy',
        'service': 'celery-app'
    }), 200

@app.route('/tasks/send-email', methods=['POST'])
def submit_send_email():
    """Submit an email task."""
    data = request.get_json()

    email = data.get('email')
    subject = data.get('subject')
    body = data.get('body')

    if not all([email, subject, body]):
        return jsonify({'error': 'Missing required fields'}), 400

    task = send_email.delay(email, subject, body)

    return jsonify({
        'task_id': task.id,
        'status': task.status
    }), 202

@app.route('/tasks/process-image', methods=['POST'])
def submit_process_image():
    """Submit an image processing task."""
    data = request.get_json()

    image_url = data.get('image_url')
    width = data.get('width', 800)
    height = data.get('height', 600)

    if not image_url:
        return jsonify({'error': 'image_url is required'}), 400

    task = process_image.delay(image_url, width, height)

    return jsonify({
        'task_id': task.id,
        'status': task.status
    }), 202

@app.route('/tasks/generate-report', methods=['POST'])
def submit_generate_report():
    """Submit a report generation task."""
    data = request.get_json()

    report_type = data.get('report_type')
    filters = data.get('filters')

    if not report_type:
        return jsonify({'error': 'report_type is required'}), 400

    task = generate_report.delay(report_type, filters)

    return jsonify({
        'task_id': task.id,
        'status': task.status
    }), 202

@app.route('/tasks/status/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Get the status of a submitted task."""
    task = celery_app.AsyncResult(task_id)

    response = {
        'task_id': task.id,
        'status': task.status,
    }

    if task.state == 'PENDING':
        response['result'] = 'Task is pending...'
    elif task.state == 'SUCCESS':
        response['result'] = task.result
    elif task.state == 'FAILURE':
        response['result'] = str(task.info)
    elif task.state == 'RETRY':
        response['result'] = 'Task is retrying...'

    return jsonify(response), 200

@app.route('/tasks/revoke/<task_id>', methods=['POST'])
def revoke_task(task_id):
    """Revoke (cancel) a submitted task."""
    celery_app.control.revoke(task_id, terminate=True)

    return jsonify({
        'task_id': task_id,
        'status': 'revoked'
    }), 200

if __name__ == '__main__':
    port = int(os.getenv('PORT', 5000))
    app.run(host='0.0.0.0', port=port, debug=False)
```

Step 7: Create Requirements File
```bash
pip freeze > requirements.txt
```

Your requirements.txt should contain:

```
Celery==5.3.4
Flask==3.0.0
Redis==5.0.1
requests==2.31.0
Werkzeug==3.0.1
gunicorn==21.2.0
```

Step 8: Test Locally
Start Redis locally (if using Docker):
```bash
docker run -d -p 6379:6379 redis:7-alpine
```

In one terminal, start the Celery worker:

```bash
celery -A celery_app worker --loglevel=info
```

In another terminal, start the Flask app:

```bash
python app.py
```

In a third terminal, test the task submission:

```bash
curl -X POST http://localhost:5000/tasks/send-email \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","subject":"Test","body":"Hello"}'
```

Deploying Without a Dockerfile
Klutch.sh uses Nixpacks to automatically detect and build your Celery application from your source code.
Prepare Your Repository
- Initialize a Git repository and commit your code:
```bash
git init
git add .
git commit -m "Initial Celery app commit"
```

- Create a `.gitignore` file:

```
venv/
__pycache__/
*.pyc
*.pyo
*.egg-info/
.env
.DS_Store
```

- Push to GitHub:

```bash
git remote add origin https://github.com/YOUR_USERNAME/my-celery-app.git
git branch -M main
git push -u origin main
```

Deploy to Klutch.sh
- Log in to the Klutch.sh dashboard.
- Click “Create a new project” and provide a project name.
- Inside your project, click “Create a new app”.
- Repository Configuration:
  - Select your GitHub repository containing the Celery app
  - Select the branch to deploy (typically `main`)
- Traffic Settings:
  - Select “HTTP” as the traffic type
- Port Configuration:
  - Set the internal port to 5000 (the port the Flask app listens on)
- Environment Variables: Set the following environment variables in the Klutch.sh dashboard:
  - `CELERY_BROKER_URL`: Your Redis or RabbitMQ connection string (e.g., `redis://redis-instance-hostname:6379/0`)
  - `CELERY_RESULT_BACKEND`: Your result backend URL (e.g., `redis://redis-instance-hostname:6379/1`)
  - `PYTHONUNBUFFERED`: Set to `1` to ensure Python output is logged immediately
- Build and Start Commands: If you need to customize the build or start command, set these environment variables:
  - `BUILD_COMMAND`: Build-time dependency installation. Default: `pip install -r requirements.txt`
  - `START_COMMAND`: The command to start your app. Default: `gunicorn app:app --bind 0.0.0.0:$PORT`

  For example, if your app structure is different, you might set:

  ```
  START_COMMAND=flask --app app run --host 0.0.0.0 --port $PORT
  ```

- Region, Compute, and Instances:
  - Choose your desired region for optimal latency
  - Select compute resources based on your task complexity (Starter, Pro, or Premium)
  - Set the number of instances (start with 1-2 for testing)
- Click “Create” to deploy. Klutch.sh will automatically build your application using Nixpacks and deploy it.
- Once deployment completes, your app will be accessible at `example-app.klutch.sh`.
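Note that the default start command only runs the Flask API; tasks will not execute until a Celery worker is running as well. One approach (a sketch, reusing the `START_COMMAND` convention above) is to deploy a second app from the same repository whose start command launches the worker instead of gunicorn:

```
START_COMMAND=celery -A celery_app worker --loglevel=info --concurrency=4
```

This mirrors the separate worker container shown later in the Dockerfile section.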
Verifying the Deployment
Test your deployed app:
```bash
curl https://example-app.klutch.sh/health
```

You should receive:

```json
{
  "status": "healthy",
  "service": "celery-app"
}
```

Deploying With a Dockerfile
If you prefer more control over your build environment, you can provide a custom Dockerfile. Klutch.sh automatically detects and uses a Dockerfile in your repository’s root directory.
Create a Multi-Stage Dockerfile
Create a Dockerfile in your project root:
```dockerfile
# Build stage
FROM python:3.11-slim as builder

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies into the user site
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Runtime stage
FROM python:3.11-slim

WORKDIR /app

# curl is needed for the HEALTHCHECK below (not included in the slim image)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user for security
RUN useradd -m -u 1000 celery_user

# Copy Python dependencies from the builder into the non-root user's home
# (copying them to /root/.local would be unreadable after dropping privileges)
COPY --from=builder /root/.local /home/celery_user/.local

# Set PATH to use the packages installed by the builder
ENV PATH=/home/celery_user/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

# Copy application code
COPY . .

RUN chown -R celery_user:celery_user /app /home/celery_user/.local

USER celery_user

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

# Expose port
EXPOSE 5000

# Start the application
CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120"]
```

Advanced Dockerfile with Celery Worker
If you want a separate container for Celery workers, create a Dockerfile.worker:
```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (procps provides ps for the health check below)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    procps \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 celery_user && \
    chown -R celery_user:celery_user /app

USER celery_user

# Set environment variables
ENV PYTHONUNBUFFERED=1

# Health check (the Celery worker doesn't expose HTTP, so we check the process)
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
    CMD ps aux | grep 'celery.*worker' | grep -v grep || exit 1

# Start Celery worker
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]
```

Deploy the Dockerfile Version
- Push your code with the Dockerfile to GitHub:
```bash
git add Dockerfile
git commit -m "Add Dockerfile for custom build"
git push
```

- Log in to the Klutch.sh dashboard.
- Create a new app as before:
  - Select your GitHub repository and branch
  - Set traffic type to “HTTP”
  - Set the internal port to 5000
  - Add environment variables (same as Nixpacks deployment)
  - Click “Create”
- Klutch.sh will automatically detect your Dockerfile and use it for building and deployment.
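Before pushing, you can sanity-check both images locally; a sketch, assuming the Redis container from Step 8 is still running and that `host.docker.internal` resolves to your host (Docker Desktop; on Linux you may need `--network host` instead):

```bash
# Build the API image and the worker image
docker build -t celery-api .
docker build -f Dockerfile.worker -t celery-worker .

# Run both against the local Redis broker
docker run -d -p 5000:5000 \
  -e CELERY_BROKER_URL=redis://host.docker.internal:6379/0 \
  -e CELERY_RESULT_BACKEND=redis://host.docker.internal:6379/1 \
  celery-api

docker run -d \
  -e CELERY_BROKER_URL=redis://host.docker.internal:6379/0 \
  -e CELERY_RESULT_BACKEND=redis://host.docker.internal:6379/1 \
  celery-worker
```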
Environment Variables and Configuration
Essential Environment Variables
Configure these variables in the Klutch.sh dashboard:
| Variable | Description | Example |
|---|---|---|
| `CELERY_BROKER_URL` | Message broker connection string | `redis://redis-host:6379/0` |
| `CELERY_RESULT_BACKEND` | Result backend for storing task results | `redis://redis-host:6379/1` |
| `PYTHONUNBUFFERED` | Enable immediate output logging | `1` |
| `FLASK_ENV` | Flask environment | `production` |
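For local development you can mirror these settings in a `.env` file (a sketch; the values are examples, and the `.gitignore` above already keeps this file out of Git):

```bash
# .env — local development only
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
PYTHONUNBUFFERED=1
FLASK_ENV=development   # use production in the Klutch.sh dashboard, as in the table above
```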
Customization Environment Variables (Nixpacks)
For Nixpacks deployments, customize build and runtime behavior:
| Variable | Purpose | Default |
|---|---|---|
| `BUILD_COMMAND` | Command to run during build | `pip install -r requirements.txt` |
| `START_COMMAND` | Command to start the application | `gunicorn app:app --bind 0.0.0.0:$PORT` |
Celery-Specific Environment Variables
Configure Celery behavior:
```
CELERY_TASK_TIME_LIMIT=1800
CELERY_TASK_SOFT_TIME_LIMIT=1500
CELERY_WORKER_PREFETCH_MULTIPLIER=4
CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
```

Message Brokers and Result Backends
Redis Setup
Redis is the recommended broker for Celery. To use a Redis instance with Klutch.sh:
- Deploy a Redis instance on Klutch.sh (from the open-source software marketplace)
- Get the connection details from the Redis app dashboard
- Set `CELERY_BROKER_URL=redis://redis-hostname:6379/0` in your Celery app environment variables
Example Redis connection string:
```
redis://my-redis-app.klutch.sh:6379/0
```

RabbitMQ Setup
For RabbitMQ as a broker:
- Deploy RabbitMQ on Klutch.sh or use a managed service
- Set `CELERY_BROKER_URL=amqp://user:password@rabbitmq-host:5672//`
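Whichever broker you use, you can verify connectivity from the application side before deploying workers; a minimal sketch, run from a Python shell in the app environment:

```python
from celery_app import app

# Raises an exception if the broker in CELERY_BROKER_URL is unreachable
with app.connection() as conn:
    conn.ensure_connection(max_retries=3)
    print('Broker connection OK')
```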
Result Backend Options
Store task results in:
- Redis: Fast in-memory storage (recommended)

  ```
  CELERY_RESULT_BACKEND=redis://redis-host:6379/1
  ```

- Database: Persistent PostgreSQL storage

  ```
  CELERY_RESULT_BACKEND=db+postgresql://user:password@db-host:5432/celery_results
  ```

- RabbitMQ: Message queue-based backend

  ```
  CELERY_RESULT_BACKEND=rpc://
  ```
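Whichever backend you choose, results are read back the same way; a minimal sketch using the `generate_report` task from `tasks.py` (the arguments are illustrative):

```python
from tasks import generate_report

result = generate_report.delay('sales', {'region': 'EU'})
print(result.id, result.status)   # task id and current state
print(result.get(timeout=30))     # blocks until the backend stores the result
```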
Worker Management and Scaling
Running Multiple Workers
For the Dockerfile worker approach, create multiple app instances on Klutch.sh:
- Deploy a second app instance using the `Dockerfile.worker`
- Set concurrency: `CELERY_WORKER_CONCURRENCY=4`
- Scale to multiple instances for higher throughput
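If you route tasks to the queues defined in `celery_config.py`, you can also dedicate worker instances to specific queues; a sketch (adjust queue names and concurrency to your workload):

```bash
# Worker consuming only the high-priority queue
celery -A celery_app worker -Q high_priority --concurrency=4 --loglevel=info

# Worker consuming the default and low-priority queues
celery -A celery_app worker -Q default,low_priority --concurrency=2 --loglevel=info
```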
Graceful Shutdown Configuration
Celery supports graceful shutdown. Update your celery_config.py:
```python
# Graceful shutdown timeout (seconds)
CELERY_WORKER_SHUTDOWN_TIMEOUT = 600  # 10 minutes

# Task prefetch behavior
CELERY_WORKER_PREFETCH_MULTIPLIER = 4

# Requeue tasks if the worker process is lost
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Acknowledge tasks after execution
CELERY_TASK_ACKS_LATE = True
```

Monitoring Worker Health
Add a worker monitoring task in tasks.py:
```python
@app.task
def monitor_worker():
    """Monitoring task to check worker health."""
    import socket
    from datetime import datetime

    return {
        'hostname': socket.gethostname(),
        'timestamp': datetime.now().isoformat(),
        'status': 'healthy'
    }
```

Persistent Storage for Task Data
Celery task data and logs can be stored persistently. Configure persistent storage on Klutch.sh:
Adding Persistent Volume
- In the Klutch.sh app dashboard, navigate to “Persistent Storage” or “Volumes”
- Click “Add Volume”
- Set the mount path: `/app/task_logs`
- Set the size: `10 GB` (adjust based on your needs)
- Save and redeploy
Updating Application Code
Update your app.py to log tasks to persistent storage:
```python
import logging
import os

# Configure logging to persistent storage
log_dir = os.getenv('LOG_DIR', '/app/task_logs')
os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'celery.log')),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
```

Custom Domains
To serve your Celery API from a custom domain:
- In the Klutch.sh app dashboard, navigate to “Custom Domains”
- Click “Add Custom Domain”
- Enter your domain (e.g., `api.example.com`)
- Follow the DNS configuration instructions provided
- Point your domain’s CNAME to the Klutch.sh endpoint
Example DNS configuration:
```
api.example.com CNAME example-app.klutch.sh
```

Monitoring and Logging
Real-Time Monitoring
Use Celery’s built-in monitoring tools. Add Flower (Celery monitoring tool) to your requirements.txt:
```
flower==2.0.1
```

Create a `flower_config.py`:

```python
import os

# Flower settings
FLOWER_PORT = int(os.getenv('FLOWER_PORT', 5555))
FLOWER_BASIC_AUTH = os.getenv('FLOWER_BASIC_AUTH', 'admin:admin')
FLOWER_BROKER = os.getenv('CELERY_BROKER_URL')
```

Run Flower locally:

```bash
celery -A celery_app flower --port=5555
```

Task Logging and Monitoring
Monitor task execution in your Klutch.sh dashboard:
- Check application logs for worker output
- Monitor memory and CPU usage of worker instances
- Set up alerts for failed tasks
Add task monitoring to tasks.py:
```python
import logging

logger = logging.getLogger(__name__)

@app.task(bind=True)
def monitored_task(self):
    """Task with built-in monitoring."""
    logger.info(f'Task {self.request.id} started')

    try:
        result = perform_work()
        logger.info(f'Task {self.request.id} completed successfully')
        return result
    except Exception as e:
        logger.error(f'Task {self.request.id} failed: {str(e)}')
        raise
```

Prometheus Metrics
Export Celery metrics to Prometheus:
```python
from prometheus_client import Counter, Histogram, generate_latest
import time

# Prometheus metrics
tasks_total = Counter('celery_tasks_total', 'Total tasks', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', 'Task duration', ['task_name'])

@app.task
def tracked_task():
    """Task with Prometheus tracking."""
    start = time.time()
    task_name = 'tracked_task'

    try:
        result = perform_work()
        tasks_total.labels(task_name=task_name, status='success').inc()
        return result
    except Exception as e:
        tasks_total.labels(task_name=task_name, status='failure').inc()
        raise
    finally:
        duration = time.time() - start
        task_duration.labels(task_name=task_name).observe(duration)
```

Security Best Practices
- Use HTTPS: Ensure all connections to your Celery API endpoint use HTTPS
- Broker Authentication: Use authenticated Redis/RabbitMQ connections with strong passwords
- Environment Variables: Never hardcode credentials; use Klutch.sh environment variables
- Task Serialization: Use JSON serialization (not pickle) to prevent code injection
- Task Time Limits: Set reasonable time limits to prevent runaway tasks
- Worker Isolation: Run workers as non-root users (shown in Dockerfile)
- Network Segmentation: Deploy Celery workers and brokers in private networks when possible
- Task Secrets: Use secure methods to pass sensitive data to tasks
- Rate Limiting: Implement task rate limiting to prevent abuse
- Monitoring Alerts: Set up alerts for task failures and worker health
Example secure task with rate limiting:
```python
from celery.exceptions import Reject
from datetime import datetime, timedelta

# Rate limiting dictionary (use Redis in production)
rate_limit_store = {}

@app.task(bind=True, rate_limit='100/m')
def rate_limited_task(self, user_id):
    """Task with rate limiting."""
    key = f'user_{user_id}'
    now = datetime.now()

    if key in rate_limit_store:
        last_called = rate_limit_store[key]
        if (now - last_called) < timedelta(seconds=1):
            # Retry after 1 second
            raise self.retry(countdown=1)

    rate_limit_store[key] = now
    return f"Task executed for user {user_id}"
```

Troubleshooting
Issue 1: Connection Refused to Broker
Problem: Tasks fail with “Connection refused” when connecting to Redis/RabbitMQ.
Solution:
- Verify `CELERY_BROKER_URL` is correct in environment variables
- Ensure the Redis/RabbitMQ instance is running and accessible from Klutch.sh
- Check firewall rules allow connection on the broker port
- Test connection: `redis-cli -h your-redis-host ping`
Issue 2: Tasks Stuck in PENDING State
Problem: Tasks are submitted but never execute (stuck in PENDING).
Solution:
- Verify at least one Celery worker is running: `celery -A celery_app inspect active_queues`
- Check worker logs in the Klutch.sh dashboard for errors
- Ensure the worker is connected to the same broker as the app
- Verify the task function is properly decorated with `@app.task`
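A quick way to confirm workers are reachable and have your tasks registered (a sketch, run from a Python shell in the app environment):

```python
from celery_app import app

inspector = app.control.inspect()
print(inspector.ping())        # {'celery@hostname': {'ok': 'pong'}} for each live worker
print(inspector.registered())  # the task names each worker knows about
```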
Issue 3: High Memory Usage in Workers
Problem: Celery workers consume excessive memory over time.
Solution:
- Reduce `CELERY_WORKER_MAX_TASKS_PER_CHILD` to recycle workers more frequently
- Decrease `CELERY_WORKER_PREFETCH_MULTIPLIER` to process fewer tasks in advance
- Monitor individual task memory usage and optimize
- Use memory profiling tools: `pip install memory-profiler`
Issue 4: Tasks Not Being Retried
Problem: Failed tasks are not retried as configured.
Solution:
- Ensure `max_retries` is set in the task decorator: `@app.task(max_retries=3)`
- Verify the exception is not being caught and suppressed
- Check that `CELERY_TASK_ACKS_LATE=True` is set so failed tasks are requeued
- Call `raise self.retry(exc=exc, countdown=60)` inside the task's exception handler so the retry is actually scheduled
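Alternatively, recent Celery versions can retry declaratively; a sketch using `autoretry_for` with exponential backoff (the `fetch_remote_data` task is hypothetical):

```python
import requests
from celery_app import app

@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True,
          retry_backoff_max=600, max_retries=3)
def fetch_remote_data(self, url):
    """Hypothetical task: any raised exception triggers an automatic retry with backoff."""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
```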
Issue 5: Custom Commands Fail in Nixpacks
Problem: Custom BUILD_COMMAND or START_COMMAND not executing.
Solution:
- Verify command syntax is correct and executable
- Use full paths to binaries: `/usr/local/bin/python` instead of `python`
- Check logs in the Klutch.sh deployment history
- Example: `START_COMMAND="python -m gunicorn app:app --bind 0.0.0.0:$PORT"`
Best Practices for Production Deployment
- Use Connection Pooling: Redis connection pooling prevents connection exhaustion

  ```python
  from redis import ConnectionPool

  pool = ConnectionPool.from_url(CELERY_BROKER_URL)
  ```

- Implement Idempotent Tasks: Tasks should produce the same result when retried

  ```python
  @app.task
  def idempotent_task(user_id, unique_id):
      """Check if the task was already executed."""
      if Task.objects.filter(unique_id=unique_id).exists():
          return "Already executed"
      # Execute task...
  ```

- Set Task Timeouts: Prevent hung tasks from consuming resources

  ```python
  @app.task(time_limit=600, soft_time_limit=540)
  def time_limited_task():
      pass
  ```

- Use Task Routing: Route specific tasks to specific workers

  ```python
  @app.task(queue='high_priority', routing_key='high')
  def urgent_task():
      pass
  ```

- Monitor Queue Depths: Alert when queues grow unexpectedly

  ```python
  @app.task
  def check_queue_depth():
      inspector = app.control.inspect()
      reserved = inspector.reserved()
      return reserved
  ```

- Implement Backpressure: Stop accepting tasks when queues are full

  ```python
  @app.task
  def submit_with_backpressure(task_data):
      inspector = app.control.inspect()
      active = inspector.active()
      if sum(len(tasks) for tasks in active.values()) > 10000:
          raise Exception("Task queue is full, please retry later")
  ```

- Use Dead Letter Queues: Capture and analyze failed tasks

  ```python
  CELERY_DEFAULT_QUEUE = 'default'
  CELERY_TASK_REJECT_ON_WORKER_LOST = True
  ```

- Schedule Periodic Tasks with Beat: Automate recurring work (see the beat command after this list)

  ```python
  from celery.schedules import crontab

  app.conf.beat_schedule = {
      'cleanup-every-hour': {
          'task': 'tasks.cleanup_old_files',
          'schedule': crontab(minute=0),
      },
  }
  ```

- Graceful Scaling: Plan for scaling workers and brokers
  - Start with 1-2 worker instances
  - Monitor task latency and queue depth
  - Scale workers horizontally as load increases
  - Use separate broker instances for very large deployments

- Backup Result Data: Persist important task results

  ```python
  CELERY_RESULT_BACKEND = 'db+postgresql://user:pass@host/db'
  CELERY_RESULT_EXPIRES = 3600  # 1 hour
  ```
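The beat schedule shown under “Schedule Periodic Tasks with Beat” only fires if a beat process is running alongside your workers; a sketch of starting it (for small deployments the `-B` flag embeds beat in a worker):

```bash
# Dedicated beat scheduler process
celery -A celery_app beat --loglevel=info

# Or, for small deployments, embed beat in a single worker
celery -A celery_app worker -B --loglevel=info
```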
Resources
- Celery Official Documentation
- Redis Documentation
- RabbitMQ Documentation
- Flower Monitoring Documentation
- Celery on Wikipedia
Conclusion
Deploying Celery applications to Klutch.sh provides a scalable, reliable foundation for distributed task processing. Whether you choose Nixpacks for simplicity or Docker for control, Klutch.sh handles the infrastructure complexity, allowing you to focus on your application logic.
Key takeaways:
- Use Nixpacks for quick deployments with minimal configuration
- Use Docker for custom build environments and advanced configurations
- Configure environment variables for broker and result backend connectivity
- Monitor worker health and task execution with logging and metrics
- Implement graceful shutdown and task retry logic for reliability
- Scale workers horizontally as your task volume increases
For questions or support, refer to the Celery documentation or Klutch.sh support resources.