Как упаковать бэкенд-код на Go для аналитики на базе Spark
Всем привет! Я Ваня Ахлестин, занимаюсь поддержкой и развитием аналитической платформы кластера Search&Recommendations на базе Spark и Hadoop в Авито. Сегодня расскажу, как начать использовать ваш код из Python или PySpark и не тратить много времени дорогих разработчиков.
187 открытий2К показов
Всем привет! Меня зовут Ваня Ахлестин, я занимаюсь поддержкой и развитием аналитической платформы кластера Search&Recommendations на базе Spark и Hadoop.
Большинство сервисов в хайлоаде, работу которых мы логируем и исследуем, давно переписаны на Go. Из-за этого часто необходимо переиспользовать логику сервиса внутри аналитического или ML-приложения на Spark. Как примеры такого кода можно взять расчёт скоров по сложному запросу или ранжирование айтемов для выдачи.
Реимплементировать и поддерживать несколько вариантов кода дорого, заниматься обстрелом сервисов долго и больно. Поэтому сегодня я расскажу, как начать использовать ваш код из Python или PySpark и не тратить много времени дорогих разработчиков.
Немного о нашей инфраструктуре
Наша платформа построена на классической связке Hadoop/YARN/Spark/PySpark. Основной язык разработки — Python 3.11. Мы стараемся придерживаться баланса между зрелостью и актуальностью софта, чтобы не погрязнуть в поддержке легаси решений. Для этого мы долго обучали разработчиков и аналитиков писать тесты для их задач.
Изначально Python был выбран вместо Scala, чтобы снизить порог входа и дать возможность каждому работать с продуктовым кодом вместо выделенной команды бигдата-инженеров, реимплементирующих код от аналитиков и ML.
В типичных цикл разработки входит:
- Проектирование — обсуждаем и подготавливаем данные для задачи, эксперимента, проговариваем примерную реализацию.
- Прототип в Jupiter ноутбуках, первое ревью, проверяем надёжность и качество исходных данных.
- Упаковываем задачу в формат Airflow, настраиваем, покрываем тестами и тестовыми данными, настраиваем метрики.
Для аналитики работы сервисов мы обычно используем пайплайн с логированием JSON структур в Kafka, данные из которой в HDFS поставляет задача на Flink. Реплей таких логов на кластере через PySpark UDF — об этом и написана статья.
CGO, или собираем биндинг дешево
Первое, о чём нужно подумать при работе с кодом сервиса — как организовать биндинг со сложными вложенными структурами вашего сервиса.
Тут есть несколько подходов:
- Довериться генератору биндингов, например GoPy. Самое очевидное решение, которое мы и попробовали в первую очередь. К сожалению, результат был негативным:биндинг содержит много мусора и ненужных артефактов;требуется дополнительно его патчить, не всё работает «из коробки»;структуры биндинга уродливы, над ними нужна ещё одна обертка;код неэффективен;собранный пакет жёстко привязывается к определённой версии интерпретатора — у нас есть несколько версий и дистрибутивов Python.
- биндинг содержит много мусора и ненужных артефактов;
- требуется дополнительно его патчить, не всё работает «из коробки»;
- структуры биндинга уродливы, над ними нужна ещё одна обертка;
- код неэффективен;
- собранный пакет жёстко привязывается к определённой версии интерпретатора — у нас есть несколько версий и дистрибутивов Python.
Поэтому такие биндинги не очень долго прожили — требовали постоянного внимания.
- Написать биндинг вручную. Существует много туториалов, как это сделать, но есть сложности:поддержка однообразных структур и кода на С;есть привязка к ABI Python, нужна сборка под конкретный Python;чтобы поддерживать в актуальном виде, нужно время квалифицированного разработчика.
- поддержка однообразных структур и кода на С;
- есть привязка к ABI Python, нужна сборка под конкретный Python;
- чтобы поддерживать в актуальном виде, нужно время квалифицированного разработчика.
- Использовать сериализацию в промежуточный формат. Плюсы:если это JSON, то сервис его уже использует, он нативен для кода;Spark и Python уже умеют его использовать «из коробки»;можем передать в код клиента схему с описанием структур;не нужно поддерживать код самого биндинга.
- если это JSON, то сервис его уже использует, он нативен для кода;
- Spark и Python уже умеют его использовать «из коробки»;
- можем передать в код клиента схему с описанием структур;
- не нужно поддерживать код самого биндинга.
Минус — есть дополнительные расходы на упаковку-распаковку, но и быстрые парсеры тоже есть. На этом варианте мы и остановились.
Минимальный биндинг изнутри
Чтобы было проще разобраться, я подготовил демо-проект со следующей структурой:
Основная идея проекта:
- Собрать shared библиотеку с С-интерфейсом.
- Добавить к ней Python-биндинг с использованием ctypes.
- Упаковать в переносимый пакет (Wheel), готовый для установки в любое окружение для платформы.
Бекэнд-код на Go
Демо-код бэкенд-логики содержит вложенные структуры и код, который их принимает и возвращает — это максимально типичная ситуация в реальной жизни. Структуры уже размечены для JSO- генератора — это необязательно.
Экспортируем функции и структуры через C Call интерфейс
Как видно, всё, что делает код — это принимает и возвращает строки с JSON. Для описания структур используется JSON Schema. Это не идеальный вариант — из-за ограниченности типов JSON больше демонстрация возможности использовать рефлекшены для создания схемы.
Собираем всё в разделяемую библиотеку
Обёртка на Python
Используем ctypes с динамической загрузкой собранной библиотеки.
И не забываем про сборку проекта в пакет:
Всё, что нужно — Python и установленный poetry. При запуске сборки poetry build собирает пакеты source и wheel. Пакет wheel как раз и предназначен для дистрибьюции бинарных зависимостей для Python.
PEX как контейнер, и деплой кода на кластер
Любой PySpark скрипт с python-udf перед тем, как начать выполнять план запроса на кластере, производит сериализацию функции и всех обьектов из её контекста через библиотеку cloudpickle. То есть, для того, чтобы восстановить функцию на кластерных пайтон-воркерах нужно обеспечить действительность всех ссылок из pickle.
Существует несколько способов деплоя Python окружения на кластер:
- Устанавливать библиотеки в окружение системного Python.
- Плюс: просто сделать.
- Минусы:сложно синхронизировать и следить за всеми артефактами.невозможно работать и отлаживать несколько разных окружений, а использование venv только всё усложняет;сложно отследить, какие версии использовались в определенный момент.
- сложно синхронизировать и следить за всеми артефактами.
- невозможно работать и отлаживать несколько разных окружений, а использование venv только всё усложняет;
- сложно отследить, какие версии использовались в определенный момент.
- Использовать docker/kubernetes.
- Плюс: можно сделать любое окружение.
- Минус: не подходит для bare-metal инсталляций с Hadoop
- Деплой через EGG.
- Минус: работает только с нативным Python-кодом — не получится использовать биндитнги из-за ограничений zipimport.
- Плюс: работает из коробки, не нужно паковать всё окружение, если библиотека самодостаточна
- Использование законсервированных окружений в виде conda-pack, venv-pack, pex.
- Плюсы:разово собирается и пакуется окружение, не получится испортить.прозрачно деплоится на кластер, но нужно оркестрировать установку на все тачки.возно версионировать синхронно с основным кодом.
- разово собирается и пакуется окружение, не получится испортить.
- прозрачно деплоится на кластер, но нужно оркестрировать установку на все тачки.
- возно версионировать синхронно с основным кодом.
- Минус: PEX-пакет весит как полноценный virtualenv.
Мы используем динамическую версию из Git в виде комбинации тега, майлстоуна, хеша коммита и признака dev-ветки.
Для себя мы выбрали PEX как наиболее зрелый вариант пакета. Если коротко, то это self-extacted virtualenv, упакованный в обычный zip и снабжённый бустстап-секцией для поиска хост-интерпретатора.
Для использования PEX как интерпретатора Python на кластере нужно модифицировать настройки Spark. У нас конфигурация контекста выглядит примерно так:
Для сборки PEX-пакета достаточно самой утилиты pex и списка всех пакетов, которые должны быть в него установлены:
Естественно, что в PEX-пакете должен находится pyspark, иначе Spark не сможет запустить Python-воркеры на кластерных машинах.
Включаем нашу UDF в работу
Подразумевается, что Spark-сессия запущена, настроена и используется PEX.
Чтобы дальше оперировать со структурами в привычном виде, описываем схему сообщения в формате Spark и используем коробочную функцию from_json.
Что почитать по теме
187 открытий2К показов