Обложка: Использование django-celery-beat для создания периодических задач в Django-проектах

Использование django-celery-beat для создания периодических задач в Django-проектах

Вадим Гартман

Вадим Гартман

младший разработчик digital-интегратора DD Planet

Когда в проекте стоит задача реализовать функциональность, которая будет выполняться через определенное время или по расписанию, на помощь приходит такой замечательный инструмент, как Celery.

Celery — это распределенная асинхронная очередь задач, выполняющаяся в реальном времени, а также поддерживающая планирование задач. Ознакомиться с теоретическими аспектами можно по ссылке. В данной статье мы рассмотрим периодические задачи. Несмотря на то, что периодические задачи можно создавать через административную страницу в Django admin, мы рассмотрим их создание в коде, а также остановку созданных задач по условию.

Если вы хотите повторить пример из статьи, то у вас уже должен быть настроен Django-проект. Однако если вам нужна помощь по настройке проекта, почитайте эту статью.

В основном Celery в Django-приложениях используется, когда необходимо:

  • асинхронно выполнить какую-либо задачу с помощью Celery worker;
  • выполнить задачу в определенное время;
  • постоянно выполнять задачу через определенный промежуток времени.

Самостоятельно Celery не умеет реализовывать периодические задачи, поэтому существует расширение django-celery-beat, которое выполняет роль планировщика задач для Celery. Как развернуть Сelery в приложении, описано в документации выше.  В качестве посредника сообщений между нашим приложением и Сelery я использую Redis, развернутый в docker-контейнере.

Давайте установим django-celery-beat с помощью pip install django-celery-beat.

После установки необходимо внести django-celery-beat в список установленных приложений в настройках приложения Django.

INSTALLED_APPS = (
...,
'django_celery_beat',
)

Это необходимо, так как данное расширение предоставляет новые модели, в которых будут храниться расписание и задачи, и приложение должно знать об этом, чтобы выполнить миграции и создать необходимые таблицы.

Запустим миграции. И в качестве последней настройки для полноценного использования django-celery-beat добавим в settings приложения: CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler', тем самым указав Celery использовать новый планировщик задач, только что установленный нами.

Чтобы убедиться в полной установке расширения, необходимо зайти на административный сайт приложения. Если расширение установлено верно, то вы увидите следующий список моделей, позволяющий управлять периодическими задачами:

Динамическое создание периодических задач

Основная мысль, которую я бы хотел донести в статье, заключается в том, что периодические задачи являются обычными моделями, с которыми можно работать через Django ORM. Сейчас покажу на примере:

Допустим, вы создаете сервис, который делает заказы на стороннем сервисе через API. В ответе от этого сервиса вы получаете статус вашего заказа. Если мы получили статус от сервиса, то все отлично, дальше мы выполняем всю необходимую логику с этим заказом. Если же нет (например, пришел ответ '0') — тогда необходимо переотправлять запрос, пока статус не будет получен.

Мы не будем усложнять себе жизнь работой с API каких-либо сервисов. Сымитируем ответ от воображаемого сервиса с помощью custom command, которую самостоятельно напишем.

В папке приложения создадим папку «management», в ней папку «command» и в ней файл с кодом, например, «make_order.py».

И напишем код будущей консольной команды.

import json

from django.utils import timezone
from django.core.management.base import BaseCommand, CommandError
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from setups.models import Order

class Command(BaseCommand):
	def add_arguments(self, parser):
		parser.add_argument('status', type=str)
		parser.add_argument('order_id', nargs=1, type=int)

	def handle(self, *args, **options):
		status = options['status']
		order = Order.objects.get(pk=options['order_id'][0])
		if status == '0':
			PeriodicTask.objects.create(
					name='Repeat order {}'.format(options['order_id']),
					task='repeat_order_make',
					interval=IntervalSchedule.objects.get(every=10, period='seconds'),
					args=json.dumps([options['order_id'][0]]),
					start_time=timezone.now(),
				)
		else:
			order.update(status=status)
			order.refresh_from_db()
			# Необходимая логика после удачного получения статуса
			print('Статус вашего заказа -> {}'.format(order.status))

Данная команда ожидает 2 аргумента на вызове: первый — это статус, второй — id заказа. Если мы получаем статус '0' от нашего воображаемого сервиса, тогда нам необходимо повторно отправлять запросы на получение статуса. Тут-то нам и пригодятся периодические задачи. В данном примере мы создаем PeriodicTask из Django_celery_beat.models. В качестве аргументов мы передаем ей следующие параметры:

  • Имя создаваемой задачи: name='Repeat order .{}'.format(options['order_id']), в дальнейшем с ее помощью мы будем останавливать задачи, если нам это необходимо.
  • Задача, которая будет периодически выполняться через определенный промежуток времени. В нашем случае это: task='repeat_order_make'.
  • Интервал, через который мы хотим, чтобы задача выполнялась: interval=IntervalSchedule.objects.get(every=10, period='seconds'). IntervalSchedule также является моделью из django_celery_beat.models, поэтому мы можем пользоваться функциональностью, предоставляемой Django ORM
  • Аргументы, которые будут передаваться в указанную функцию. В нашем случае задаче 'repeat_order_make' будет передан аргумент args=json.dumps([options['order_id'][0]]), то есть options['order_id'][0].
  • И время начала работы периодической задачи: start_time=timezone.now().

Давайте теперь посмотрим на код задачи 'repeat_order_make'. Создадим файл «task.py» в корне нашего приложения (как на скриншоте) и напишем следующий код.

from celery import shared_task
from django_celery_beat.models import PeriodicTask

from .models import Order


@shared_task(name="repeat_order_make")
def repeat_order_make(order_id):
	order = Order.objects.get(pk=order_id)
	if order.status != '0':
		print('Статус получен!')
		task = PeriodicTask.objects.get(name='Repeat order {}'.format(order_id))
		task.enabled = False
		task.save()
	else:
		# Необходимая логика при повторной отправке заказа
		print('Я должна повторно оформлять заказ каждые 10 секунд')

В коде учитывается, что если заказ уже получил статус, то нет необходимости больше выполнять написанную нами периодическую задачу. Для того, чтобы остановить последующее выполнение задачи, мы получаем экземпляр задачи с помощью метода get(), передавая в качестве аргумента имя, которое мы дали во время создания задачи. Затем мы переводим поле enabled в состояние False, что остановит выполнение задачи в дальнейшем, и обязательно сохраняем экземпляр задачи.

Давайте теперь проверим результаты наших трудов. Запустим написанную нами команду с помощью manage.py make_order 0 1. То есть мы делаем заказ №1 и как будто получаем ответ со статусом '0'. Это должно создать новую периодическую задачу. Давайте глянем в celery_beat командой celery -A <имя вашего приложения> beat -l INFO.

И увидим, как началось периодическое выполнение задачи repeat_order_make, которую мы описали выше. А также заглянем в Celery командой celery -A <имя вашего приложения> worker -l INFO (я запускал каждую команду в отдельном окне терминала).

Видно, что Celery получает, и посылаем задачи для исполнения.

Теперь изменим статус нашего заказа, снова запустив команду manage.py make_order, но уже с параметрами «Принят, 1». То есть manage.py make_order Принят 1. После чего можно посмотреть в терминал с celery-beat и увидеть сообщение о том, что было изменено расписание выполнения задач, после чего постоянный вызов repeat_order_make прекратится.

Таким образом, мы научились создавать периодические задачи прямо в коде программы, без использования административного сайта проекта, и также научились их останавливать. Я постарался привести пример из личного опыта, однако максимально упростил его, чтобы выжать максимум информации по конкретному случаю использования периодических задач. Если вы хотите больше узнать о том, как пользоваться периодическими задачами или посмотреть другие кейсы использования этого инструмента, я рекомендую ознакомиться со следующими материалами: раз и два.