0
Обложка: Spark-разработка: реализация параллельно выполняемых заданий

Spark-разработка: реализация параллельно выполняемых заданий

Традиционно разработка приложений на Spark сопряжена с вопросами выделения необходимого ресурса кластера и оптимизации запросов для выполнения поставленной задачи.

Сергей Сикорский, старший разработчик компании Axenix, рассказывает, как это работает на примере обработки дата-сетов и их слияния в единый набор данных.

Полный код примеров ниже выложили на GitHub.

Исходные условия

Apache Spark — популярный open-source фреймворк для распределённой обработки неструктурированных и слабоструктурированных данных. Он реализован в парадигме in-memory вычислений, то есть обрабатывает данные в оперативной памяти для ускоренного выполнения некоторых классов задач, в частности машинного обучения.

Фреймворк поддерживает работу на Java, R, Python и Scala.

Как правило выделение ресурсов кластера, за исключением архитектурных особенностей, отличается также спецификой поставленной задачи.

Вопрос планирования обработки данных в целях повышения скорости их подготовки часто выносится на уровень инструментов оркестрации. При отсутствии таких инструментов разработчики прибегают к использованию таймеров и прочих встроенных в целевую платформу (Linux Cron) инструментов для планирования запуска заданий, что не меняет сути самого подхода.

Инструменты оркестрации, такие как Airflow, дают возможность управлять на высоком уровне процессом запуска Spark-решения и отслеживать его выполнение, но не управляют самим процессом на микроуровне.

Поэтому в материале будет рассмотрен подход к микроуправлению отдельными заданиями на Spark средствами распараллеливания вычислений, применение инструментов для управления процессами, а также технические составляющие, связанные с параллельным запуском заданий (jobs) и его особенностями.

Изложенное является компиляцией опыта автора в области анализа методов повышения производительности решений на Spark Framework в версиях выше 2.2.0.

Постановка задачи и очевидное решение

Допустим, у нас есть два набора данных (дата-сета). В первом – содержится информация о фильмах и их жанрах, во втором – оценки пользователей. Путём выполнения действий агрегации, объединений и фильтрации мы хотим получить на выходе соответствие идентификатору фильма, количество жанров для него и среднюю оценку пользователей.

Результаты промежуточных вычислений должны быть сохранены на диск для дальнейшего использования. В первую очередь решим задачу самым очевидным способом, а именно разработаем программу с последовательной обработкой данных. Исходя из постановки задачи, необходимо выполнить отдельную обработку входящих наборов данных и их слияние.

Логически решение состоит из следующих последовательно выполняемых функций:

  • processGenres – выполняет подготовку данных по жанрам для фильмов;
  • processRatings – выполняет вычисление средней пользовательской оценки для фильма;
  • processStatistics – выполняет формирование результирующего набора данных.

 

 

Схематичное представление логики приложения

Рис.1 Схематичное представление логики приложения

рис.2 Порядок выполнения заданий на временной шкале

Порядок выполнения заданий на временной шкале

рис.3 Порядок запуска заданий в списке заданий


Порядок запуска заданий в списке заданий

def main(args: Array[String]): Unit = {  
val props = ArgProperties(args)
val spark = SparkSession.builder().getOrCreate()
val common = new Common(spark) // выполнение Process Genres
val genresDF = common.processGenres(props.moviesPath, props.genresPath) // выполнение Process Ratings
val avgRatingsDF = common.processRatings(props.ratingsPath, props.avgRatingsPath) // выполнение Process Statistics
common.processStatistics(genresDF, avgRatingsDF, props.statisticsPath)
}

Код запуска последовательного вычисления

Как видно из приведённого примера, мы получили решение многоэтапной обработки и подготовки данных. Однако очевидно, что оно не эффективно: задание выполняется достаточно продолжительное время ввиду последовательной обработки данных.

Логика обработки данных инкапуслирована в классе Common. Методы класса Common такие как processGenres и processRatings принимают на вход пути к целевым источникам данных и пути для сохранения результата их преобразования, а также возвращают в качестве результата объект Spark DataFrame результата вычисления.

Метод ProcessStatistics принимает на вход два DataFrame объекта и путь к результирующему набору данных. А класс ArgProperties представляет собой модель параметров командной строки.

Решение такой задачи с независимым запуском отдельных заданий дало бы преимущество и ускорило бы процесс обработки данных. Рассмотрим, каким образом можно выполнить задачу в оптимальном ключе.

Решение с использованием параллельно выполняемых заданий

Spark по умолчанию предоставляет контекст выполнения и логично предположить, что мы могли бы запустить несколько контекстов в рамках нашего вычисления.

Но исходя из рекомендаций разработчиков Spark делать это крайне не рекомендуется, разве что в тестовых целях. Мы можем создать несколько сессий и попробовать выполнить вычисления в них. В любом случае без использования параллельных потоков сделать это не получится.

Spark потокобезопасен для запуска нескольких заданий и в работе с несколькими сессиями. Однако в конфигурировании отдельных сессий для каждого из заданий есть смысл только в том случае, если используются разные конфигурации подключения к источникам данных.

В нашем же случае мы храним данные в одной области, допустим в некотором дисковом пространстве. Тогда будет достаточно создать отдельные задания и выполнить их запуск, например, с помощью механизмов распараллеливания на Python или Scala.

Приведём фрагмент кода из решения, использующий асинхронный запуск заданий на Scala:

def main(args: Array[String]): Unit = {
val props = ArgProperties(args)
val spark = SparkSession.builder().getOrCreate()
val common = new Common(spark)  // определяем пул потоков, в данном случае размерностью 2
val numThreads = 2  
implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))  // формируем коллекцию Future объектов инкапсулирующих в себе вычислительные процессы  
val fSequence = Future.sequence(    Future {      // выполнение Process Genres      
    ("genresDF", common.processGenres(props.moviesPath, props.genresPath))    } ::
          Future {        // выполнение Process Ratings        
    ("avgRatingsDF", common.processRatings(props.ratingsPath, props.avgRatingsPath)) 
       } :: Nil  )  // ожидаем получение результатов из асинхронных операций  
val results = Await.result(fSequence, duration.Duration.Inf).toMap // получаем наборы данных по ключу  
val genresDF = results("genresDF")  
val avgRatingsDF = results("avgRatingsDF") // выполнение Process Statistics  
common.processStatistics(genresDF, avgRatingsDF, props.statisticsPath)
}

Код запуска решения с асинхронно запускаемыми заданиями

Это решение использует концепцию языка Scala для реализации асинхронных вычислений, концепция реализована с помощью объекта Future. Объект Future инкапсулирует в себе некоторое действие и выполняет его не блокирующим способом, предоставляя интерфейс ожидания и возврата результата.

В данном случае формируется коллекция из двух Future объектов с последующим ожиданием результата из них в основном потоке выполнения программы. Вызов метода sequence от объекта Future возвращает объект Future инкапсулирующий в себе другие асинхронные действия, в данном случае их два.

В свою очередь Await.result позволяет явно получить результаты из асинхронной операции с указанием времени ожидания, в данном случае бесконечного.

Также определяется пул потоков ExecutionContext ограничивающий количество одновременно выполняемых асинхронных операция, в данном случае размерность пула 2.

рис.4 Порядок выполнения заданий при асинхронном запуске


Порядок выполнения заданий при асинхронном запуске

рис.5 Порядок запуска приложений


Порядок запуска приложений

Как видно на шкале выполнения, теперь задания запускаются асинхронно (задание 1 и задание 2), что ускоряет процесс обработки данных. Запуск происходит с некоторой задержкой, но выполнение происходит параллельно, что видно на рисунки 4.

Конечно, можно было бы решить данную задачу и через стандартные объекты Thread/Runnable предоставляемые платформой Java. Но тогда пришлось бы дополнительно описать логику сохранения промежуточных данных во внешнюю коллекцию, что делает код менее лаконичным и более объёмным.

Очевидный плюс такого решения, кроме повышения скорости работы приложения, заключается в повышенном микроконтроле, когда мы можем отслеживать выполнение заданий в рамках одной сессии, а также более тонко управлять всем процессом обработки данных.

Таким образом, мы получим параллельное выполнение наших задач на кластере из одного приложения на Spark с минимальными изменениями в коде.

Однако есть некоторые особенности такого подхода к работе приложения:

  1. По умолчанию все задачи запускаются в FIFO-очереди. Это значит, что первая задача получит максимум требуемых ресурсов. В таком случае нам необходимо указать очередь как FAIR, что обеспечит нас справедливым распределением ресурсов (подробности описаны в официальной документации по Spark).
  2. Количество выделяемых ресурсов должно быть достаточно для параллельного выполнения задач на кластере. Мы бы могли выделять их динамически, но в таком случае будут получены издержки по времени на выделение и высвобождение необходимого числа исполнителей на кластере. Потребуется точнее рассчитать суммарно достаточное количество ядер и исполнителей для обработки данных, а также настроить буфер и время ожидания обмена данными в случае высоких нагрузок.
  3. Несмотря на то что Python поддерживает потоки, разработчики Spark Framework не гарантируют запуск каждого задания в соответствующем потоке JVM и для реализация аналогичной функциональность рекомендуют использовать pyspark.InheritableThread.

Стоит также отметить, что согласно документации, любые действия (чтение, запись) могут быть запущено параллельно. Также это относится к механизмам кеширования данных (cache/persist) и к механизмам создания контрольных точек данных при пакетной обработке (checkpoint).

Вывод

Поддержка Framework Spark запуска параллельно выполняемых заданий, даёт дополнительные возможности в микроуправлении обработки данных.

Это может быть полезно в сложных процессах, где выполняется расчёт множества витрин данных, сохранение и/или кэширование промежуточных результатов, с последующим слиянием и записью результатов вычислений.

В случае необходимости сохранения атомарности такого решения асинхронный запуск заданий может дать прирост в скорости работы.

Сама же реализация не является особо трудоёмкой и требует лишь учёта дополнительных накладных расходов на работу приложения.