Skip to content

Deploying a Celery App

What is Celery?

Celery is a distributed task queue library for Python that enables asynchronous job processing and task scheduling. It decouples task execution from request handling, allowing applications to handle long-running operations without blocking user requests. Celery supports multiple message brokers (Redis, RabbitMQ, Amazon SQS), result backends, and workers across multiple machines.

Key capabilities include:

  • Asynchronous task execution with worker processes
  • Task scheduling and periodic tasks using Celery Beat
  • Automatic task retries with exponential backoff
  • Task routing and priority queuing
  • Real-time monitoring and task introspection
  • Distributed processing across multiple workers and machines
  • Integration with popular Python frameworks (Django, Flask, FastAPI)
  • Task result persistence with multiple backend options
  • Chain, chord, and group task primitives for complex workflows
  • Graceful shutdown and task cancellation

Celery is ideal for sending emails, processing image uploads, running data analysis, generating reports, and any CPU-intensive or I/O-bound operations that shouldn’t block your application.

Prerequisites

Before deploying a Celery application to Klutch.sh, ensure you have:

  • Python 3.9+ installed on your local machine
  • pip or conda for dependency management
  • Git and a GitHub account
  • A Klutch.sh account with dashboard access
  • A message broker (Redis or RabbitMQ) - can be deployed separately on Klutch.sh or use managed services
  • Basic understanding of Python virtual environments

Getting Started with Celery

Step 1: Create Your Project Directory

Terminal window
mkdir my-celery-app
cd my-celery-app
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate

Step 2: Install Celery and Dependencies

Terminal window
pip install celery redis flask requests

The redis package is the Python client for Redis broker. flask and requests are optional but useful for creating a simple web interface to submit tasks.

Step 3: Create Your Configuration File

Create a celery_config.py file to centralize Celery configuration:

import os
from kombu import Exchange, Queue
# Broker configuration
CELERY_BROKER_URL = os.getenv(
'CELERY_BROKER_URL',
'redis://localhost:6379/0'
)
# Result backend configuration
CELERY_RESULT_BACKEND = os.getenv(
'CELERY_RESULT_BACKEND',
'redis://localhost:6379/1'
)
# Task settings
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
# Task execution settings
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 minutes hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 25 minutes soft limit
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True
# Worker settings
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
# Logging
CELERY_WORKER_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
CELERY_WORKER_TASK_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s'
# Queue configuration
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('high_priority', Exchange('high'), routing_key='high'),
Queue('low_priority', Exchange('low'), routing_key='low'),
)

Step 4: Create Your Celery Application Instance

Create a celery_app.py file:

from celery import Celery
from celery_config import *
app = Celery(__name__)
app.config_from_object('celery_config')
# Auto-discover tasks from all installed apps
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')

Step 5: Define Your Tasks

Create a tasks.py file with your background tasks:

from celery_app import app
import time
import requests
from datetime import datetime
@app.task(bind=True, max_retries=3)
def send_email(self, email_address, subject, body):
"""
Send an email asynchronously with retry logic.
Retries up to 3 times on failure.
"""
try:
# Simulate email sending
print(f"Sending email to {email_address}")
# In production, use a service like SendGrid or AWS SES
time.sleep(2)
return f"Email sent to {email_address}"
except Exception as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task
def process_image(image_url, width, height):
"""
Download and process an image asynchronously.
"""
try:
response = requests.get(image_url)
response.raise_for_status()
# In production, use PIL/Pillow to resize the image
print(f"Processing image: {width}x{height}")
return {
'status': 'success',
'image_url': image_url,
'dimensions': f'{width}x{height}'
}
except requests.RequestException as e:
return {
'status': 'error',
'error': str(e)
}
@app.task
def generate_report(report_type, filters=None):
"""
Generate a report asynchronously.
"""
print(f"Generating {report_type} report with filters: {filters}")
# Simulate long-running report generation
time.sleep(5)
return {
'report_type': report_type,
'generated_at': datetime.now().isoformat(),
'filters': filters or {}
}
@app.task
def cleanup_old_files(days=30):
"""
Periodic task to clean up old temporary files.
"""
print(f"Cleaning up files older than {days} days")
return f"Cleanup completed for files older than {days} days"
# Chord example: process multiple images in parallel then aggregate results
from celery import chord
def process_multiple_images(image_urls, width, height):
"""
Process multiple images in parallel.
"""
callback = aggregate_results.s()
header = [process_image.s(url, width, height) for url in image_urls]
result = chord(header)(callback)
return result
@app.task
def aggregate_results(results):
"""
Aggregate results from multiple image processing tasks.
"""
successful = sum(1 for r in results if r.get('status') == 'success')
failed = sum(1 for r in results if r.get('status') == 'error')
return {
'total_processed': len(results),
'successful': successful,
'failed': failed
}

Step 6: Create a Simple Flask Web Interface

Create an app.py file to submit tasks via HTTP:

from flask import Flask, request, jsonify
from celery_app import app as celery_app
from tasks import send_email, process_image, generate_report, process_multiple_images
import os
app = Flask(__name__)
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint for monitoring."""
return jsonify({
'status': 'healthy',
'service': 'celery-app'
}), 200
@app.route('/tasks/send-email', methods=['POST'])
def submit_send_email():
"""Submit an email task."""
data = request.get_json()
email = data.get('email')
subject = data.get('subject')
body = data.get('body')
if not all([email, subject, body]):
return jsonify({'error': 'Missing required fields'}), 400
task = send_email.delay(email, subject, body)
return jsonify({
'task_id': task.id,
'status': task.status
}), 202
@app.route('/tasks/process-image', methods=['POST'])
def submit_process_image():
"""Submit an image processing task."""
data = request.get_json()
image_url = data.get('image_url')
width = data.get('width', 800)
height = data.get('height', 600)
if not image_url:
return jsonify({'error': 'image_url is required'}), 400
task = process_image.delay(image_url, width, height)
return jsonify({
'task_id': task.id,
'status': task.status
}), 202
@app.route('/tasks/generate-report', methods=['POST'])
def submit_generate_report():
"""Submit a report generation task."""
data = request.get_json()
report_type = data.get('report_type')
filters = data.get('filters')
if not report_type:
return jsonify({'error': 'report_type is required'}), 400
task = generate_report.delay(report_type, filters)
return jsonify({
'task_id': task.id,
'status': task.status
}), 202
@app.route('/tasks/status/<task_id>', methods=['GET'])
def get_task_status(task_id):
"""Get the status of a submitted task."""
task = celery_app.AsyncResult(task_id)
response = {
'task_id': task.id,
'status': task.status,
}
if task.state == 'PENDING':
response['result'] = 'Task is pending...'
elif task.state == 'SUCCESS':
response['result'] = task.result
elif task.state == 'FAILURE':
response['result'] = str(task.info)
elif task.state == 'RETRY':
response['result'] = 'Task is retrying...'
return jsonify(response), 200
@app.route('/tasks/revoke/<task_id>', methods=['POST'])
def revoke_task(task_id):
"""Revoke (cancel) a submitted task."""
celery_app.control.revoke(task_id, terminate=True)
return jsonify({
'task_id': task_id,
'status': 'revoked'
}), 200
if __name__ == '__main__':
port = int(os.getenv('PORT', 5000))
app.run(host='0.0.0.0', port=port, debug=False)

Step 7: Create Requirements File

Terminal window
pip freeze > requirements.txt

Your requirements.txt should contain:

Celery==5.3.4
Flask==3.0.0
Redis==5.0.1
requests==2.31.0
Werkzeug==3.0.1
gunicorn==21.2.0

Step 8: Test Locally

Start Redis locally (if using Docker):

Terminal window
docker run -d -p 6379:6379 redis:7-alpine

In one terminal, start the Celery worker:

Terminal window
celery -A celery_app worker --loglevel=info

In another terminal, start the Flask app:

Terminal window
python app.py

In a third terminal, test the task submission:

Terminal window
curl -X POST http://localhost:5000/tasks/send-email \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","subject":"Test","body":"Hello"}'

Deploying Without a Dockerfile

Klutch.sh uses Nixpacks to automatically detect and build your Celery application from your source code.

Prepare Your Repository

  1. Initialize a Git repository and commit your code:
Terminal window
git init
git add .
git commit -m "Initial Celery app commit"
  1. Create a .gitignore file:
venv/
__pycache__/
*.pyc
*.pyo
*.egg-info/
.env
.DS_Store
  1. Push to GitHub:
Terminal window
git remote add origin https://github.com/YOUR_USERNAME/my-celery-app.git
git branch -M main
git push -u origin main

Deploy to Klutch.sh

  1. Log in to Klutch.sh dashboard.

  2. Click “Create a new project” and provide a project name.

  3. Inside your project, click “Create a new app”.

  4. Repository Configuration:

    • Select your GitHub repository containing the Celery app
    • Select the branch to deploy (typically main)
  5. Traffic Settings:

    • Select “HTTP” as the traffic type
  6. Port Configuration:

    • Set the internal port to 5000 (the port the Flask app listens on)
  7. Environment Variables: Set the following environment variables in the Klutch.sh dashboard:

    • CELERY_BROKER_URL: Your Redis or RabbitMQ connection string (e.g., redis://redis-instance-hostname:6379/0)
    • CELERY_RESULT_BACKEND: Your result backend URL (e.g., redis://redis-instance-hostname:6379/1)
    • PYTHONUNBUFFERED: Set to 1 to ensure Python output is logged immediately
  8. Build and Start Commands: If you need to customize the build or start command, set these environment variables:

    • BUILD_COMMAND: Build-time dependencies installation. Default: pip install -r requirements.txt
    • START_COMMAND: The command to start your app. Default: gunicorn app:app --bind 0.0.0.0:$PORT

    For example, if your app structure is different, you might set:

    START_COMMAND=flask --app app run --host 0.0.0.0 --port $PORT
  9. Region, Compute, and Instances:

    • Choose your desired region for optimal latency
    • Select compute resources based on your task complexity (Starter, Pro, or Premium)
    • Set the number of instances (start with 1-2 for testing)
  10. Click “Create” to deploy. Klutch.sh will automatically build your application using Nixpacks and deploy it.

  11. Once deployment completes, your app will be accessible at example-app.klutch.sh.

Verifying the Deployment

Test your deployed app:

Terminal window
curl https://example-app.klutch.sh/health

You should receive:

{
"status": "healthy",
"service": "celery-app"
}

Deploying With a Dockerfile

If you prefer more control over your build environment, you can provide a custom Dockerfile. Klutch.sh automatically detects and uses a Dockerfile in your repository’s root directory.

Create a Multi-Stage Dockerfile

Create a Dockerfile in your project root:

# Build stage
FROM python:3.11-slim as builder
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
# Runtime stage
FROM python:3.11-slim
WORKDIR /app
# Copy Python dependencies from builder
COPY --from=builder /root/.local /root/.local
# Set PATH to use pip from builder
ENV PATH=/root/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1
# Copy application code
COPY . .
# Create non-root user for security
RUN useradd -m -u 1000 celery_user && \
chown -R celery_user:celery_user /app
USER celery_user
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:5000/health || exit 1
# Expose port
EXPOSE 5000
# Start the application
CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120"]

Advanced Dockerfile with Celery Worker

If you want a separate container for Celery workers, create a Dockerfile.worker:

FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd -m -u 1000 celery_user && \
chown -R celery_user:celery_user /app
USER celery_user
# Set environment variables
ENV PYTHONUNBUFFERED=1
# Health check (celery worker doesn't expose HTTP, so we check the process)
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
CMD ps aux | grep 'celery.*worker' | grep -v grep || exit 1
# Start Celery worker
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]

Deploy the Dockerfile Version

  1. Push your code with the Dockerfile to GitHub:
Terminal window
git add Dockerfile
git commit -m "Add Dockerfile for custom build"
git push
  1. Log in to Klutch.sh dashboard.

  2. Create a new app:

    • Select your GitHub repository and branch
    • Set traffic type to “HTTP”
    • Set the internal port to 5000
    • Add environment variables (same as Nixpacks deployment)
    • Click “Create”
  3. Klutch.sh will automatically detect your Dockerfile and use it for building and deployment.


Environment Variables and Configuration

Essential Environment Variables

Configure these variables in the Klutch.sh dashboard:

VariableDescriptionExample
CELERY_BROKER_URLMessage broker connection stringredis://redis-host:6379/0
CELERY_RESULT_BACKENDResult backend for storing task resultsredis://redis-host:6379/1
PYTHONUNBUFFEREDEnable immediate output logging1
FLASK_ENVFlask environmentproduction

Customization Environment Variables (Nixpacks)

For Nixpacks deployments, customize build and runtime behavior:

VariablePurposeDefault
BUILD_COMMANDCommand to run during buildpip install -r requirements.txt
START_COMMANDCommand to start the applicationgunicorn app:app --bind 0.0.0.0:$PORT

Celery-Specific Environment Variables

Configure Celery behavior:

CELERY_TASK_TIME_LIMIT=1800
CELERY_TASK_SOFT_TIME_LIMIT=1500
CELERY_WORKER_PREFETCH_MULTIPLIER=4
CELERY_WORKER_MAX_TASKS_PER_CHILD=1000

Message Brokers and Result Backends

Redis Setup

Redis is the recommended broker for Celery. To use a Redis instance with Klutch.sh:

  1. Deploy a Redis instance on Klutch.sh (from the open-source software marketplace)
  2. Get the connection details from the Redis app dashboard
  3. Set CELERY_BROKER_URL=redis://redis-hostname:6379/0 in your Celery app environment variables

Example Redis connection string:

redis://my-redis-app.klutch.sh:6379/0

RabbitMQ Setup

For RabbitMQ as a broker:

  1. Deploy RabbitMQ on Klutch.sh or use a managed service
  2. Set CELERY_BROKER_URL=amqp://user:password@rabbitmq-host:5672//

Result Backend Options

Store task results in:

  • Redis: Fast in-memory storage (recommended)

    CELERY_RESULT_BACKEND=redis://redis-host:6379/1
  • Database: Persistent PostgreSQL storage

    CELERY_RESULT_BACKEND=db+postgresql://user:password@db-host:5432/celery_results
  • RabbitMQ: Message queue-based backend

    CELERY_RESULT_BACKEND=rpc://

Worker Management and Scaling

Running Multiple Workers

For the Dockerfile worker approach, create multiple app instances on Klutch.sh:

  1. Deploy a second app instance using the Dockerfile.worker
  2. Set concurrency: CELERY_WORKER_CONCURRENCY=4
  3. Scale to multiple instances for higher throughput

Graceful Shutdown Configuration

Celery supports graceful shutdown. Update your celery_config.py:

# Graceful shutdown timeout (seconds)
CELERY_WORKER_SHUTDOWN_TIMEOUT = 600 # 10 minutes
# Task prefetch behavior
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
# Don't reject tasks if worker is lost
CELERY_TASK_REJECT_ON_WORKER_LOST = True
# Acknowledge tasks after execution
CELERY_TASK_ACKS_LATE = True

Monitoring Worker Health

Add a worker monitoring task in tasks.py:

@app.task
def monitor_worker():
"""Monitoring task to check worker health."""
import socket
from datetime import datetime
return {
'hostname': socket.gethostname(),
'timestamp': datetime.now().isoformat(),
'status': 'healthy'
}

Persistent Storage for Task Data

Celery task data and logs can be stored persistently. Configure persistent storage on Klutch.sh:

Adding Persistent Volume

  1. In the Klutch.sh app dashboard, navigate to “Persistent Storage” or “Volumes”
  2. Click “Add Volume”
  3. Set the mount path: /app/task_logs
  4. Set the size: 10 GB (adjust based on your needs)
  5. Save and redeploy

Updating Application Code

Update your app.py to log tasks to persistent storage:

import logging
import os
# Configure logging to persistent storage
log_dir = os.getenv('LOG_DIR', '/app/task_logs')
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, 'celery.log')),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)

Custom Domains

To serve your Celery API from a custom domain:

  1. In the Klutch.sh app dashboard, navigate to “Custom Domains”
  2. Click “Add Custom Domain”
  3. Enter your domain (e.g., api.example.com)
  4. Follow the DNS configuration instructions provided
  5. Point your domain’s CNAME to the Klutch.sh endpoint

Example DNS configuration:

api.example.com CNAME example-app.klutch.sh

Monitoring and Logging

Real-Time Monitoring

Use Celery’s built-in monitoring tools. Add Flower (Celery monitoring tool) to your requirements.txt:

flower==2.0.1

Create a flower_config.py:

import os
# Flower settings
FLOWER_PORT = int(os.getenv('FLOWER_PORT', 5555))
FLOWER_BASIC_AUTH = os.getenv('FLOWER_BASIC_AUTH', 'admin:admin')
FLOWER_BROKER = os.getenv('CELERY_BROKER_URL')

Run Flower locally:

Terminal window
celery -A celery_app flower --port=5555

Task Logging and Monitoring

Monitor task execution in your Klutch.sh dashboard:

  1. Check application logs for worker output
  2. Monitor memory and CPU usage of worker instances
  3. Set up alerts for failed tasks

Add task monitoring to tasks.py:

import logging
logger = logging.getLogger(__name__)
@app.task(bind=True)
def monitored_task(self):
"""Task with built-in monitoring."""
logger.info(f'Task {self.request.id} started')
try:
result = perform_work()
logger.info(f'Task {self.request.id} completed successfully')
return result
except Exception as e:
logger.error(f'Task {self.request.id} failed: {str(e)}')
raise

Prometheus Metrics

Export Celery metrics to Prometheus:

from prometheus_client import Counter, Histogram, generate_latest
import time
# Prometheus metrics
tasks_total = Counter('celery_tasks_total', 'Total tasks', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', 'Task duration', ['task_name'])
@app.task
def tracked_task():
"""Task with Prometheus tracking."""
start = time.time()
task_name = 'tracked_task'
try:
result = perform_work()
tasks_total.labels(task_name=task_name, status='success').inc()
return result
except Exception as e:
tasks_total.labels(task_name=task_name, status='failure').inc()
raise
finally:
duration = time.time() - start
task_duration.labels(task_name=task_name).observe(duration)

Security Best Practices

  1. Use HTTPS: Ensure all connections to your Celery API endpoint use HTTPS
  2. Broker Authentication: Use authenticated Redis/RabbitMQ connections with strong passwords
  3. Environment Variables: Never hardcode credentials; use Klutch.sh environment variables
  4. Task Serialization: Use JSON serialization (not pickle) to prevent code injection
  5. Task Time Limits: Set reasonable time limits to prevent runaway tasks
  6. Worker Isolation: Run workers as non-root users (shown in Dockerfile)
  7. Network Segmentation: Deploy Celery workers and brokers in private networks when possible
  8. Task Secrets: Use secure methods to pass sensitive data to tasks
  9. Rate Limiting: Implement task rate limiting to prevent abuse
  10. Monitoring Alerts: Set up alerts for task failures and worker health

Example secure task with rate limiting:

from celery.exceptions import Reject
from datetime import datetime, timedelta
# Rate limiting dictionary (use Redis in production)
rate_limit_store = {}
@app.task(bind=True, rate_limit='100/m')
def rate_limited_task(self, user_id):
"""Task with rate limiting."""
key = f'user_{user_id}'
now = datetime.now()
if key in rate_limit_store:
last_called = rate_limit_store[key]
if (now - last_called) < timedelta(seconds=1):
# Retry after 1 second
raise self.retry(countdown=1)
rate_limit_store[key] = now
return f"Task executed for user {user_id}"

Troubleshooting

Issue 1: Connection Refused to Broker

Problem: Tasks fail with “Connection refused” when connecting to Redis/RabbitMQ.

Solution:

  • Verify CELERY_BROKER_URL is correct in environment variables
  • Ensure Redis/RabbitMQ instance is running and accessible from Klutch.sh
  • Check firewall rules allow connection on the broker port
  • Test connection: redis-cli -h your-redis-host ping

Issue 2: Tasks Stuck in PENDING State

Problem: Tasks are submitted but never execute (stuck in PENDING).

Solution:

  • Verify at least one Celery worker is running: celery -A celery_app inspect active_queues
  • Check worker logs in Klutch.sh dashboard for errors
  • Ensure worker is connected to the same broker as the app
  • Verify the task function is properly decorated with @app.task

Issue 3: High Memory Usage in Workers

Problem: Celery workers consume excessive memory over time.

Solution:

  • Reduce CELERY_WORKER_MAX_TASKS_PER_CHILD to recycle workers more frequently
  • Decrease CELERY_WORKER_PREFETCH_MULTIPLIER to process fewer tasks in advance
  • Monitor individual task memory usage and optimize
  • Use memory profiling tools: pip install memory-profiler

Issue 4: Tasks Not Being Retried

Problem: Failed tasks are not retried as configured.

Solution:

  • Ensure max_retries is set in the task decorator: @app.task(max_retries=3)
  • Verify the exception is not being caught and suppressed
  • Check CELERY_TASK_ACKS_LATE=True is set so failed tasks are requeued
  • Use self.retry(countdown=60) instead of raise self.retry()

Issue 5: Custom Commands Fail in Nixpacks

Problem: Custom BUILD_COMMAND or START_COMMAND not executing.

Solution:

  • Verify command syntax is correct and executable
  • Use full paths to binaries: /usr/local/bin/python instead of python
  • Check logs in Klutch.sh deployment history
  • Example: START_COMMAND="python -m gunicorn app:app --bind 0.0.0.0:$PORT"

Best Practices for Production Deployment

  1. Use Connection Pooling: Redis connection pooling prevents connection exhaustion

    from redis import ConnectionPool
    pool = ConnectionPool.from_url(CELERY_BROKER_URL)
  2. Implement Idempotent Tasks: Tasks should produce the same result when retried

    @app.task
    def idempotent_task(user_id, unique_id):
    """Check if task was already executed."""
    if Task.objects.filter(unique_id=unique_id).exists():
    return "Already executed"
    # Execute task...
  3. Set Task Timeouts: Prevent hung tasks from consuming resources

    @app.task(time_limit=600, soft_time_limit=540)
    def time_limited_task():
    pass
  4. Use Task Routing: Route specific tasks to specific workers

    @app.task(queue='high_priority', routing_key='high')
    def urgent_task():
    pass
  5. Monitor Queue Depths: Alert when queues grow unexpectedly

    @app.task
    def check_queue_depth():
    inspector = app.control.inspect()
    reserved = inspector.reserved()
    return reserved
  6. Implement Backpressure: Stop accepting tasks when queues are full

    @app.task
    def submit_with_backpressure(task_data):
    inspector = app.control.inspect()
    active = inspector.active()
    if sum(len(tasks) for tasks in active.values()) > 10000:
    raise Exception("Task queue is full, please retry later")
  7. Use Dead Letter Queues: Capture and analyze failed tasks

    CELERY_DEFAULT_QUEUE = 'default'
    CELERY_TASK_REJECT_ON_WORKER_LOST = True
  8. Schedule Periodic Tasks with Beat: Automate recurring work

    from celery.schedules import crontab
    from celery import Celery
    app.conf.beat_schedule = {
    'cleanup-every-hour': {
    'task': 'tasks.cleanup_old_files',
    'schedule': crontab(minute=0),
    },
    }
  9. Graceful Scaling: Plan for scaling workers and brokers

    • Start with 1-2 worker instances
    • Monitor task latency and queue depth
    • Scale workers horizontally as load increases
    • Use separate broker instances for very large deployments
  10. Backup Result Data: Persist important task results

    CELERY_RESULT_BACKEND = 'db+postgresql://user:pass@host/db'
    CELERY_RESULT_EXPIRES = 3600 # 1 hour

Resources


Conclusion

Deploying Celery applications to Klutch.sh provides a scalable, reliable foundation for distributed task processing. Whether you choose Nixpacks for simplicity or Docker for control, Klutch.sh handles the infrastructure complexity, allowing you to focus on your application logic.

Key takeaways:

  • Use Nixpacks for quick deployments with minimal configuration
  • Use Docker for custom build environments and advanced configurations
  • Configure environment variables for broker and result backend connectivity
  • Monitor worker health and task execution with logging and metrics
  • Implement graceful shutdown and task retry logic for reliability
  • Scale workers horizontally as your task volume increases

For questions or support, refer to the Celery documentation or Klutch.sh support resources.