Безопасность потоков в С++

Допустим, вы пишете конвейер, в котором 2 потока, используя общий буфер, обрабатывают данные. Поток-producer эти данные создает, а поток-consumer их обрабатывает (Producer–consumer problem). Следующий код представляет собой самую простую модель: с помощью std::thread мы порождаем поток-consumer, a создавать данные мы будем в главном потоке.

void produce() {
    // создаем задачу и кладем в очередь
}

void consume() {
    // читаем данные из очереди и обрабатываем
}

int main(int , char **) {
    std::thread thr(consume); // порождаем поток
    produce(); // создаем данные для обработки
    thr.join(); // ждем завершения работы функции consume()
    return 0;
}

Опустим механизмы синхронизации двух потоков, и обратим внимание на функцию main(). Попробуйте догадаться, что с этим кодом не так, и как его исправить?

В С++, если не сказано иного, принято считать, что каждая функция может выбросить исключение.

Допустим, функция consume() бросает исключение. Поскольку это исключение генерируется в дочернем потоке, поймать и обработать его в главном потоке нельзя1 . Если во время развертывания стека дочернего потока не нашлось подходящего обработчика исключения, будет вызвана функция std::terminate(), которая по-умолчанию вызовет функцию abort(). Иными словами, если не обработать исключение в потоке, порожденном объектом thr, то программа завершит свою работу с ошибкой.

С функцией produce() немного сложнее. Допустим, эта функция генерирует исключение. Первое, что хочется сделать, это обернуть тело main() в try-catch блок:

try {
    std::thread thr(consume);
    produce(); // бросает исключение
    thr.join();
} catch (...) {
}

Кажется, что проблема решена, но если вы попытаетесь запустить этот код, то программа упадет в любом случае. Почему так происходит? Давайте разбираться.

std::thread

Как вы уже, может быть, догадались, проблема не имеет отношение к конвейеру, а относится к правильному использованию потоков выполнения стандартной библиотеки в принципе. В частности следующая обобщенная функция равнозначна, и имеет те же проблемы:

void run(function<void()> f1, function<void()> f2) {
    std::thread thr(f1);
    f2();
    thr.join();
}
...
run(consume, produce);
...

Прежде чем перейти к решению нашей задачи, давайте вкратце вспомним как работает std::thread.

1) конструктор для инициализации:

template <class Fn, class... Args>
explicit thread (Fn&& fn, Args&&... args);

При инициализации объекта std::thread создается новый поток, в котором запускается функция fn с возможными аргументами args. При успешном его создании, конкретный экземпляр объекта начинает представлять этот поток в родительском потоке, а в свойствах объекта выставляется флаг joinable.
Запомним: joinable ~ объект связан с потоком.

2) Ждем конца выполнения порожденного потока:

void thread::join();

Этот метод блокирует дальнейшее выполнение родительского потока, до тех пока не будет завершен дочерний. После успешного выполнения, объект потока перестает его представлять, поскольку нашего потока больше не существует. Флаг joinable сбрасывается.

3) Немедленно “отсоединяем” объект от потока:

void thread::detach();

Это неблокирующий метод. Флаг joinable сбрасывается, а дочерний поток предоставлен сам себе и завершит свою работу когда-нибудь позже.

4) Деструктор:

thread::~thread();

Деструктор уничтожает объект. При этом если, у этого объекта стоит флаг joinable, то вызывается функция std::terminate(), которая по умолчанию вызовет функцию abort().
Внимание! Если мы создали объект и поток, но не вызвали join или detach, то программа упадет. В принципе, это логично – если объект до сих пор связан с потоком, то надо что-то с ним делать. А еще лучше – ничего не делать, и завершить программу (по крайней мере так решил комитет по стандарту).

Поэтому при возникновении исключения в функции produce(), мы пытаемся уничтожить объект thr, который является joinable.

Ограничения

Почему же стандартный комитет решил поступить так и не иначе? Не лучше было бы вызвать в деструкторе join() или detach()? Оказывается, не лучше. Давайте разберем оба этих случая.

Допустим, у нас есть класс joining_thread, который так вызывает join() в своем деструкторе:

joining_thread::~joining_thread() {
    join();
}

Тогда, прежде чем обработать исключение, мы должны будем подождать завершения работы дочернего потока, поскольку join() блокирует дальнейшее выполнение программы. А если так получилось, что порожденном потоке оказался в бесконечный цикл?

void consume() {
    while(1) { ... }
}
...
try {
    joining_thread thr(consume);
    throw std::exception();
} catch (...) {
    // может случится не скоро, или даже никогда 
}

Хорошо, мы выяснили, что join() в деструкторе лучше не вызывать (до тех пор пока вы не уверены, что это корректная обработка события), поскольку это блокирующая операция. А что насчет detach()? Почему бы не вызвать в деструкторе этот неблокирующий метод, дав главному потоку продолжить работу? Допустим у нас есть такой класс detaching_thread.

Но тогда мы можем прийти к такой ситуации, когда порожденный поток пытается использовать ресурс, которого уже нет, как в следующей ситуации:

try {
    int data;
    detaching_thread th(consume, &data); // в данном случае consume принимает указатель на int в качестве аргумента
    throw std::exception()
} catch (...) {
    // корректно обработаем исключение
    // consume продолжает исполняться, но ссылается на уже удаленный объект data
}

Таким образом, создатели стандарта решили переложить ответственность на программиста – в конце концов ему виднее, как программа должна обрабатывать подобные случаи. Исходя из всего этого, получается, что стандартная библиотека противоречит принципу RAII – при создании std::thread мы сами должны позаботиться о корректном управлении ресурсами, то есть явно вызвать join или detach. По этой причине некоторые программисты советуют не использовать объекты std::thread. Так же как new и delete, std::thread предоставляет возможность построить на основе них более высокоуровневые инструменты.

Решение

Одним из таких инструментов является класс из библиотеки Boost boost::thread_joiner. Он соответствует нашему joining_thread в примере выше. Если вы можете позволить себе использовать сторонние библиотеки для работы с потоками, то лучше это сделать.

Другое решение – позаботиться об это самому в RAII-стиле, например так:

class Consumer {
public:
     Consumer()
          : exit_flag(false)
          , thr( &Consumer::run, this )
     {
         // после создания потока не делайте тут ничего, что бросает исключение,
         // поскольку в этом случае не будет вызван деструктор объекта Consumer,
         // поток не будет завершен, а программа упадет
     }

     ~Consumer() {
          exit_flag = true; // говорим потоку остановиться
          thr.join();
     }

private:
    std::atomic<bool> exit_flag; // флаг для синхронизации (опционально)
    std::thread thr;

    void run() {
        while (!exit_flag) {
            // делаем что-нибудь
        }
    }
};

В случае, если вы собираетесь отделить поток от объекта в любом случае, лучше сделать это сразу же:

std::thread(consume).detach(); // создаем поток, и сразу же освобождаем объект, связанный с ним

Ссылки:

Александр Петров специально для “Типичного программиста”

  1. На самом деле можно явно передать исключение в другой поток с помощью move-семантики.