Сторона-потребитель, ждущая выполнения конкретного условия, выглядит так. Поток-потребитель проходит в цикле по блоку, который изначально блокирует мьютекс mut
unique_lock
. Затем вызывает cv.wait
:while (!finished) {
unique_lock
cv.wait(l, [] { return !q.empty() || finished; });
while (!q.empty()) {
// consume
}
}
Данный код чем-то похож на следующий эквивалентный код. Вскоре мы рассмотрим, почему эти фрагменты не похожи друг на друга:
while (!finished) {
unique_lock
while (q.empty() && !finished) {
l.unlock();
l.lock();
}
while (!q.empty()) {
// consume
}
}
Это значит, что сначала мы получили блокировку, а затем проверили, с каким сценарием сейчас работаем.
1. Есть ли элементы, которые можно использовать? В таком случае сохраняем блокировку, потребляем эти элементы, возвращаем блокировку и начинаем сначала.
2. В противном случае,
Реальная причина, по которой строка cv.wait
while (q.empty() && ...)
, заключается в том, что мы не можем просто пройти по циклу l.unlock(); l.lock();
. Отсутствие активности потока-производителя в течение какого-то промежутка времени приводит к постоянным блокировкам/ разблокировкам мьютекса, что не имеет смысла, поскольку мы впустую тратим циклы процессора.Выражение наподобие cv.wait(lock, predicate)
predicate()
не вернет значение true
. Но это не делается путем постоянной блокировки/разблокировки. Чтобы возобновить поток, который блокируется вызовом wait
объекта condition_variable
, другой поток должен вызывать методы notify_one()
или notify_all()
для одного объекта. Только тогда ожидающий поток будет возобновлен и проверит условие predicate()
. (Последнее действительно и для нескольких потоков.)Положительный момент таков: после вызова wait
notify
.На стороне производителя мы просто вызвали cv.notify_all()
finished
, равное true
. Этого было достаточно для того, чтобы направить потребителя. Реализуем идиому «несколько производителей/потребителей» с помощью std::condition_variable
Возьмем задачу из прошлого примера и усложним ее: создадим несколько производителей и
Таким образом, нужно приостанавливать не только потребителей, при отсутствии в очереди элементов, но и производителей, если элементов в ней
Мы увидим, как решить эту проблему с помощью нескольких объектов типа std::condition_variable
Как это делается
В данном примере мы реализуем программу, похожую на программу из предыдущего примера, но в этот раз у нас будет несколько производителей и несколько потребителей.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространств имен std
chrono_literals
:#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Затем реализуем синхронизированную вспомогательную функцию для вывода сообщений на экран, показанную в другом примере этой главы, поскольку будем выводить множество сообщений на конкурентной основе:
struct pcout : public stringstream {
static inline mutex cout_mutex;
~pcout() {
lock_guard
cout << rdbuf();
}
};