Spark-разработка: реализация параллельно выполняемых заданий
Как при помощи Apache Spark выполнять параллельные задания на примере обработки дата-сетов и их слияния в единый набор данных.
Традиционно разработка приложений на Spark сопряжена с вопросами выделения необходимого ресурса кластера и оптимизации запросов для выполнения поставленной задачи.
Сергей Сикорский, старший разработчик компании Axenix, рассказывает, как это работает на примере обработки дата-сетов и их слияния в единый набор данных.
Полный код примеров ниже выложили на GitHub.
Исходные условия
Apache Spark — популярный open-source фреймворк для распределённой обработки неструктурированных и слабоструктурированных данных. Он реализован в парадигме in-memory вычислений, то есть обрабатывает данные в оперативной памяти для ускоренного выполнения некоторых классов задач, в частности машинного обучения.
Фреймворк поддерживает работу на Java, R, Python и Scala.
Как правило выделение ресурсов кластера, за исключением архитектурных особенностей, отличается также спецификой поставленной задачи.
Вопрос планирования обработки данных в целях повышения скорости их подготовки часто выносится на уровень инструментов оркестрации. При отсутствии таких инструментов разработчики прибегают к использованию таймеров и прочих встроенных в целевую платформу (Linux Cron) инструментов для планирования запуска заданий, что не меняет сути самого подхода.
Инструменты оркестрации, такие как Airflow, дают возможность управлять на высоком уровне процессом запуска Spark-решения и отслеживать его выполнение, но не управляют самим процессом на микроуровне.
Поэтому в материале будет рассмотрен подход к микроуправлению отдельными заданиями на Spark средствами распараллеливания вычислений, применение инструментов для управления процессами, а также технические составляющие, связанные с параллельным запуском заданий (jobs) и его особенностями.
Изложенное является компиляцией опыта автора в области анализа методов повышения производительности решений на Spark Framework в версиях выше 2.2.0.
Постановка задачи и очевидное решение
Допустим, у нас есть два набора данных (дата-сета). В первом – содержится информация о фильмах и их жанрах, во втором – оценки пользователей. Путём выполнения действий агрегации, объединений и фильтрации мы хотим получить на выходе соответствие идентификатору фильма, количество жанров для него и среднюю оценку пользователей.
Результаты промежуточных вычислений должны быть сохранены на диск для дальнейшего использования. В первую очередь решим задачу самым очевидным способом, а именно разработаем программу с последовательной обработкой данных. Исходя из постановки задачи, необходимо выполнить отдельную обработку входящих наборов данных и их слияние.
Логически решение состоит из следующих последовательно выполняемых функций:
- processGenres – выполняет подготовку данных по жанрам для фильмов;
- processRatings – выполняет вычисление средней пользовательской оценки для фильма;
- processStatistics – выполняет формирование результирующего набора данных.
Код запуска последовательного вычисления
Как видно из приведённого примера, мы получили решение многоэтапной обработки и подготовки данных. Однако очевидно, что оно не эффективно: задание выполняется достаточно продолжительное время ввиду последовательной обработки данных.
Логика обработки данных инкапуслирована в классе Common. Методы класса Common такие как processGenres и processRatings принимают на вход пути к целевым источникам данных и пути для сохранения результата их преобразования, а также возвращают в качестве результата объект Spark DataFrame результата вычисления.
Метод ProcessStatistics принимает на вход два DataFrame объекта и путь к результирующему набору данных. А класс ArgProperties представляет собой модель параметров командной строки.
Решение такой задачи с независимым запуском отдельных заданий дало бы преимущество и ускорило бы процесс обработки данных. Рассмотрим, каким образом можно выполнить задачу в оптимальном ключе.
Решение с использованием параллельно выполняемых заданий
Spark по умолчанию предоставляет контекст выполнения и логично предположить, что мы могли бы запустить несколько контекстов в рамках нашего вычисления.
Но исходя из рекомендаций разработчиков Spark делать это крайне не рекомендуется, разве что в тестовых целях. Мы можем создать несколько сессий и попробовать выполнить вычисления в них. В любом случае без использования параллельных потоков сделать это не получится.
Spark потокобезопасен для запуска нескольких заданий и в работе с несколькими сессиями. Однако в конфигурировании отдельных сессий для каждого из заданий есть смысл только в том случае, если используются разные конфигурации подключения к источникам данных.
В нашем же случае мы храним данные в одной области, допустим в некотором дисковом пространстве. Тогда будет достаточно создать отдельные задания и выполнить их запуск, например, с помощью механизмов распараллеливания на Python или Scala.
Приведём фрагмент кода из решения, использующий асинхронный запуск заданий на Scala:
Код запуска решения с асинхронно запускаемыми заданиями
Это решение использует концепцию языка Scala для реализации асинхронных вычислений, концепция реализована с помощью объекта Future. Объект Future инкапсулирует в себе некоторое действие и выполняет его не блокирующим способом, предоставляя интерфейс ожидания и возврата результата.
В данном случае формируется коллекция из двух Future объектов с последующим ожиданием результата из них в основном потоке выполнения программы. Вызов метода sequence от объекта Future возвращает объект Future инкапсулирующий в себе другие асинхронные действия, в данном случае их два.
В свою очередь Await.result позволяет явно получить результаты из асинхронной операции с указанием времени ожидания, в данном случае бесконечного.
Также определяется пул потоков ExecutionContext ограничивающий количество одновременно выполняемых асинхронных операция, в данном случае размерность пула 2.
Как видно на шкале выполнения, теперь задания запускаются асинхронно (задание 1 и задание 2), что ускоряет процесс обработки данных. Запуск происходит с некоторой задержкой, но выполнение происходит параллельно, что видно на рисунки 4.
Конечно, можно было бы решить данную задачу и через стандартные объекты Thread/Runnable предоставляемые платформой Java. Но тогда пришлось бы дополнительно описать логику сохранения промежуточных данных во внешнюю коллекцию, что делает код менее лаконичным и более объёмным.
Очевидный плюс такого решения, кроме повышения скорости работы приложения, заключается в повышенном микроконтроле, когда мы можем отслеживать выполнение заданий в рамках одной сессии, а также более тонко управлять всем процессом обработки данных.
Таким образом, мы получим параллельное выполнение наших задач на кластере из одного приложения на Spark с минимальными изменениями в коде.
Однако есть некоторые особенности такого подхода к работе приложения:
- По умолчанию все задачи запускаются в FIFO-очереди. Это значит, что первая задача получит максимум требуемых ресурсов. В таком случае нам необходимо указать очередь как FAIR, что обеспечит нас справедливым распределением ресурсов (подробности описаны в официальной документации по Spark).
- Количество выделяемых ресурсов должно быть достаточно для параллельного выполнения задач на кластере. Мы бы могли выделять их динамически, но в таком случае будут получены издержки по времени на выделение и высвобождение необходимого числа исполнителей на кластере. Потребуется точнее рассчитать суммарно достаточное количество ядер и исполнителей для обработки данных, а также настроить буфер и время ожидания обмена данными в случае высоких нагрузок.
- Несмотря на то что Python поддерживает потоки, разработчики Spark Framework не гарантируют запуск каждого задания в соответствующем потоке JVM и для реализация аналогичной функциональность рекомендуют использовать pyspark.InheritableThread.
Стоит также отметить, что согласно документации, любые действия (чтение, запись) могут быть запущено параллельно. Также это относится к механизмам кеширования данных (cache/persist) и к механизмам создания контрольных точек данных при пакетной обработке (checkpoint).
Вывод
Поддержка Framework Spark запуска параллельно выполняемых заданий, даёт дополнительные возможности в микроуправлении обработки данных.
Это может быть полезно в сложных процессах, где выполняется расчёт множества витрин данных, сохранение и/или кэширование промежуточных результатов, с последующим слиянием и записью результатов вычислений.
В случае необходимости сохранения атомарности такого решения асинхронный запуск заданий может дать прирост в скорости работы.
Сама же реализация не является особо трудоёмкой и требует лишь учёта дополнительных накладных расходов на работу приложения.
2К открытий2К показов