Настраиваем конфигурацию DAG в Apache Airflow так, чтобы меньше о ней думать

Логотип компании Газпромбанк
Отредактировано

В статье рассказали, как мы настроили и оптимизировали разработку загрузок для Apache Airflow и что для этого потребовалось.

2К открытий13К показов

Сбор и обработка данных для биг дата — достаточно трудоёмкая задача. Для этих целей используются загрузчики.

Но заранее спроектировать и настроить их «на века» невозможно. В процессе работы всегда появляются новые источники сбора данных или специфичные требования к ним, которые часто приходится обрабатывать вручную. Поэтому мы настроили и оптимизировали разработку загрузок.

Мы пользуемся Airflow как единой системой запуска задач

В нашем банке есть собственная платформа больших данных — DataFactory. Я уже больше года занимаюсь тем, что создаю загрузчики внешних данных для неё. До того, как я пришёл в Газпромбанк, этот процесс был неоптимизирован. Часть информации обрабатывалась Java-загрузчиками, часть — написанными на голом Python.

Сейчас мы используем Apache Airflow как оркестратор ETL-процессов, а сами процессы пишем на Python. Airflow оперирует DAG — направленным ациклическим графом, в котором описываются правила работы задач внутри одной загрузки данных. Такое решение, например, позволяет нам запускать задачи параллельно внутри кластера Kubernetes — в котором за выполнение каждой задачи отвечает отдельный k8s POD.

В основном, у загрузчиков схожие правила работы. Но для каждого требуется определенные настройки, которые в виде JSON описаны в наборе конфигурационных параметров и хранятся в Airflow Variables. Например, можно указать глубину сбора данных (в днях) и другие параметры, влияющие на выполнение DAG.

Иногда процесс нужно запускать с произвольной конфигурацией. Чаще всего на стадии финальной отладки и активного тестирования работы. Раньше в такой ситуации мы меняли значения в Airflow Variables, а затем возвращали их обратно, когда загрузчик завершал работу. Случалось, что мы забывали вернуть измененные параметры обратно, а потом искали причину некорректной работы DAG. В конце концов мы задумались, как можно легко менять значения важных переменных внутри существующего кода. И придумали!

Способы изменить конфигурацию

Встроенный редактор

В UI Apache Airflow есть свой редактор JSON. Можно залезть внутрь Airflow Variable и изменить значения там. Но такой подход не очень подойдёт, если вы хотите разово выполнить загрузку со специфическими параметрами. Airflow Variable содержит постоянные значения, которые, как правило, прописываются раз и навсегда. Когда вы что-то там меняете, то после завершения разового запуска DAG вам нужно не забыть вернуть измененные параметры обратно.

Запуск с параметрами

Apache Airflow может запустить DAG со специфическими параметрами.

Настраиваем конфигурацию DAG в Apache Airflow так, чтобы меньше о ней думать 1

Они прописываются вручную в виде нового JSON. В ходе работы загрузчика каждая задача получает доступ к этим значениям и корректирует своё поведение.

Настраиваем конфигурацию DAG в Apache Airflow так, чтобы меньше о ней думать 2

Такой способ позволяет оперативно вмешаться в работу процесса. Например, если внезапно окажется, что в DataFactory отсутствуют значения за неделю, мы можем разово повлиять на определенный параметр DAG и указать сбор данных глубиной в семь дней. Нюанс в том, что запуск с ручной конфигурацией нужно обработать в коде отдельно.

Хардкод

Этот способ самый радикальный, но рабочий. Можно отдельно прописать константы или переменные, которые необходимо использовать для специфического запуска DAG. Но это крайне неудобно: приходится вшивать много информации в код, а затем убирать.

Наш способ оптимизации запуска DAG

Итак, есть три способа передать DAG параметры: с помощью Airflow Variable, вручную через w/ config и напрямую, указав в коде значения. Наша цель — просто и удобно извне влиять на важные для загрузчика значения. При запуске очередной задачи DAG:

  • сначала посмотреть, указан ли параметр при ручном запуске;
  • если нет — то попытаться найти его значение в Airflow Variable;
  • иначе присвоить значение из заданных в коде значений по умолчанию.
			POLLS_COUNT_ON_PAGE: int = 50 
POLLS_DATE_FROM_DEFAULT: str = "1d" 
count_on_page, date_from = vars_from(POLLS_COUNT_ON_PAGE, POLLS_DATE_FROM_DEFAULT)
		

При выполнении функции внутри работающей задачи DAG функция vars_from() сначала проверяет, был ли DAG запущен с помощью w/ config. После чего пытается найти, заданы ли ключи «count_on_page» и/или «date_from» внутри JSONа, переданного через w/ config. Если да — передаёт соответствующие значения переменным count_on_page и date_from в коде задачи.

Если в словаре w/ config искомых ключей нет, то производится попытка поиска значений ключей «count_on_page» и/или «date_from» внутри JSON из соответствующей DAG Airflow Variable.

На тот случай, если необходимых значений нигде нет, в аргументах функции vars_from() мы указали значения по умолчанию. В нашем примере это 50 и 1d, взятые из констант POLLS_COUNT_ON_PAGE и POLLS_DATE_FROM_DEFAULT.

Как работает vars_from()

Функция имеет одну внешнюю зависимость: для её реализации необходима библиотека sorcery. Она предоставляет объекту информацию, откуда его вызывают. В терминах библиотеки такой объект называется «заклинанием» (spell) и инициализируется следующим образом:

			@spell 
def vars_from(*defaults, **aliases):
		

Кортеж *defaults — аналог *args, а декоратор spell помещает в нулевой элемент кортежа *defaults ссылку на frame-объект функции vars_from(). Так мы можем задать значения локальных переменных, стоящих слева от вызова функции.

Обработка ключа

На всякий случай проверка вхождения ключей реализована тремя способами: по строгому соответствию с названием переменной, по её интерпретации camel case и через псевдоним.

Здесь псевдоним маршрутизирует значение из ключа sql_injection_report_mail в переменную recipients:

			recipients = vars_from(recipients="sql_injection_report_mail")
		

Если ключ есть и в ручной конфигурации, и в базе Airflow Variable, приоритет будет отдаваться переменной, которая записана стилем snake case:

			dag_run_conf = {"sqlInjectionReportMail": "blabla", "recipients": "123"} 
dag_variable = {"sql_injection_report_mail": "gluk-gluk"}

recipients = vars_from(recipients="sql_injection_report_mail")
assert recipients == "gluk-gluk"
		

Несмотря на то что в ручном запуске есть сущность «sql injection report mail», её значение будет взято из Airflow Variable.

Ещё пример, чуть более сложный:

			dag_run_conf = {"dateFrom": "blabla", "date_to": 23} 
dag_variable = {"date_from": "gluk-gluk", "b": 16, "date_to": 32} 
# внутри другой функции, которая является Airflow Task 
a = 13 
date_from, date_to, a, b, c = vars_from() 
assert date_from == "gluk-gluk" 
# взят из dag_variable, потомучто snake_case приоритетнее 
assert date_to == 23 
# потому что dag_run_conf приоритетнее 
assert a == 13 
# был взят из значения установленного внутри функции вызвавшей vars_from
assert b == 16 
assert c is None 
# не задан в функции до вызова vars_from и не задан в dag_run_conf или dag_variable
		

Теперь настройка DAG для новых задач — не проблема

Загрузчики извлекают много информации из разных мест. Мы берем данные из готовых CSV-файлов, архивов, вложений почтовых ящиков, файлы из SAMBA-папок. Проникаем в REST API за полезными JSON и XML — в общем, работаем со всей информацией, которая требуется бизнес-подразделениям банка и которую разрешает собирать наш Департамент информационной безопасности.

Решение, которое мы разработали, позволяет удобнее корректировать работу DAG, не внося изменений в код загрузчиков.

Следите за новыми постами
Следите за новыми постами по любимым темам
2К открытий13К показов