Игра Яндекс Практикума
Игра Яндекс Практикума
Игра Яндекс Практикума

Spark-разработка: реализация параллельно выполняемых заданий

Отредактировано

Как при помощи Apache Spark выполнять параллельные задания на примере обработки дата-сетов и их слияния в единый набор данных.

2К открытий2К показов

Традиционно разработка приложений на Spark сопряжена с вопросами выделения необходимого ресурса кластера и оптимизации запросов для выполнения поставленной задачи.

Сергей Сикорский, старший разработчик компании Axenix, рассказывает, как это работает на примере обработки дата-сетов и их слияния в единый набор данных.

Полный код примеров ниже выложили на GitHub.

Исходные условия

Apache Spark — популярный open-source фреймворк для распределённой обработки неструктурированных и слабоструктурированных данных. Он реализован в парадигме in-memory вычислений, то есть обрабатывает данные в оперативной памяти для ускоренного выполнения некоторых классов задач, в частности машинного обучения.

Фреймворк поддерживает работу на Java, R, Python и Scala.

Как правило выделение ресурсов кластера, за исключением архитектурных особенностей, отличается также спецификой поставленной задачи.

Вопрос планирования обработки данных в целях повышения скорости их подготовки часто выносится на уровень инструментов оркестрации. При отсутствии таких инструментов разработчики прибегают к использованию таймеров и прочих встроенных в целевую платформу (Linux Cron) инструментов для планирования запуска заданий, что не меняет сути самого подхода.

Инструменты оркестрации, такие как Airflow, дают возможность управлять на высоком уровне процессом запуска Spark-решения и отслеживать его выполнение, но не управляют самим процессом на микроуровне.

Поэтому в материале будет рассмотрен подход к микроуправлению отдельными заданиями на Spark средствами распараллеливания вычислений, применение инструментов для управления процессами, а также технические составляющие, связанные с параллельным запуском заданий (jobs) и его особенностями.

Изложенное является компиляцией опыта автора в области анализа методов повышения производительности решений на Spark Framework в версиях выше 2.2.0.

Постановка задачи и очевидное решение

Допустим, у нас есть два набора данных (дата-сета). В первом – содержится информация о фильмах и их жанрах, во втором – оценки пользователей. Путём выполнения действий агрегации, объединений и фильтрации мы хотим получить на выходе соответствие идентификатору фильма, количество жанров для него и среднюю оценку пользователей.

Результаты промежуточных вычислений должны быть сохранены на диск для дальнейшего использования. В первую очередь решим задачу самым очевидным способом, а именно разработаем программу с последовательной обработкой данных. Исходя из постановки задачи, необходимо выполнить отдельную обработку входящих наборов данных и их слияние.

Логически решение состоит из следующих последовательно выполняемых функций:

  • processGenres – выполняет подготовку данных по жанрам для фильмов;
  • processRatings – выполняет вычисление средней пользовательской оценки для фильма;
  • processStatistics – выполняет формирование результирующего набора данных.
			def main(args: Array[String]): Unit = {  
val props = ArgProperties(args)
val spark = SparkSession.builder().getOrCreate()
val common = new Common(spark) // выполнение Process Genres
val genresDF = common.processGenres(props.moviesPath, props.genresPath) // выполнение Process Ratings
val avgRatingsDF = common.processRatings(props.ratingsPath, props.avgRatingsPath) // выполнение Process Statistics
common.processStatistics(genresDF, avgRatingsDF, props.statisticsPath)
}
		

Код запуска последовательного вычисления

Как видно из приведённого примера, мы получили решение многоэтапной обработки и подготовки данных. Однако очевидно, что оно не эффективно: задание выполняется достаточно продолжительное время ввиду последовательной обработки данных.

Логика обработки данных инкапуслирована в классе Common. Методы класса Common такие как processGenres и processRatings принимают на вход пути к целевым источникам данных и пути для сохранения результата их преобразования, а также возвращают в качестве результата объект Spark DataFrame результата вычисления.

Метод ProcessStatistics принимает на вход два DataFrame объекта и путь к результирующему набору данных. А класс ArgProperties представляет собой модель параметров командной строки.

Решение такой задачи с независимым запуском отдельных заданий дало бы преимущество и ускорило бы процесс обработки данных. Рассмотрим, каким образом можно выполнить задачу в оптимальном ключе.

Решение с использованием параллельно выполняемых заданий

Spark по умолчанию предоставляет контекст выполнения и логично предположить, что мы могли бы запустить несколько контекстов в рамках нашего вычисления.

Но исходя из рекомендаций разработчиков Spark делать это крайне не рекомендуется, разве что в тестовых целях. Мы можем создать несколько сессий и попробовать выполнить вычисления в них. В любом случае без использования параллельных потоков сделать это не получится.

Spark потокобезопасен для запуска нескольких заданий и в работе с несколькими сессиями. Однако в конфигурировании отдельных сессий для каждого из заданий есть смысл только в том случае, если используются разные конфигурации подключения к источникам данных.

В нашем же случае мы храним данные в одной области, допустим в некотором дисковом пространстве. Тогда будет достаточно создать отдельные задания и выполнить их запуск, например, с помощью механизмов распараллеливания на Python или Scala.

Приведём фрагмент кода из решения, использующий асинхронный запуск заданий на Scala:

			def main(args: Array[String]): Unit = {
val props = ArgProperties(args)
val spark = SparkSession.builder().getOrCreate()
val common = new Common(spark)  // определяем пул потоков, в данном случае размерностью 2
val numThreads = 2  
implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))  // формируем коллекцию Future объектов инкапсулирующих в себе вычислительные процессы  
val fSequence = Future.sequence(    Future {      // выполнение Process Genres      
    ("genresDF", common.processGenres(props.moviesPath, props.genresPath))    } ::
          Future {        // выполнение Process Ratings        
    ("avgRatingsDF", common.processRatings(props.ratingsPath, props.avgRatingsPath)) 
       } :: Nil  )  // ожидаем получение результатов из асинхронных операций  
val results = Await.result(fSequence, duration.Duration.Inf).toMap // получаем наборы данных по ключу  
val genresDF = results("genresDF")  
val avgRatingsDF = results("avgRatingsDF") // выполнение Process Statistics  
common.processStatistics(genresDF, avgRatingsDF, props.statisticsPath)
}
		

Код запуска решения с асинхронно запускаемыми заданиями

Это решение использует концепцию языка Scala для реализации асинхронных вычислений, концепция реализована с помощью объекта Future. Объект Future инкапсулирует в себе некоторое действие и выполняет его не блокирующим способом, предоставляя интерфейс ожидания и возврата результата.

В данном случае формируется коллекция из двух Future объектов с последующим ожиданием результата из них в основном потоке выполнения программы. Вызов метода sequence от объекта Future возвращает объект Future инкапсулирующий в себе другие асинхронные действия, в данном случае их два.

В свою очередь Await.result позволяет явно получить результаты из асинхронной операции с указанием времени ожидания, в данном случае бесконечного.

Также определяется пул потоков ExecutionContext ограничивающий количество одновременно выполняемых асинхронных операция, в данном случае размерность пула 2.

Как видно на шкале выполнения, теперь задания запускаются асинхронно (задание 1 и задание 2), что ускоряет процесс обработки данных. Запуск происходит с некоторой задержкой, но выполнение происходит параллельно, что видно на рисунки 4.

Конечно, можно было бы решить данную задачу и через стандартные объекты Thread/Runnable предоставляемые платформой Java. Но тогда пришлось бы дополнительно описать логику сохранения промежуточных данных во внешнюю коллекцию, что делает код менее лаконичным и более объёмным.

Очевидный плюс такого решения, кроме повышения скорости работы приложения, заключается в повышенном микроконтроле, когда мы можем отслеживать выполнение заданий в рамках одной сессии, а также более тонко управлять всем процессом обработки данных.

Таким образом, мы получим параллельное выполнение наших задач на кластере из одного приложения на Spark с минимальными изменениями в коде.

Однако есть некоторые особенности такого подхода к работе приложения:

  1. По умолчанию все задачи запускаются в FIFO-очереди. Это значит, что первая задача получит максимум требуемых ресурсов. В таком случае нам необходимо указать очередь как FAIR, что обеспечит нас справедливым распределением ресурсов (подробности описаны в официальной документации по Spark).
  2. Количество выделяемых ресурсов должно быть достаточно для параллельного выполнения задач на кластере. Мы бы могли выделять их динамически, но в таком случае будут получены издержки по времени на выделение и высвобождение необходимого числа исполнителей на кластере. Потребуется точнее рассчитать суммарно достаточное количество ядер и исполнителей для обработки данных, а также настроить буфер и время ожидания обмена данными в случае высоких нагрузок.
  3. Несмотря на то что Python поддерживает потоки, разработчики Spark Framework не гарантируют запуск каждого задания в соответствующем потоке JVM и для реализация аналогичной функциональность рекомендуют использовать pyspark.InheritableThread.

Стоит также отметить, что согласно документации, любые действия (чтение, запись) могут быть запущено параллельно. Также это относится к механизмам кеширования данных (cache/persist) и к механизмам создания контрольных точек данных при пакетной обработке (checkpoint).

Вывод

Поддержка Framework Spark запуска параллельно выполняемых заданий, даёт дополнительные возможности в микроуправлении обработки данных.

Это может быть полезно в сложных процессах, где выполняется расчёт множества витрин данных, сохранение и/или кэширование промежуточных результатов, с последующим слиянием и записью результатов вычислений.

В случае необходимости сохранения атомарности такого решения асинхронный запуск заданий может дать прирост в скорости работы.

Сама же реализация не является особо трудоёмкой и требует лишь учёта дополнительных накладных расходов на работу приложения.

Следите за новыми постами
Следите за новыми постами по любимым темам
2К открытий2К показов