Многопоточность в Node.js
Рассказываем самое необходимое о многопоточности в Node.js v10.5.0. Потоковые воркеры, пул воркеров и worker_threads своими словами.
Некоторые разработчики удивляются, как однопоточный Node.js может конкурировать с многопоточным серверным софтом. Кажется нелогичным, что компании выбирают его в качестве backend. Для начала надо разобраться в том, что на самом деле подразумевается под однопоточностью Node.
JavaScript был создан для реализации простых web-задач вроде проверки формы или создания следа у курсора. Только в 2009 году Райан Дал (создатель Node.js) сделал возможным использование этого языка для написания backend-софта.
Backend-языки, поддерживающие многопоточность, имеют необходимые механизмы для синхронизации значений между потоками и другими поточно-ориентированными функциями. Для поддержки этого в JavaScript потребовалось бы изменить весь язык, что не входило в планы Дала. Пришлось создать обходной путь, чтобы простой JavaScript мог поддерживать многопоточность.
Как на самом деле работает Node.js
Node.js использует два вида потоков:
- основной поток, обрабатываемый циклом событий (Event Loop),
- несколько вспомогательных потоков в пуле воркеров.
Цикл обработки событий — это механизм, который принимает callback-функции и регистрирует их для выполнения в определённый момент в будущем. Он работает в том же потоке, что и сам код JavaScript. Когда операция блокирует поток, цикл событий также блокируется.
Пул воркеров — модель исполнения, вызывающая и обрабатывающая отдельные потоки. Затем они синхронно выполняют задачу и возвращают результат в цикл обработки событий. После цикл вызывает callback-функцию с указанным результатом.
Если коротко, то пул воркеров может заниматься асинхронными операциями ввода-вывода — прежде всего, взаимодействем с системным диском и сетью. Эта модель исполнения в основном используется модулями вроде fs
(требовательного к скорости ввода-вывода) или crypto
(требовательного к CPU). Пул воркеров реализован в libuv, что приводит к небольшой задержке всякий раз, когда Node требует связи между JavaScript и C ++, но эта задержка едва ощутима.
Используя оба эти механизма, можно написать следующий код:
Модуль fs
указывает пулу воркеров использовать один из его потоков для чтения содержимого файла и уведомления цикла обработки событий, когда это будет сделано. Цикл принимает предоставленную callback-функцию и выполняет её с содержимым файла.
Выше приведён пример неблокирующего кода. Пул воркеров прочитает файл и вызовет предоставленную функцию с результатом. Поскольку пул имеет собственные потоки, цикл обработки событий может продолжать исполнение в обычном режиме во время чтения файла.
Всё работает, пока нет необходимости синхронно выполнять какую-то сложную операцию. Любая функция, выполнение которой занимает слишком много времени, блокирует поток. Если в приложении много таких функций, оно может значительно снизить производительность сервера или вообще заморозить его работу в целом. В этом случае нет способа делегировать работу пулу воркеров.
Области, требующие сложных вычислений, — искусственный интеллект, машинное обучение или большие данные— не могли эффективно использовать Node.js из-за операций, блокирующих основной (и единственный) поток, что делало сервер неотзывчивым. Так было до появления Node.js v10.5.0, в котором была добавлена поддержка нескольких потоков.
Знакомство с worker_threads
Модуль worker_threads
— это пакет, который позволяет создавать полнофункциональные многопоточные приложения Node.js.
Потоковый воркер (thread worker) — фрагмент кода (обычно извлекаемый из файла), созданный в отдельном потоке.
Для использования потоковых воркеров нужно импортировать модуль worker_threads
. Начнём с создания функции, которая поможет создавать эти воркеры, а также рассмотрим их свойства.
Для создания потокового воркера необходимо создать экземпляр класса Worker
. В первом аргументе указываем путь к файлу, который содержит код воркера; во втором предоставляем объект, содержащий свойство с именем workerData
. Это те данные, к которым поток будет иметь доступ при запуске, если того хочет разработчик.
Обратите внимание: независимо от того, используете ли вы сам JavaScript или что-то, что в него транспилируется (например TypeScript), путь всегда должен ссылаться на файлы с расширениями .js
или .mjs
.
Также стоит указать, почему используется callback-функция вместо возвращения промиса (promise), который будет передавать результат в resolve
при запуске события message
. Это связано с возможностью потоковых воркеров отправлять много событий message
, а не только одно.
Связь между потоками основана на событиях. Это означает, что надо настроить обработчики, которые будут вызываться после отправки потоком данного события.
Рассмотрим наиболее распространённые события.
Событие error
генерируется, когда внутри воркера возникает необработанное исключение. Затем поток завершается, а ошибка становится первым аргументом в callback.
Exit
генерируется, когда воркер заканчивает своё выполнение. Если process.exit()
вызывается внутри потока, exitCode
будет предоставлен в callback. Если поток прерывается с помощью worker.terminate()
, код будет 1
.
Online
генерируется, когда воркер прекращает парсинг кода JavaScript и начинает его выполнение. Это событие используется нечасто, но в определённых случаях оно может быть информативным.
Message
генерируется, когда воркер отправляет данные в родительский поток.
Обмен данными между потоками
Для отправки данных другому потоку используется метод port.postMessage()
. Он имеет следующую сигнатуру:
Объект port
может быть или экземпляром parentPort
, или экземпляром MessagePort
— подробнее об этом позже.
Аргумент data
Первый аргумент данных — назовём его data
— это объект, который копируется в другой поток. Он может содержать всё, что поддерживает алгоритм копирования.
Данные копируются алгоритмом структурированного клонирования.
Алгоритм не копирует функции, ошибки, дескрипторы свойств или цепочки прототипов. Следует также отметить, что копирование объектов таким способом отличается от JSON, потому что он может содержать циклические ссылки и типизированные массивы, а JSON не может.
Поддерживая копирование типизированных массивов, алгоритм позволяет разделять память между потоками.
Разделение памяти между потоками
Считается, что модули вроде cluster
или child_process
используют потоки уже давно. Это одновременно и верно и нет.
Cluster
может создавать несколько процессов Node.js с одним главным процессом, маршрутизирующим запросы между ними. Кластеризация приложения позволяет эффективно увеличить пропускную способность сервера. Однако нельзя создать отдельный поток с модулем cluster
.
Модуль child_process
может создавать любой исполняемый файл независимо от типа файла. В этом модуле отсутствуют некоторые важные функции, которые есть у worker_threads
.
Потоковые воркеры являются более лёгкими и имеют тот же идентификатор процесса, что и их родительские потоки. Ещё они могут использовать память совместно со своими родительскими потоками. Это позволяет воркерам избежать сериализации больших входных данных и, как следствие, отправлять данные вперёд и назад более эффективно.
Рассмотрим пример разделения памяти между потоками. Чтобы память была разделена, экземпляры ArrayBuffer
или SharedArrayBuffer
должны быть отправлены другому потоку в качестве аргумента data
или внутри него.
Пример воркера, который разделяет память со своим родительским потоком:
Создаётся экземпляр SharedArrayBuffer
с размером памяти, необходимым для хранения ста 32-битных целых чисел. Затем создаётся экземпляр Int32Array
, который будет использовать буфер для хранения его структуры. После массив заполняется некоторыми случайными числами и отправляется в родительский поток.
В родительском потоке:
Меняя значение arr[0]
на 5, фактически изменяем его в обоих потоках.
При разделении памяти есть риск изменить значение в одном потоке, изменив его в другом. Но вместе с этим появляется хорошая особенность: значение не нужно сериализовывать, чтобы оно было доступно в другом потоке. Это значительно повышает эффективность. Просто не забывайте правильно управлять ссылками на данные, чтобы те в свою очередь не оставляли за собой мусор после завершения работы с ними.
Зачастую гораздо удобнее передавать между потоками не массив, а объект. Но, к сожалению, не существует SharedObjectBuffer
или чего-либо подобного, но можно самим создать похожую структуру.
Аргумент TransferList
TransferList
может содержать только ArrayBuffer
и MessagePort
. После передачи в другой поток их больше нельзя использовать в отправляющем потоке. Память перемещается в другой поток и, следовательно, недоступна в отправляющем.
Пока нет канала связи, нельзя передавать сетевые сокеты, включая их в TransferList
.
Создание канала связи
Связь между потоками осуществляется через порты, которые являются экземплярами класса MessagePort
. Они обеспечивают эту связь на основе событий.
Есть два способа использования портов для связи между потоками. Первый используется по умолчанию и проще, чем второй. В код воркера импортируется объект с именем parentPort
из модуля worker_threads
и используется .postMessage()
для отправки сообщений в родительский поток.
Пример:
parentPort
— это экземпляр MessagePort
, который Node.js создал “за кулисами”, чтобы обеспечить связь с родительским потоком. Таким образом, можно общаться между потоками, используя объекты parentPort
и worker
.
Второй способ связи между потоками — создать MessageChannel
и отправить его воркеру. Вот как можно создать новый MessagePort
и поделиться им с потоковым воркером:
После создания port1
и port2
настраиваем обработчики событий на port1
и отправляем port2
воркеру. Необходимо включить его в файл TransferList
, чтобы он был передан рабочей стороне.
Теперь внутри воркера:
Таким образом, используется порт, который был отправлен родительским потоком.
Использование parentPort
тоже является правильным подходом, но лучше создать новый MessagePort
с экземпляром MessageChannel,
а затем поделиться им с созданным воркером.
Обратите внимание, в примерах ниже для простоты используется parentPort
.
Два способа использования воркеров
Первый — создать воркер, выполнить его код и отправить результат в родительский поток. При таком подходе каждый раз, когда появляется новая задача, надо заново создавать воркер.
Второй способ — создать воркер и настроить обработчики для события message
. Каждый раз при запуске это событие выполняет свою работу и отправляет результат обратно в родительский поток, который сохраняет воркер для последующего использования.
Документация Node.js рекомендует второй подход, поскольку много усилий необходимо для создания потокового воркера, который требует создания виртуальной машины, парсинга и выполнения кода. Этот метод также намного эффективнее, чем постоянно создающиеся воркеры.
Такой подход называется пулом воркеров. Создаётся пул и воркеры находятся в ожидании события message
, которое нужно для выполнения задания.
Пример файла, содержащего воркер, который создаётся, выполняется, а затем закрывается:
После отправки collection
в родительский поток, воркер просто завершается.
А вот пример воркера, который может ждать в течение длительного периода времени, прежде чем ему будет дано задание:
Полезные свойства модуля worker_threads
isMainThread
Свойство имеет значение true
, когда не работает внутри потока воркера. Если есть необходимость, можно включить простой оператор if
в начале файла, чтобы убедиться, что он запускается только как воркер.
workerData
Несёт в себе данные, включённые в конструктор воркера созданным потоком.
В потоке воркера:
parentPort
Экземпляр MessagePort
, используется для связи с родительским потоком.
threadId
Уникальный идентификатор, присвоенный воркеру.
Реализация setTimeout
setTimeout
— это бесконечный цикл, который прерывает выполнение приложения. На практике он проверяет на каждой итерации, меньше ли сумма начальной даты и заданного количества миллисекунд, чем фактическая дата.
Эта конкретная реализация создаёт поток, выполняет его код и затем завершает работу.
Реализуем код, который будет использовать этот воркер. Создадим стейт, в котором будут отслеживаться созданные воркеры:
Функция, которая отвечает за создание потоковых воркеров и хранит их в стейт:
Используем пакет UUID для создания уникального идентификатора воркера, затем задействуем определённую ранее вспомогательную функцию runWorker()
, чтобы получить воркер. Передаём ему callback-функцию, которая запускается после отправки воркером некоторых данных. Сохраняем воркер в стейт и возвращаем id
.
Внутри callback-функции нужно проверить, существует ли воркер в стейте, потому что есть возможность отменить его с помощью cancelTimeout()
. Если он существует, удаляем его из стейта и вызываем callback, переданный в функцию setTimeout()
.
Функция cancelTimeout()
использует метод .terminate()
, чтобы принудительно остановить воркер и удалить его из стейта:
Прим. если вам интересно, есть реализация метода setInterval()
. Но он не имеет ничего общего с потоками (повторно используется код setTimeout()
). Кроме того, существует небольшой тестовый код для проверки, насколько такой подход отличается от исходного. Вы можете просмотреть код здесь. Результаты:
Видно, что в setTimeout()
есть небольшая задержка — около 40 мс — из-за создаваемого воркера. Средняя стоимость процессора также немного выше, но ничего страшного в этом нет (стоимость процессора — это среднее значение загрузки процессора за всё время процесса).
Если бы можно было повторно использовать воркеры, задержка и загрузка ЦП снизилась бы. Поэтому рассмотрим, как реализовать собственный пул воркеров.
Реализация пула воркеров
Пул воркеров — это заданное количество ранее созданных воркеров, которые ожидают событие message
. Как только событие происходит, воркеры выполняют работу и отправляют результат обратно.
Вот как можно создать пул воркеров из восьми рабочих потоков:
Если вы знакомы с ограничением параллельных операций, то знаете, что логика здесь почти одинакова.
Из фрагмента выше видно, конструктору WorkerPool
передаётся количество воркеров и путь для их появления.
Здесь есть дополнительные свойства вроде workerById
и activeWorkersById
, в которых можно сохранить существующие воркеры и их идентификаторы соответственно. Также есть queue
(очередь), в которой можно сохранять объекты со следующей структурой:
callback
— callback-функция в Node по умолчанию с ошибкой в качестве первого аргумента и возможным результатом в качестве второго. getData
— это функция, передаваемая методу .run()
пула воркеров (поясняется ниже), которая вызывается после начала обработки элемента. Данные, возвращаемые функцией getData()
, будут переданы в рабочий поток.
Внутри метода .init()
создаём воркеры и сохраняем их в стейтах:
Для избежания бесконечных циклов нужно убедиться, что количество потоков больше 1. Создаём необходимое число воркеров и сохраняем их по индексу в стейте workerById
. Также сохраняем информацию, работают ли они в настоящее время, в стейте activeWorkersById
, который всегда по умолчанию имеет значение false
.
Реализуем метод .run()
для настройки задачи, которая будет запущена, как только воркер станет доступен.
Внутри функции, переданной в промис, проверяем, есть ли доступный для обработки данных воркер, вызывая .getInactiveWorkerId()
:
Создаём queueItem
, в котором сохраняем переданную методу .run()
функцию getData()
в качестве callback. В этом callback разрешаем (resolve
) или отклоняем (reject
) промис в зависимости от того, передал ли воркер callback.
Если значение availableWorkerId
равно -1, доступного воркера нет. В этом случае добавляем queueItem
в queue
. Если есть доступный воркер, вызываем метод .runWorker()
для его выполнения.
В методе .runWorker()
в стейте activeWorkersById
необходимо установить, что воркер в данный момент используется. Также нужно настроить обработчики для событий message
и error
(после очистить их). И, наконец, отправить данные воркеру.
Используя переданный workerId
, получаем ссылку на воркер из стейта workerById
. Внутри activeWorkersById
устанавливаем в свойстве [workerId]
значение true
. Таким образом будет известно, что больше ничего не нужно запускать, пока воркер занят.
Создаём messageCallback()
и errorCallback()
для вызова событий message
и error
соответственно. Регистрируем указанные функции для обработки события и отправки данных воркеру.
Внутри функций вызываем callback в queueItem
, а затем вызываем функцию cleanUp()
. Убеждаемся, что обработчики событий удаляются, т. к. один и тот же воркер используется многократно. Если не удалить обработчики, произойдёт утечка памяти (память медленно исчерпается).
В стейте activeWorkersById
устанавливаем для свойства [workerId]
значение false
и проверяем, пуста ли очередь. Если это не так, удаляем первый элемент из queue
и снова вызываем воркер с другим queueItem
.
Создадим воркер, который выполняет некоторые вычисления после получения данных в событии message
:
Потоковый воркер создаёт массив из 1 миллиона случайных чисел, а затем сортирует их.
Пример простого использования пула воркеров:
Всё начиналось с создания пула из восьми воркеров. Затем был создан массив из 100 элементов и для каждого элемента запускалась задача в пуле воркеров. Первые восемь задач были выполнены немедленно, а остальные помещены в очередь и выполнены постепенно. Благодаря использованию пула воркеров не нужно каждый раз создавать воркер, что значительно повышает эффективность.
Заключение
worker_threads
предоставляет простой способ добавить поддержку многопоточности в приложения. Передавая тяжёлые CPU-вычисления другим потокам, можно значительно увеличить пропускную способность сервера. Благодаря официальной поддержке потоков можно ожидать, что всё больше разработчиков и инженеров из различных областей (ИИ, машинное обучение и большие данные) начнут использовать Node.js.
70К открытий72К показов