Django + Dramatiq + APScheduler

While working on one of my Django projects, I had to run long-running computations in the background. For that I chose Dramatiq, a very nice background task processing library.

Then a new requirement came up: we needed to run periodic processing tasks (import some data, calculate some statistics, etc.). So I needed some kind of scheduler to start those tasks on time.

APScheduler is the scheduler recommended for use with Dramatiq (see the Dramatiq documentation).

Below are the approaches I tried and what I discovered along the way.

Preparation steps

  • I installed the dramatiq, django-dramatiq, and APScheduler packages from PyPI.
  • I created a new Django app via python manage.py startapp task_scheduler
  • I added my app to INSTALLED_APPS. (NOTE: I use the full 'task_scheduler.apps.TaskSchedulerConfig' path instead of just 'task_scheduler' so that Django uses my AppConfig and its `ready()` method; you can read about it in the Django documentation. Since Django 3.2, a single AppConfig subclass in apps.py is also picked up automatically.)
INSTALLED_APPS = [
    ...
    'django_dramatiq',
    'task_scheduler.apps.TaskSchedulerConfig',
]

Task example

For this article I will use a very simple task:

import logging
import time

import dramatiq


@dramatiq.actor()
def process_user_stats():
    """Very simple task for demonstration purposes."""
    logging.warning('Start my long-running task')
    time.sleep(5)
    logging.warning('Task is ended')
tasks.py (django_dramatiq auto-discovers the actors in this file)
import logging

from .tasks import process_user_stats


def periodically_run_job():
    """This job will be run by APScheduler.

    It can prepare some data and parameters and then enqueue the background task.
    """
    logging.warning('It is time to start the dramatiq task')
    # .send() only enqueues a message; a dramatiq worker executes the task.
    process_user_stats.send()
periodic_tasks.py
task_scheduler
├── __init__.py
├── admin.py
├── apps.py
├── migrations
│   ├── __init__.py
├── models.py
├── periodic_tasks.py
├── tasks.py
└── views.py
Directory structure of the task_scheduler Django app.
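The split of responsibilities matters for everything below: periodically_run_job only enqueues a message, and a separate dramatiq worker process does the slow work. Here is a stdlib-only sketch of that flow, assuming an in-memory queue.Queue stands in for the Redis broker and a thread stands in for the worker process; none of these names are Dramatiq APIs, and the parameters are hypothetical.

```python
import queue
import threading
import time

# Hypothetical in-memory stand-in for the broker (Dramatiq would use Redis here).
broker: "queue.Queue[dict]" = queue.Queue()
finished: list = []


def process_user_stats(params: dict) -> None:
    """Stand-in for the actor body: the slow part."""
    time.sleep(0.5)  # the real task sleeps for 5 seconds
    finished.append(params)


def periodically_run_job() -> None:
    """Stand-in for the scheduled job: prepare parameters, then enqueue."""
    params = {"source": "scheduler"}  # hypothetical parameters
    broker.put(params)  # analogue of process_user_stats.send(): returns at once


def worker() -> None:
    """Stand-in for a dramatiq worker: consume messages forever."""
    while True:
        message = broker.get()
        process_user_stats(message)
        broker.task_done()


if __name__ == "__main__":
    threading.Thread(target=worker, daemon=True).start()
    started = time.monotonic()
    periodically_run_job()
    print(f"enqueue took {time.monotonic() - started:.3f}s")  # far less than 0.5 s
    broker.join()  # wait until the worker has processed the message
    print(finished)
```

The scheduler side stays fast no matter how long the task takes, which is why it is enough to run the scheduler itself in a lightweight process.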

If you only need my final solution, skip to the "My final solution" section.

Simple and naive approach (not a good idea)

At first I decided to use the BackgroundScheduler class from APScheduler. This scheduler runs in a separate background thread, so it doesn't block the rest of the application. I updated periodic_tasks.py as follows:

import logging

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from pytz import UTC

from .tasks import process_user_stats


def periodically_run_job():
    logging.warning('Starting dramatiq task')
    process_user_stats.send()


def start_scheduler():
    logging.warning('Starting background scheduler.')
    scheduler = BackgroundScheduler(timezone=UTC)
    # minute='*' fires every minute; minute=1 would fire only at HH:01.
    every_minute = CronTrigger(minute='*', timezone=UTC)
    scheduler.add_job(periodically_run_job, every_minute)
    # start() launches the scheduler thread and returns immediately.
    scheduler.start()
periodic_tasks.py

And then:

from django.apps import AppConfig


class TaskSchedulerConfig(AppConfig):
    name = 'task_scheduler'

    def ready(self):
        from .periodic_tasks import start_scheduler
        start_scheduler()
    
apps.py
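A note on the trigger in start_scheduler: in cron semantics, minute=1 means "at minute 1 of every hour", not "every minute", so a job meant to run every minute needs minute='*'. APScheduler documents that fields above the least significant explicit field default to '*' and fields below it default to their minimum, so seconds become 0. The function below is a simplified, stdlib-only model of just the hour and minute fields (not APScheduler's implementation), enough to compare the triggers used in this article.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Union

Field = Union[int, str]  # an exact value, or "*" for "any value"


def _matches(field: Field, value: int) -> bool:
    return field == "*" or field == value


def next_fire_times(now: datetime, count: int, hour: Field = "*",
                    minute: Field = "*", max_days: int = 366) -> List[datetime]:
    """Return the next `count` times matching `hour`/`minute`, seconds at 0.

    Simplified model: only the hour and minute cron fields are supported.
    """
    # Seconds default to their minimum (0), so round up to a whole minute.
    candidate = now.replace(second=0, microsecond=0)
    if candidate < now:
        candidate += timedelta(minutes=1)
    limit = now + timedelta(days=max_days)
    fire_times: List[datetime] = []
    while len(fire_times) < count:
        if candidate > limit:
            raise ValueError("no matching time found; check the field values")
        if _matches(hour, candidate.hour) and _matches(minute, candidate.minute):
            fire_times.append(candidate)
        candidate += timedelta(minutes=1)
    return fire_times


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 10, 5, 30, tzinfo=timezone.utc)
    print(next_fire_times(now, 2, minute=1))          # 11:01 and 12:01: hourly
    print(next_fire_times(now, 2, minute="*"))        # 10:06 and 10:07: every minute
    print(next_fire_times(now, 2, hour=5, minute=5))  # 05:05 on Jan 2 and Jan 3
```

The last call matches the daily trigger used in the final run_scheduler command (hour=5, minute=5).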

But if you start the Django dev server, you will see the "Starting background scheduler." line twice, and every scheduled task will be enqueued twice. That's because manage.py runserver runs Django in two separate processes (one that watches the code and restarts the server on changes, and one that serves requests), and each process executes our ready() method.

In production I use Gunicorn, which forks the main process into several worker processes, so I get one copy of my BackgroundScheduler per worker (e.g. 5, depending on settings) and every task is enqueued that many times. Not good at all, and that's not all.

How can I change Gunicorn settings to run only one BackgroundScheduler? (not a good idea either)

Well, you can start gunicorn with the --preload option. This option means "Load application code before the worker processes are forked", so our code is executed once in the master process and only after that is the process forked. Why does this help? Look at start_scheduler():

def start_scheduler():
    # Create the scheduler
    scheduler = BackgroundScheduler(timezone=UTC)
    ...
    # Start the scheduler: this creates a new thread!
    scheduler.start()

With --preload, the master gunicorn process loads the whole Django project into memory, so it executes `start_scheduler` once and a new background thread, responsible for scheduling jobs, is spun up. After that gunicorn calls the system's fork(). BUT a forked process does not inherit its parent's threads (only the thread that called fork() exists in the child), so none of the workers runs the BackgroundScheduler thread.
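This behaviour of fork() is easy to check with the standard library alone. The sketch assumes a POSIX system (os.fork is not available on Windows) and uses a plain thread as a hypothetical stand-in for the BackgroundScheduler thread: the parent starts the thread, forks once as gunicorn would, and the child reports how many threads it sees through its exit code.

```python
import os
import threading


def scheduler_loop(stop: threading.Event) -> None:
    """Hypothetical stand-in for BackgroundScheduler's internal thread."""
    while not stop.wait(0.1):
        pass  # a real scheduler would check for due jobs here


def threads_in_parent_and_child() -> tuple:
    stop = threading.Event()
    thread = threading.Thread(target=scheduler_loop, args=(stop,), daemon=True)
    thread.start()  # like scheduler.start() in the preloaded master
    parent_threads = threading.active_count()

    pid = os.fork()  # what gunicorn does for each worker
    if pid == 0:
        # Child: only the thread that called fork() survives here.
        os._exit(threading.active_count())
    _, status = os.waitpid(pid, 0)
    child_threads = os.WEXITSTATUS(status)

    stop.set()
    thread.join()
    return parent_threads, child_threads


if __name__ == "__main__":
    parent, child = threads_in_parent_and_child()
    print(f"threads in master: {parent}, threads in forked worker: {child}")
```

The master sees at least two threads (main plus the scheduler), while the forked child sees only one. Python 3.12 and later also emit a DeprecationWarning when forking a multi-threaded process, which is a hint that this trick relies on subtle behaviour.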

Are we good now? Well, kind of. I fixed the Gunicorn setup but completely forgot that I also need to run dramatiq worker processes to actually execute the background tasks. Each of these processes loads the whole project and runs start_scheduler, so I have a bunch of schedulers again.

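Custom dramatiq middleware

One workaround is to make the scheduler a module-level object and shut it down inside dramatiq worker processes with a custom middleware: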

########################
# In periodic_tasks.py #
########################
# move the scheduler to module level so the middleware can reach it
_SCHEDULER = BackgroundScheduler(timezone=UTC)


def start_scheduler():
    every_minute = CronTrigger(minute='*', timezone=UTC)
    _SCHEDULER.add_job(periodically_run_job, every_minute)
    _SCHEDULER.start()

#################################
# new file custom_middleware.py #
#################################
import dramatiq


class AntiScheduleMiddleware(dramatiq.Middleware):
    """Stop the scheduler that ready() started inside a dramatiq worker process."""

    def before_worker_boot(self, broker, worker):
        from task_scheduler.periodic_tasks import _SCHEDULER
        # Only shut down if ready() actually started it in this process.
        if _SCHEDULER.running:
            _SCHEDULER.shutdown()

######################
# in django settings #
######################
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {
        "url": "redis://localhost:6379",
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.Prometheus",
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "django_dramatiq.middleware.AdminMiddleware",
        "task_scheduler.custom_middleware.AntiScheduleMiddleware"
    ]
}

It works, but it looks fragile and too complicated: we must not forget either the custom gunicorn settings or the custom dramatiq middleware. I don't like this kind of code at all.

My final solution

In my opinion, the solution that is easiest to understand and maintain is to start a single scheduler in its own dedicated process. For that we use BlockingScheduler, which runs in the foreground and blocks its process, so the scheduler is the only thing running there.

  • remove def ready(self): from TaskSchedulerConfig
  • remove start_scheduler() from periodic_tasks.py
  • create a run_scheduler management command
import pytz
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand

from task_scheduler.periodic_tasks import periodically_run_job


class Command(BaseCommand):
    help = 'Run blocking scheduler to create periodic tasks'

    def handle(self, *args, **options):
        self.stdout.write(self.style.NOTICE('Preparing scheduler'))
        scheduler = BlockingScheduler(timezone=pytz.UTC)
        every_day_at_05_05_utc = CronTrigger(hour=5, minute=5, timezone=pytz.UTC)
        scheduler.add_job(periodically_run_job, every_day_at_05_05_utc)
        # ... add other jobs
        self.stdout.write(self.style.NOTICE('Start scheduler'))
        try:
            # start() blocks this process until the scheduler is shut down.
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self.stdout.write(self.style.NOTICE('Scheduler stopped'))
File task_scheduler/management/commands/run_scheduler.py (both management/ and commands/ need an __init__.py)
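To see why a blocking scheduler fits a dedicated process, here is the same idea with the standard library's sched module instead of APScheduler (a swapped-in, much simpler scheduler): scheduler.run() occupies the calling thread until no events remain, much as BlockingScheduler.start() occupies the run_scheduler process. The interval and run count are hypothetical; the job re-schedules itself to act as a periodic trigger and stops after a fixed number of runs so the example terminates.

```python
import sched
import time
from typing import List


def run_blocking_scheduler(interval: float, runs: int) -> List[float]:
    """Run a job every `interval` seconds, `runs` times, blocking the caller."""
    scheduler = sched.scheduler(time.monotonic, time.sleep)
    fired_at: List[float] = []

    def periodically_run_job() -> None:
        # In the real command this is where process_user_stats.send() happens.
        fired_at.append(time.monotonic())
        if len(fired_at) < runs:
            # sched has no built-in periodic trigger, so the job re-schedules itself.
            scheduler.enter(interval, 1, periodically_run_job)

    scheduler.enter(interval, 1, periodically_run_job)
    scheduler.run()  # blocks until the event queue is empty
    return fired_at


if __name__ == "__main__":
    start = time.monotonic()
    times = run_blocking_scheduler(interval=0.2, runs=3)
    print([round(t - start, 1) for t in times])  # roughly [0.2, 0.4, 0.6]
```

Because nothing else runs in that process, there is exactly one scheduler no matter how many web or dramatiq worker processes exist.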

To run the scheduler, use the command python manage.py run_scheduler. How and where to run it depends on your deployment strategy; just make sure exactly one instance of it is running.