Как настроить многопоточную обработку сообщений в С++
Рассказываем, как написать код на С++, который позволит обрабатывать несколько сообщений в многопоточном режиме.
5К открытий7К показов
Алексей Чащегоров
DeutscheBank, старший программист
Одним из методов высокопроизводительной обработки данных является параллельная обработка в нескольких потоках команд (thread-ах).
Несколько потоков способны обработать данные быстрее, чем одним потоком кратно числу потоков. В С++ потоки реализованы нативно, потому поговорим о реализации частной задаче многопоточной обработки на этом языке.
Паттерн «Очередь сообщений»
Очередь сообщений — это довольно часто используемый в системном дизайне патерн. Суть его в наличии контейнера, реализующего принцип «first in — first out» («FIFO»). Сообщения в очередь помещаются в определенном порядке, а затем могут быть в том же порядке оттуда извлечены.
При этом сообщения из очереди должны извлекаться и обрабатываться. Вопрос, поднятый в данной статье — как это лучше реализовать в быстром языке C++ и почему. Далее — индивидуальное мнение. Комментарии могут дополнить материал.
Периодическая обработка
Самым простым вариантом обработки будет работа одного потока, извлекающего сообщения и осуществляющего обработку в своем теле. Выглядеть код такого обработчика может примерно так:
Данный код будет просто выгребать сообщения из очереди с периодичностью в 1 секунду. Минус тут очевиден — если требуется меньшая, чем время обработки сообщения, задержка загрузки сообщений из очереди, то такая реализация не подойдет. В системах с низкой задержкой, вроде высокочастотной торговли или в играх, подойдет другая реализация.
Система обработки, основанная на событиях
Суть приводимого далее кода в обработке сообщений по мере их поступления, а не в результате периодического действия. Далее приведен код.
В данном примере очередь блокируется на «queue.top» до прихода хоть какого-то сообщения, но не более задержки msg_max_timeout. В случае прихода сообщения, один из потоков обработки получит сообщение. Прочие останутся в заблокированном состоянии на строке с «queue.top».
Если в течение времени обработки сообщения одним потоком придет другое сообщение, то следующий поток начинает его обработку.
В данном примере дополнительные требования предъявляются к конструкции очереди:
- она должна быть потокобезопасна
- должна реализовывать блокировку группы потоков в своем коде до наступления события или на ограниченное время (нужно для легального завершения программы)
Рассмотрим, как может выглядеть реализация такой очереди:
Такая очередь будет потокобезопасной, так как внутренний контейнер с сообщениями защищен мьютексом. При вызове метода push (добавление в очередь) произойдет пробуждение лишь одного потока из ожидающих на conditional variable.
Кроме этого, код учитывает ситуацию spurious wakeup (неожиданное пробуждение): сonditional variable::wait может разблокировать поток команд внезапно.
Для контроля в последний аргумент вызова добавлен предикат. Он проверяет наличие сообщений в очереди или превышение таймаута периодически.
Заключение
Если вы работаете с многоядерным сервером, вам наверняка пригодится этот второй метод многопоточной обработки очереди.
Данное улучшение не единственное и не конечное для высокопроизводительной обработки сообщений. Оно показывает возможности многопоточной обработки, и каждый разработчик вправе добавить что-то по своему вкусу.
5К открытий7К показов