Настраиваем Celery в Django проекте

Описание базовой настройки celery в проектах на django с использованием redis без использования docker контейнеров.

Предполагается, что у нас уже есть созданный django проект к которому необходимо подключить celery.

Установка Redis

Рассмотренный способ предпологает, самую простую базовую установку, на ОС Семейства Linux

  1. Заходим на официальный сайт redis
  2. Качаем последнюю стабильную версию к себе на компьютер.
  3. Распаковываем архив в нужную папку. Например в: … program/redis/
  4. Открывам консоль, при помощи команды cd переходим в папку: program/redis/src
  5. Выполняем команду ./redis-server
  6. Поздравляю redis сервер запущен. Можно свернуть консоль.
  7. Далее в виртуально окружении нашего django проекта установим пакет redis
pip install redis

Установка и настройка Celery

pip install celery

В папке с django проектом, в том месте где лежит файл settings.py, создадим новый файл celery.py со следующим содержимым:

import os

# Из только что установленной библиотеки celery импортируем класс Celery
from celery import Celery

# Указываем где находится модуль django и файл с настройками django (имя_вашего_проекта.settings)
# в свою очередь в файле settings будут лежать все настройки celery. 
# Соответственно при указании данной директивы нам не нужно будет при вызове каждого task(функции) прописывать 
# эти настройки.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_supper_app.settings')

# Создаем объект(экземпляр класса) celery и даем ему имя
app = Celery('my_supper_app')

# Загружаем config с настройками для объекта celery.
# т.е. импортируем настройки из django файла settings
# namespace='CELERY' - в данном случае говорит о том, что применятся будут только 
# те настройки из файла settings.py которые начинаются с ключевого слова CELERY
app.config_from_object('django.conf:settings', namespace='CELERY')

# В нашем приложении мы будем создавать файлы tasks.py в которых будут находится 
# task-и т.е. какие-либо задания. При указании этой настройки 
# celery будет автоматом искать такие файлы и подцеплять к себе.
app.autodiscover_tasks()

Необходимые настройки в файле settings.py:

# REDIS settings
# Настройки Redis условные и у вас они могут отличатся в зависимости от конфигурации
REDIS_HOST = '127.0.0.1'
REDIS_PORT = '6379'

# CELERY settings
CELERY_BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
CELERY_BROKER_TRANSPORT_OPTION = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

Далее, для того что бы все заработало необходимо в файле __init__.py нашего проекта прописать:

# импортируем из созданного нами ранее файла celery.py наш объект(экземпляр класса) celery (app)
from .celery import app as celery_app

# Подключаем объект
__all__ = ('celery_app',)

Создание task — задач

Для начала следует пояснить, что такое task — по сути это функция или какая либо задача которую необходимо выполнить в фоновом режиме.

Итак, в нашем приложении создаем файл tasks.py в котором будет создавать функции работающие в фоне (таски)

# tasks.py
# Импортируем созданный нами ранее экземпляр класса celery(app)
from my_supper_app.celery import app

# Декоратор @app.task, говорит celery о том, что эта функция является (task-ом) т.е. должна выполнятся в фоне.
@app.task
def supper_sum(x, y):
    return x + y

Выполнение созданных задач

Для того, что бы запустить на выполнения task, в django в нужном нам месте необходимо прописать:

# views.py
# Условный пример при валидации формы, мы запускаем нашу задачу.
# ...
from .tasks import supper_sum
def form_valid(self, form):
   # ...
   # Запускаем нашу функцию при помощи метода delay - который, запустит task.
   # условно говоря celery обработает функцию в фоне.
    supper_sum.delay(5, 7)
    return super().form_valid(form)

Запуск Celery

Находясь в консоле, необходимо перейти в папку с нашим django приложением (в ту директорию где находится файл manage.py)

Далее выполняем команду:

# В данном случае параметр: my_supper_app - это имя которое мы задали
# при создании экземпляра класса celery в файле celery.py
celery -A my_supper_app worker -l INFO

Теперь если в нашем приложении при каких либо условиях запуститься функция supper_sum.delay(5, 7) в консоли мы увидим информацию от celery о результате выполнения задачи.

Периодические задачи в Celery

Для выполнения периодичных задач модифицируем созданный нами ранее файл celery.py, добавив в него такую конструкцию:

#...
# Импортируем crontab для запуска по расписанию
from celery.schedules import crontab

# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html

app.conf.beat_schedule = {
        # Название задачи
    'my-super-sum-every-5-min' = {
                # Регистрируем задачу. Для этого в качестве значения ключа task
                # Указываем полный путь до созданного нами ранее таска(функции)
        'task': 'my_supper_app.tasks.supper_sum',

                 # Периодичность с которой мы будем запускать нашу задачу
                 # minute='*/5' - говорит о том, что задача должна выполнятся каждые 5 мин.
        'schedule': crontab(minute='*/5'),

                # Аргументы которые будет принимать функция
                'args': (5, 8),
    }
}

Запуск периодической задачи

Запуск периодической задачи в Celery выполняется командой:

celery -A my_supper_app beat -l INFO

Если в момент создания новой таски celery был запущен, его необходимо перезапустить.

Повторный запуск task (задач) при возникновении ошибки

Бывают ситуации когда, по независящим от нас причинам Celery не может выполнить нужную нам задачу, например в момент выполнения этой задачи отсутствовал интернет или упал какой-то сервер к которому мы обращались из задачи. В таких случаях целесообразно сделать так, что бы задача которая не смогла выполниться, повторилась заданный промежуток времени. По умолчанию в Celery интервал перезапуска задач равен 3 мин.

Ниже представлен пример который перезапустит задачу, если во время её выполнения произошла ошибка.

@app.task(bind=True, default_retry_delay=5* 60)
def my_task_retry(self, x, y):
    try:
        return x + y
    except Exception as exc:
        # retry(countdown=60) задает перезапуск задачи через 60 сек
        raise self.retry(exc=exc, countdown=60)

 Отложенный запуск задач

В ситуациях когда необходимо запустить задачу не сразу, а отложить её на некоторое время или выполнить её в конкретно заданный час и минуту применяется метод: apply_async() принцип его работы такой же как и у метода delay() за исключением того, что apply_async() принимает не много другие аргументы.

Итак, запуск задачи с задержкой в 60 секунд выглядит так:

# Передаем кортеж с параметрами функции и задержку.
supper_sum.apply_async((5, 7), countdown=60)

Связанные задачи

В некоторых ситуация нам может понадобиться вызвать задачу и результат выполнения этой задачи записать как аргумент этой же задачи. Пример:

supper_sum.apply_async((5, 7), link=supper_sum.s(8))
  1. В данном случае мы вызываем метод apply_async() в который передаем кортеж с аргументами функции (5, 7) в результате выполнения сложения получаем число 12.
  2. В аругменте link мы указали связанную задачу supper_sum с одним аргументом одним 8 — supper_sum.s(8) (хотя функция supper_sum ожидает 2 аргумента)
  3. Получится, что результат выполнения функции на первом шаге, автоматически подставиться как второй аргумент функции.
  4. В итоге выполнения мы получим число 20. (5+7 => 12 + 8 => 20)

Установка Flower

Flower — это инструмент для отслеживания состояния ваших задач и воркеров Celery. У инструмента есть веб-интерфейс. Установка Flower происходит при помощи команды

pip install flower

Запуск Flower

flower -A my_supper_app --port=5555