Перетяжка IT-коробка
Перетяжка IT-коробка
Перетяжка IT-коробка
Написать пост

Из цикла ETL: настройка первого DAG

Рассказываем о настройке первого DAG — направленного ациклического графа — для тех, кто решил автоматизировать отлаженный код на Python.

Из цикла ETL: настройка первого DAG

На обложке: Воздушный поток (air flow) вокруг самолета, источник: fineartamerica.com

В предыдущих статьях я разобрала, как наладить поток логов из конструктора Dialogflow в BigQuery, и теперь почва для построения отчётов построена. Сегодня покажу, как настроить один DAG – исполняемую по расписанию группу команд в Airflow. Я исхожу из предпосылки, что вы уже развернули программу на своем сервере / устройстве и имеете доступ к UI. Эта статья подходит тем, кто решил автоматизировать в своей работе то, что достаточно отлажено, чтобы поддаться автоматизации. В моем случае это скрипты, рассчитывающие ежемесячную эффективность одного из ботов по конечным состояниям диалогов.

Это статья из цикла «5 ETL для зоопарка ботов». В нём я пошагово разбираю, как наладить потоки данных из разных библиотек и конструкторов ботов на разных языках и стеках. В основе лежат Python и его библиотеки. Вот предыдущие статьи цикла, подводящие к текущей:
  1. Анонс цикла с перечнем технологий
  2. Настройка потока логов «Из Dialogflow в BigQuery»
  3. Python для аналитики ad hoc из BigQuery
  4. Развертывание Airflow

Что такое DAG

DAG (Directed Acyclic Graph – направленный ациклический граф) – просто коллекция различных задач. Вероятно, вы уже сталкивались с термином «граф», если проходили курсы по машинному обучению. В контексте Airflow же слово имеет примерно такое же значение – это поток задач, который может разветвляться:

Из цикла ETL: настройка первого DAG 1

Потому модуль DAG импортируется всегда, чтобы связать различные задачи воедино.

Вернемся к настройке

Для начала я импортирую необходимые библиотеки:

			from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
from airflow.utils.trigger_rule import TriggerRule
		

BashOperator нужен, чтобы запускать силами bash-скрипты, лежащие по соседству. Классы datetime, timedelta модуля datetime нужны, чтобы описать поведение графа при ошибке. TriggerRule понадобится, чтобы уведомлять меня в Telegram только при ошибке.

Подключение бота Telegram

Почти все рабочие чаты у меня живут в Telegram, потому удобно получать уведомления об исполнении в специальный чат, где живут Airflow-бот и его братья по другим проектам. Поэтому я импортировала TelegramOperator.

Если вы не знаете, как создать бота в Telegram, то используйте раздел «Создание бота» из вот этого гайда. Кратко: передав команду /start боту @BotFather, мы следуем инструкции, называем бота, задаем ему идентификатор для поиска и в конце получаем токен:

Из цикла ETL: настройка первого DAG 2

Скопировав токен, создаём функцию, уведомляющую об исполнении задач через мессенджер:

			send_telegram_message = TelegramOperator(
    task_id="send_telegram_message",
    token="<Токен бота в Telegram>",
    chat_id="Идентификатор чата, куда добавлен бот",
    text="Расчет сводки выполнен."
)
		

Чтобы получить chat_id, используйте бота @RawDataBot. Токен бота оставляем в одноименном поле.

Граф

Теперь создаём DAG и описываем некоторые обязательные характеристики:

			with DAG(
    "daily_effectiveness", # Уникальное имя графа, отобразится в консоли
    default_args={
        "depends_on_past": False, # Зависимость задач от предыдущих
        "retries": 1, # Число перепопыток в случае неудаче
        "retry_delay": timedelta(seconds=30) # Интервал между попытками
    },
    description="Ежемесячная сводка маркетплейса", Описание, появится в консоли при наведении на название DAG’а
    schedule_interval='@monthly', # Ежемесячное исполнение

    start_date=datetime(2023, 7, 1), # Когда начать исполнение по расписанию
    catchup=False,
    tags=["Маркетплейс", "Dialogflow", "BigQuery"],
) as dag:
		
Ключевая особенность Airflow заключается в том, что запуск DAG по расписанию – нечто вроде копирования группы задач для каждого нужного времени. Планировщик (scheduler – его мы запускали в прошлой статье) проверяет, не работает ли до сих пор предыдущая копия графа. Эта концепция называется catchup (можно перевести как «подхват»).

Задачи

Теперь создаём задачи (tasks) – отдельные исполняемые команды в рамках графа. В моём случае для расчёта сводки эффективности необходимо предварительно войти в виртуальную среду airflow_env, и только потом исполнять подпрограмму dialogflow-to-bigquery.py силами bash:

			t1 = BashOperator(
     task_id="entering_virtual_environment", # Идентификатор таски для отслеживания в консоли
     bash_command="source /home/fitwist/airflow/airflow_env/bin/activate",
     retries=2 #
)

t2 = BashOperator(
      task_id="calculating_marketplace_effectiveness",
      depends_on_past=False,
 	bash_command="python3 /home/fitwist/airflow/df-to-looker/dialogflow-to-bigquery.py",
      retries=2
)
		

Задаем последовательность задач:

			t1 >> t2 >> t3 >> t4 >> TelegramOperator(
    task_id="send_telegram_message",
    token="6191376785:AAHf9thqeAuaIjou3DjzEFiI06bYY9FDKlI",
    chat_id="-1001200247335",
    trigger_rule=TriggerRule.ONE_FAILED,
    text="Группа чат-бота: одна из ежедневных выгрузок не выполнена. Проверь логи (Recent Tasks / Failed / Task Id / Log)."
)
		

И вуаля! Ваш первый DAG готов! Когда у меня впервые получилось запустить Airflow, чувство было, словно в космос полетела, ведь такая технология в резюме очень котируется у дата-аналитиков.

Полностью DAG будет выглядеть так:

			from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator



send_telegram_message = TelegramOperator(
    task_id="send_telegram_message",
    token="<Токен бота в Telegram>",
    chat_id="Идентификатор чата, куда добавлен бот",
    text="Расчет сводки выполнен."
)


with DAG(
    "daily_effectiveness", # Идентификатор, отобразится в консоли
    default_args={
        "depends_on_past": False, # Зависимость задач от предыдущих
        "retries": 1, # Число перепопыток в случае неудаче
        "retry_delay": timedelta(seconds=30) # Интервал между попытками
    },
    description="Ежемесячная сводка маркетплейса", Описание, появится в консоли при наведении на название DAG
    schedule_interval='@monthly', # Ежемесячное исполнение
    start_date=datetime(2023, 7, 1), # Когда начать исполнение по расписанию
    catchup=False,
    tags=["Маркетплейс", "Dialogflow", "BigQuery"],
) as dag:


t1 = BashOperator(
     task_id="entering_virtual_environment", # Идентификатор таски для отслеживания в консоли
     bash_command="source /home/fitwist/airflow/airflow_env/bin/activate",
     retries=2 #
)


t2 = BashOperator(
      task_id="calculating_marketplace_effectiveness",
      depends_on_past=False,
    bash_command="python3 /home/fitwist/airflow/df-to-looker/dialogflow-to-bigquery.py",
      retries=2
)


t1 >> t2 >> t3 >> t4 >> TelegramOperator(
    task_id="send_telegram_message",
    token="<Telegram Token>",
    chat_id="<ChatID>",
    trigger_rule=TriggerRule.ONE_FAILED,
    text="Группа чат-бота: одна из ежедневных выгрузок не выполнена. Проверь логи (Recent Tasks / Failed / Task Id / Log)."
)
		

Можно даже получать уведомления об ошибке любым удобным способом. Мне удобно в Telegram, потому использую триггер ONE_FAILED, то есть “когда хотя бы одна задача не выполнена”.

Этот и другие сниппеты из цикла статей можно найти в моем репозитории на GitHub.

Заключение

Airflow – это настоящий швейцарский нож. В нём, как в Python, написана утилита почти под любую потребность. И на покрытие тредами Stack Overflow грех жаловаться.

Из цикла ETL: настройка первого DAG 3

Когда я только делала первые шаги в направлении автоматизации, то существенную моральную помощь также оказывала прекрасная документация: там и примеры, и грамотно дозированные страницы. Даже база графов на любой вкус на GitHub. Так и хочется создателям за неё выдать награду.

Это уникальный в своем роде опенсорсный продукт, который позволяет даже новичкам предъявить полезнейший функционал – автоматизацию чего угодно. Он не требует знания теории вероятности и основ машинного обучения, однако при наличии Airflow в вашем резюме позволяет требовать 100К+.

В следующей статье мы посмотрим, как выглядит сводка эффективности, которую мы теперь можем визуализировать силами Google Looker (экс-Data Studio).

Следите за новыми постами
Следите за новыми постами по любимым темам
1К открытий4К показов