3. Все производители пишут значения в одну очередь, а все потребители также получают значения из нее. В дополнение к этой очереди нужен мьютекс, защищающий очередь и флаг, на основе которого можно сказать, что создание элементов будет приостановлено в какой-то момент:
queue
mutex q_mutex;
bool production_stopped {false};
4. В этой программе задействуем две разные переменные condition_variables
condition_variable
, которая указывала на появление в очереди новых элементов. В этом случае ситуация чуть более запутанная. Мы хотим, чтобы производители создавали новые элементы до тех пор, пока в очереди не будет содержаться их go_consume
пригодна для возобновления потребителей, которые, в свою очередь, смогут возобновить производителей с помощью переменной go_produce
:condition_variable go_produce;
condition_variable go_consume;
5. Функция-производитель принимает идентификатор производителя, общее количество элементов, которые нужно создать, а также максимальное количество элементов в очереди. Затем она входит в собственный производственный цикл. Далее блокирует мьютекс очереди, а затем разблокирует его снова в вызове go_produce.wait
stock
:static void producer(size_t id, size_t items, size_t stock)
{
for (size_t i = 0; i < items; ++i) {
unique_lock
go_produce.wait(lock,
[&] { return q.size() < stock; });
6. После того как производитель будет возобновлен, он создаст элемент и поместит его в очередь. Значение, помещаемое в очередь, определяется на основе выражения id*100+i
q.push(id * 100 + i);
pcout{} << " Producer " << id << " --> item "
<< setw(3) << q.back() << '\n';
7. После создания элемента можно возобновить приостановленных потребителей. Период приостановки, равный 90 миллисекундам, симулирует тот факт, что на создание элементов требуется какое-то время:
go_consume.notify_all();
this_thread::sleep_for(90ms);
}
pcout{} << "EXIT: Producer " << id << '\n';
}
8. Теперь перейдем к функции-потребителю, которая принимает в качестве аргумента только идентификатор. Она продолжает ожидать элементов при условии, что их производство не остановлено или очередь не пуста. Если очередь пуста, но производство не остановлено, то, возможно, скоро появятся новые элементы:
static void consumer(size_t id)
{
while (!production_stopped || !q.empty()) {
unique_lock
9. После блокирования мьютекса очереди снова разблокируем его, чтобы подождать установки значения переменной события go_consume
wait
, когда очередь содержит элементы. Второй аргумент 1s
указывает, что мы не должны ждать вечно. Если мы ждем больше секунды, то хотим выйти из функции wait
. Можно определить, вернула ли функция wait_for
значение (условие-предикат будет верным) или мы вышли из нее по тайм-ауту (в этом случае возвратится значение false
). При наличии в очереди новых элементов используем (потребим) их и выведем соответствующее сообщение на консоль: if (go_consume.wait_for(lock, 1s,
[] { return !q.empty(); })) {
pcout{} << " item "
<< setw(3) << q.front()
<< " --> Consumer "
<< id << '\n';
q.pop();
10. После потребления элемента оповещаем производителей и приостанавливаем поток на 130 миллисекунд для симуляции того факта, что потребление элементов тоже требует времени:
go_produce.notify_all();
this_thread::sleep_for(130ms);
}
}
pcout{} << "EXIT: Producer " << id << '\n';
}