Виммельбух, 3, перетяжка
Виммельбух, 3, перетяжка
Виммельбух, 3, перетяжка

Как настроить многопоточную обработку сообщений в С++

Аватар Типичный программист
Отредактировано

Рассказываем, как написать код на С++, который позволит обрабатывать несколько сообщений в многопоточном режиме.

5К открытий7К показов

Одним из методов высокопроизводительной обработки данных является параллельная обработка в нескольких потоках команд (thread-ах).

Несколько потоков способны обработать данные быстрее, чем одним потоком кратно числу потоков. В С++ потоки реализованы нативно, потому поговорим о реализации частной задаче многопоточной обработки на этом языке.

Паттерн «Очередь сообщений»

Очередь сообщений — это довольно часто используемый в системном дизайне патерн. Суть его в наличии контейнера, реализующего принцип «first in — first out» («FIFO»). Сообщения в очередь помещаются в определенном порядке, а затем могут быть в том же порядке оттуда извлечены.

При этом сообщения из очереди должны извлекаться и обрабатываться. Вопрос, поднятый в данной статье — как это лучше реализовать в быстром языке C++ и почему. Далее —  индивидуальное мнение. Комментарии могут дополнить материал.

Периодическая обработка

Самым простым вариантом обработки будет работа одного потока, извлекающего сообщения и осуществляющего обработку в своем теле. Выглядеть код такого обработчика может примерно так:

			#include ‹custom_queue.h> // выдуманный хедер
	#include ‹thread.h>
	#include ‹chrono.h>
void process(Message&& /*msg*/) {
// ваша обработка
}

	int main() {
	CustomQueue queue(“connector_id”); // подключение к очереди
		while(true) {
			while (!queue.empty()) {
				process(std::move(queue.top()));
				queue.pop();
			}
			std::this_thread::sleep_for(std::chrono::seconds(1)); 
		}
		return 0;
};
		

Данный код будет просто выгребать сообщения из очереди с периодичностью в 1 секунду. Минус тут очевиден — если требуется меньшая, чем время обработки сообщения, задержка загрузки сообщений из очереди, то такая реализация не подойдет. В системах с низкой задержкой, вроде высокочастотной торговли или в играх, подойдет другая реализация.

Система обработки, основанная на событиях

Суть приводимого далее кода в обработке сообщений по мере их поступления, а не в результате периодического действия. Далее приведен код.

			#include ‹custom_queue.h>
#include ‹atomic>
#include ‹csignal>
#include ‹functional>
#include ‹thread.h>

void processQueue(CustomQueue& queue, std::atomic‹bool>& finishing) {
	auto msg_max_timeout = std::chrono::seconds(1);
		while(!finishing) {
			while (auto msg = queue.top( msg_max_timeout ) ) {						if (msg) {
	// ваша обработка
		queue.pop();
}
			}
	}
}

static std::atomic‹bool> finishing(false);

void signal_handler(int signal) { // обработчик нажатия “CTRL+C”
finishing = true;
}

int main() {
std::signal(SIGINT, signal_handler);

	CustomQueue queue(“connector_id”);
		std::vector‹std::thread>> workers(
std::bind(processQueue(queue, finishing)),
10 /* число потоков обработки */
);
for(auto& worker : workers) {
	worker.join(); // легальное завершение всех потоков
}
return 0;
}
		

В данном примере очередь блокируется на «queue.top» до прихода хоть какого-то сообщения, но не более задержки msg_max_timeout. В случае прихода сообщения, один из потоков обработки получит сообщение. Прочие останутся в заблокированном состоянии на строке с «queue.top».

Если в течение времени обработки сообщения одним потоком придет другое сообщение, то следующий поток начинает его обработку.

В данном примере дополнительные требования предъявляются к конструкции очереди:

  • она должна быть потокобезопасна
  • должна реализовывать блокировку группы потоков в своем коде до наступления события или на ограниченное время (нужно для легального завершения программы)

Рассмотрим, как может выглядеть реализация такой очереди:

			template ‹class Message>
class CustomQueue {
	private:
		std::list‹Message> messages;
		std::mutex mtx;
		std::condition_variable cond;
		void push(Message&& msg) {
			{
				std::unique_lock‹std::mutex> lock(mtx);
				messages.emplace_front(msg);
			}
			cond.notify_one();
		}
	public:
		Message& top(std::chrono::duration timeout=std::chrono::seconds(0)) {
			std::unique_lock‹std::mutex> lock(mtx);
			auto now = std::chrono::system_clock::now();
			while(cond.wait_for(lock, timeout, 
[&now, &timeout,](){
		return (now + timeout) > 
std::chrono::system_clock::now() &&
	messages.empty();
}
) // ждем события или таймаута - проверяем spurious wakekup predicate
{
} 
return messages.empty() ? Message::default : messages.front();
		}
		void pop() {
			std::unique_lock‹std::mutex> lock(mtx);
			if (!messages.empty()) {
				messages.pop_back();
			}
		}
};
		

Такая очередь будет потокобезопасной, так как внутренний контейнер с сообщениями защищен мьютексом. При вызове метода push (добавление в очередь) произойдет пробуждение лишь одного потока из ожидающих на conditional variable.

Кроме этого, код учитывает ситуацию spurious wakeup (неожиданное пробуждение): сonditional variable::wait может разблокировать поток команд внезапно.

Для контроля в последний аргумент вызова добавлен предикат. Он проверяет наличие сообщений в очереди или превышение таймаута периодически.

Заключение

Если вы работаете с многоядерным сервером, вам наверняка пригодится этот второй метод многопоточной обработки очереди.

Данное улучшение не единственное и не конечное для высокопроизводительной обработки сообщений. Оно показывает возможности многопоточной обработки, и каждый разработчик вправе добавить что-то по своему вкусу.

Следите за новыми постами
Следите за новыми постами по любимым темам
5К открытий7К показов