Настраиваем конфигурацию DAG в Apache Airflow так, чтобы меньше о ней думать
В статье рассказали, как мы настроили и оптимизировали разработку загрузок для Apache Airflow и что для этого потребовалось.
2К открытий13К показов
Павел Грошев
Исполнительный директор по разработке, Газпромбанк
Сбор и обработка данных для биг дата — достаточно трудоёмкая задача. Для этих целей используются загрузчики.
Но заранее спроектировать и настроить их «на века» невозможно. В процессе работы всегда появляются новые источники сбора данных или специфичные требования к ним, которые часто приходится обрабатывать вручную. Поэтому мы настроили и оптимизировали разработку загрузок.
Мы пользуемся Airflow как единой системой запуска задач
В нашем банке есть собственная платформа больших данных — DataFactory. Я уже больше года занимаюсь тем, что создаю загрузчики внешних данных для неё. До того, как я пришёл в Газпромбанк, этот процесс был неоптимизирован. Часть информации обрабатывалась Java-загрузчиками, часть — написанными на голом Python.
Сейчас мы используем Apache Airflow как оркестратор ETL-процессов, а сами процессы пишем на Python. Airflow оперирует DAG — направленным ациклическим графом, в котором описываются правила работы задач внутри одной загрузки данных. Такое решение, например, позволяет нам запускать задачи параллельно внутри кластера Kubernetes — в котором за выполнение каждой задачи отвечает отдельный k8s POD.
В основном, у загрузчиков схожие правила работы. Но для каждого требуется определенные настройки, которые в виде JSON описаны в наборе конфигурационных параметров и хранятся в Airflow Variables. Например, можно указать глубину сбора данных (в днях) и другие параметры, влияющие на выполнение DAG.
Иногда процесс нужно запускать с произвольной конфигурацией. Чаще всего на стадии финальной отладки и активного тестирования работы. Раньше в такой ситуации мы меняли значения в Airflow Variables, а затем возвращали их обратно, когда загрузчик завершал работу. Случалось, что мы забывали вернуть измененные параметры обратно, а потом искали причину некорректной работы DAG. В конце концов мы задумались, как можно легко менять значения важных переменных внутри существующего кода. И придумали!
Способы изменить конфигурацию
Встроенный редактор
В UI Apache Airflow есть свой редактор JSON. Можно залезть внутрь Airflow Variable и изменить значения там. Но такой подход не очень подойдёт, если вы хотите разово выполнить загрузку со специфическими параметрами. Airflow Variable содержит постоянные значения, которые, как правило, прописываются раз и навсегда. Когда вы что-то там меняете, то после завершения разового запуска DAG вам нужно не забыть вернуть измененные параметры обратно.
Запуск с параметрами
Apache Airflow может запустить DAG со специфическими параметрами.
Они прописываются вручную в виде нового JSON. В ходе работы загрузчика каждая задача получает доступ к этим значениям и корректирует своё поведение.
Такой способ позволяет оперативно вмешаться в работу процесса. Например, если внезапно окажется, что в DataFactory отсутствуют значения за неделю, мы можем разово повлиять на определенный параметр DAG и указать сбор данных глубиной в семь дней. Нюанс в том, что запуск с ручной конфигурацией нужно обработать в коде отдельно.
Хардкод
Этот способ самый радикальный, но рабочий. Можно отдельно прописать константы или переменные, которые необходимо использовать для специфического запуска DAG. Но это крайне неудобно: приходится вшивать много информации в код, а затем убирать.
Наш способ оптимизации запуска DAG
Итак, есть три способа передать DAG параметры: с помощью Airflow Variable, вручную через w/ config
и напрямую, указав в коде значения. Наша цель — просто и удобно извне влиять на важные для загрузчика значения. При запуске очередной задачи DAG:
- сначала посмотреть, указан ли параметр при ручном запуске;
- если нет — то попытаться найти его значение в Airflow Variable;
- иначе присвоить значение из заданных в коде значений по умолчанию.
При выполнении функции внутри работающей задачи DAG функция vars_from()
сначала проверяет, был ли DAG запущен с помощью w/ config
. После чего пытается найти, заданы ли ключи «count_on_page» и/или «date_from» внутри JSONа, переданного через w/ config
. Если да — передаёт соответствующие значения переменным count_on_page
и date_from
в коде задачи.
Если в словаре w/ config
искомых ключей нет, то производится попытка поиска значений ключей «count_on_page» и/или «date_from» внутри JSON из соответствующей DAG Airflow Variable.
На тот случай, если необходимых значений нигде нет, в аргументах функции vars_from()
мы указали значения по умолчанию. В нашем примере это 50 и 1d, взятые из констант POLLS_COUNT_ON_PAGE
и POLLS_DATE_FROM_DEFAULT
.
Как работает vars_from()
Функция имеет одну внешнюю зависимость: для её реализации необходима библиотека sorcery. Она предоставляет объекту информацию, откуда его вызывают. В терминах библиотеки такой объект называется «заклинанием» (spell
) и инициализируется следующим образом:
Кортеж *defaults
— аналог *args
, а декоратор spell
помещает в нулевой элемент кортежа *defaults
ссылку на frame-объект функции vars_from()
. Так мы можем задать значения локальных переменных, стоящих слева от вызова функции.
Обработка ключа
На всякий случай проверка вхождения ключей реализована тремя способами: по строгому соответствию с названием переменной, по её интерпретации camel case и через псевдоним.
Здесь псевдоним маршрутизирует значение из ключа sql_injection_report_mail
в переменную recipients
:
Если ключ есть и в ручной конфигурации, и в базе Airflow Variable, приоритет будет отдаваться переменной, которая записана стилем snake case:
Несмотря на то что в ручном запуске есть сущность «sql injection report mail», её значение будет взято из Airflow Variable.
Ещё пример, чуть более сложный:
Теперь настройка DAG для новых задач — не проблема
Загрузчики извлекают много информации из разных мест. Мы берем данные из готовых CSV-файлов, архивов, вложений почтовых ящиков, файлы из SAMBA-папок. Проникаем в REST API за полезными JSON и XML — в общем, работаем со всей информацией, которая требуется бизнес-подразделениям банка и которую разрешает собирать наш Департамент информационной безопасности.
Решение, которое мы разработали, позволяет удобнее корректировать работу DAG, не внося изменений в код загрузчиков.
2К открытий13К показов