Использование django-celery-beat для создания периодических задач в Django-проектах
Когда в проекте нужно реализовать функциональность, которая будет выполняться по расписанию, на помощь приходит замечательный инструмент Celery.
25К открытий28К показов
Вадим Гартман
младший разработчик digital-интегратора DD Planet
Когда в проекте стоит задача реализовать функциональность, которая будет выполняться через определенное время или по расписанию, на помощь приходит такой замечательный инструмент, как Celery.
Celery — это распределенная асинхронная очередь задач, выполняющаяся в реальном времени, а также поддерживающая планирование задач. Ознакомиться с теоретическими аспектами можно по ссылке. В данной статье мы рассмотрим периодические задачи. Несмотря на то, что периодические задачи можно создавать через административную страницу в Django admin, мы рассмотрим их создание в коде, а также остановку созданных задач по условию.
Если вы хотите повторить пример из статьи, то у вас уже должен быть настроен Django-проект. Однако если вам нужна помощь по настройке проекта, почитайте эту статью.
В основном Celery в Django-приложениях используется, когда необходимо:
- асинхронно выполнить какую-либо задачу с помощью Celery worker;
- выполнить задачу в определенное время;
- постоянно выполнять задачу через определенный промежуток времени.
Самостоятельно Celery не умеет реализовывать периодические задачи, поэтому существует расширение django-celery-beat, которое выполняет роль планировщика задач для Celery. Как развернуть Сelery в приложении, описано в документации выше. В качестве посредника сообщений между нашим приложением и Сelery я использую Redis, развернутый в docker-контейнере.
Давайте установим django-celery-beat с помощью pip install django-celery-beat
.
После установки необходимо внести django-celery-beat в список установленных приложений в настройках приложения Django.
Это необходимо, так как данное расширение предоставляет новые модели, в которых будут храниться расписание и задачи, и приложение должно знать об этом, чтобы выполнить миграции и создать необходимые таблицы.
Запустим миграции. И в качестве последней настройки для полноценного использования django-celery-beat добавим в settings
приложения: CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler'
, тем самым указав Celery использовать новый планировщик задач, только что установленный нами.
Чтобы убедиться в полной установке расширения, необходимо зайти на административный сайт приложения. Если расширение установлено верно, то вы увидите следующий список моделей, позволяющий управлять периодическими задачами:
Динамическое создание периодических задач
Основная мысль, которую я бы хотел донести в статье, заключается в том, что периодические задачи являются обычными моделями, с которыми можно работать через Django ORM. Сейчас покажу на примере:
Допустим, вы создаете сервис, который делает заказы на стороннем сервисе через API. В ответе от этого сервиса вы получаете статус вашего заказа. Если мы получили статус от сервиса, то все отлично, дальше мы выполняем всю необходимую логику с этим заказом. Если же нет (например, пришел ответ '0'
) — тогда необходимо переотправлять запрос, пока статус не будет получен.
Мы не будем усложнять себе жизнь работой с API каких-либо сервисов. Сымитируем ответ от воображаемого сервиса с помощью custom command, которую самостоятельно напишем.
В папке приложения создадим папку «management», в ней папку «command» и в ней файл с кодом, например, «make_order.py».
И напишем код будущей консольной команды.
Данная команда ожидает 2 аргумента на вызове: первый — это статус, второй — id заказа. Если мы получаем статус '0'
от нашего воображаемого сервиса, тогда нам необходимо повторно отправлять запросы на получение статуса. Тут-то нам и пригодятся периодические задачи. В данном примере мы создаем PeriodicTask
из Django_celery_beat.models
. В качестве аргументов мы передаем ей следующие параметры:
- Имя создаваемой задачи:
name='Repeat order .{}'.format(options['order_id'])
, в дальнейшем с ее помощью мы будем останавливать задачи, если нам это необходимо. - Задача, которая будет периодически выполняться через определенный промежуток времени. В нашем случае это:
task='repeat_order_make'
. - Интервал, через который мы хотим, чтобы задача выполнялась:
interval=IntervalSchedule.objects.get(every=10, period='seconds')
.IntervalSchedule
также является моделью изdjango_celery_beat.models
, поэтому мы можем пользоваться функциональностью, предоставляемой Django ORM - Аргументы, которые будут передаваться в указанную функцию. В нашем случае задаче
'repeat_order_make'
будет передан аргументargs=json.dumps([options['order_id'][0]])
, то естьoptions['order_id'][0]
. - И время начала работы периодической задачи:
start_time=timezone.now()
.
Давайте теперь посмотрим на код задачи 'repeat_order_make'
. Создадим файл «task.py» в корне нашего приложения (как на скриншоте) и напишем следующий код.
В коде учитывается, что если заказ уже получил статус, то нет необходимости больше выполнять написанную нами периодическую задачу. Для того, чтобы остановить последующее выполнение задачи, мы получаем экземпляр задачи с помощью метода get()
, передавая в качестве аргумента имя, которое мы дали во время создания задачи. Затем мы переводим поле enabled
в состояние False
, что остановит выполнение задачи в дальнейшем, и обязательно сохраняем экземпляр задачи.
Давайте теперь проверим результаты наших трудов. Запустим написанную нами команду с помощью manage.py make_order 0 1
. То есть мы делаем заказ №1 и как будто получаем ответ со статусом '0'
. Это должно создать новую периодическую задачу. Давайте глянем в celery_beat командой celery -A <имя вашего приложения> beat -l INFO
.
И увидим, как началось периодическое выполнение задачи repeat_order_make
, которую мы описали выше. А также заглянем в Celery командой celery -A <имя вашего приложения> worker -l INFO
(я запускал каждую команду в отдельном окне терминала).
Видно, что Celery получает, и посылаем задачи для исполнения.
Теперь изменим статус нашего заказа, снова запустив команду manage.py make_order
, но уже с параметрами «Принят, 1». То есть manage.py make_order Принят 1
. После чего можно посмотреть в терминал с celery-beat и увидеть сообщение о том, что было изменено расписание выполнения задач, после чего постоянный вызов repeat_order_make
прекратится.
Таким образом, мы научились создавать периодические задачи прямо в коде программы, без использования административного сайта проекта, и также научились их останавливать. Я постарался привести пример из личного опыта, однако максимально упростил его, чтобы выжать максимум информации по конкретному случаю использования периодических задач. Если вы хотите больше узнать о том, как пользоваться периодическими задачами или посмотреть другие кейсы использования этого инструмента, я рекомендую ознакомиться со следующими материалами: раз и два.
25К открытий28К показов