Django + Dramatiq + APScheduler
Working on one of my django projects I had to do long-running computations in the background. For that I decided to use dramatiq
- a very nice background task processing library.
After that I had a new task - we needed to run periodical processing tasks (import some data, calculate some statistics, etc...). So I needed some kind of scheduler to start those tasks on time.
APScheduler is the recommended scheduler to use with Dramatiq (dramatiq documentation)
Here are some approaches I've used and my discoveries.
Preparing steps
- I installed the
dramatiq
,django-dramatiq
, andAPScheduler
packages from pypi. - I created new django app via
python manage.py startapp task_scheduler
- I added my app into
INSTALLED_APPS
. (NOTE - I use this form instead of just writingtask_scheduler
to be able to use `AppConfig.ready()` function. You can read about in django documentation.
INSTALLED_APPS = [
...
'django_dramatiq',
'task_scheduler.apps.TaskSchedulerConfig',
]
- I configured django-dramatiq and started to work on periodical tasks.
Task example
For this article I will be using very simple task
If you need my final solution - just click here
Simple and naive approach (not a good idea)
At first I've decided to use BackgroundScheduler
class from apscheduler. This scheduler runs in the background using a separate thread. So it won't block the whole application. I updated periodic_tasks.py
as follows:
And then:
But if you will start your django dev server - you will see two Starting background scheduler
lines. And background tasks will be executed twice. It's because manage.py runserver
runs django twice in two separate processes (one for serving requests and another to auto-reload), and each process executed our ready()
function.
In production I use Gunicorn
so it will fork the main process into additional worker processes and I will have 5 copies (depending on settings) of my BackgroundScheduler, and every task will be enqueued 5 times. Not good at all but this is not all.
How can I change Gunicorn settings to run only one BackgroundScheduler? (not a good idea too)
Well, you can start gunicorn
with --preload
option . This option means Load application code before the worker processes are forked.
So our code will be executed in main process and only after that it will be forked. Why is this will help? Because look at start_scheduler()
def start_scheduler():
# I create scheduler
scheduler = BackgroundScheduler(timezone=UTC)
...
# I run scheduler - It will create a new thread!
scheduler.start()
After running this code the master gunicorn process will load the whole django project in memory, so it will execute `start_scheduler` once and a new thread is spun up in the background, which is responsible for scheduling jobs. After that gunicorn will call system's fork method. BUT forked processes do not inherit the threads of their parent so each worker doesn't run the BackgroundScheduler thread.
Are we good now? Well, kind-of. I have fixed running with gunicorn
but completely forgot that I will need to run dramatiq-workers processes to actually run background tasks. And each of these processes will load whole project and run start_scheduler
and I will have a bunch of schedulers again.
Custom dramatiq middleware
########################
# In periodic_tasks.py #
########################
# move scheduler to be a global object
_SCHEDULER = BackgroundScheduler(timezone=UTC)
def start_scheduler():
every_minute = CronTrigger(minute=1, timezone=UTC)
_SCHEDULER.add_job(periodically_run_job, every_minute)
_SCHEDULER.start()
#################################
# new file custom_middleware.py #
#################################
class AntiScheduleMiddleware(dramatiq.Middleware):
def before_worker_boot(self, broker, worker):
from task_scheduler.periodic_tasks import _SCHEDULER
_SCHEDULER.shutdown()
######################
# in django settings #
######################
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.redis.RedisBroker",
"OPTIONS": {
"url": "redis://localhost:6379",
},
"MIDDLEWARE": [
"dramatiq.middleware.Prometheus",
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
"task_scheduler.custom_middleware.AntiScheduleMiddleware"
]
}
It will work but it looks fragile and too complicated. We need not to forget about custom gunicorn
settings and about custom dramatiq
middleware. I don't like this kind of code at all.
My final solution
The easiest solution to understand and maintain, in my opinion, is to start a single scheduler in its own dedicated process. For this task we will use the blocking scheduler so only it will be running inside the process.
- remove
def ready(self):
fromTaskSchedulerConfig
- remove
start_scheduler()
fromperiodic_tasks.py
- create
run_scheduler
command
To run scheduler we can use command python manage.py run_scheduler
How and where to do it depends on deploy strategy.