11. В функции main
int main()
{
vector
vector
12. Далее порождаем три потока-производителя и пять потоков-потребителей:
for (size_t i = 0; i < 3; ++i) {
workers.emplace_back(producer, i, 15, 5);
}
for (size_t i = 0; i < 5; ++i) {
consumers.emplace_back(consumer, i);
}
13. Сначала позволим закончить работу потокам-производителям. Как только все из них вернут значения, установим значение флага production_stopped;
for (auto &t : workers) { t.join(); }
production_stopped = true;
for (auto &t : consumers) { t.join(); }
}
14. Компиляция и запуск программы дадут следующий результат. Сообщений получилось довольно много, поэтому мы приводим их в сокращенном виде. Как видите, производители приостанавливаются время от времени и позволяют потребителям использовать несколько элементов, чтобы снова получить возможность их производить. Интересно будет изменить время ожидания для производителей/потребителей, а также манипулировать количеством производителей/потребителей и максимальным количеством элементов в очереди, поскольку это значительно меняет шаблоны появления выходных сообщений:
$ ./multi_producer_consumer
Producer 0 --> item 0
Producer 1 --> item 100
item 0 --> Consumer 0
Producer 2 --> item 200
item 100 --> Consumer 1
item 200 --> Consumer 2
Producer 0 --> item 1
Producer 1 --> item 101
item 1 --> Consumer 0
...
Producer 0 --> item 14
EXIT: Producer 0
Producer 1 --> item 114
EXIT: Producer 1
item 14 --> Consumer 0
Producer 2 --> item 214
EXIT: Producer 2
item 114 --> Consumer 1
item 214 --> Consumer 2
EXIT: Consumer 2
EXIT: Consumer 3
EXIT: Consumer 4
EXIT: Consumer 0
EXIT: Consumer 1
Как это работает
Этот пример дополняет предыдущий. Вместо того чтобы синхронизировать одного производителя с одним потребителем, мы реализовали программу, которая синхронизирует M
N
потребителей. Кроме того, приостанавливаются не только потребители при отсутствии элементов для них, но и производители, если очередь становится Более сложной, чем предыдущий пример, в котором запускались всего один производитель и один потребитель, эту программу делает тот факт, что мы указываем потокам-производителям останавливаться, когда длина очереди превышает какое-то значение. Чтобы соответствовать этому требованию, мы реализовали два разных сигнала, имеющих собственную переменную condition_variable
1. go_produce
2. go_consume
Таким образом производители заполняют очередь элементами и сигнализируют с помощью события go_consume
if (go_consume.wait_for(lock, 1s, [] { return !q.empty(); })) {
// получили событие без тайм-аута
}
Производители, с другой стороны, ждут на следующей строке до тех пор, пока не смогут создавать элементы снова:
go_produce.wait(lock, [&] { return q.size() < stock; });