bool finished {false};
3. Реализуем функцию-производитель. Она принимает аргумент items
cv.notify_all()
. Данная функция пробуждает потребителя. Далее мы увидим на стороне потребителя, как это работает.static void producer(size_t items) {
for (size_t i {0}; i < items; ++i) {
this_thread::sleep_for(100ms);
{
lock_guard
q.push(i);
}
cv.notify_all();
}
4. После создания всех элементов снова блокируем мьютекс, поскольку нужно задать значение для бита finished
cv.notify_all()
: {
lock_guard
finished = true;
}
cv.notify_all();
}
5. Теперь можем реализовать функцию потребителя. Она не принимает никаких аргументов, поскольку будет слепо перебирать все элементы до тех пор, пока очередь не опустеет. В цикле, который станет выполняться до тех пор, пока не установится значение переменной finished
finished
. В момент получения блокировки последняя вызовет функцию cv.wait
, передав ей блокировку и лямбда-выражение в качестве аргументов. Лямбда-выражение — это предикат, указывающий, жив ли еще поток-производитель и есть ли значения в очереди.static void consumer() {
while (!finished) {
unique_lock
cv.wait(l, [] { return !q.empty() || finished; });
6. Вызов cv.wait
finished
— таким способом производитель указывает, что новых элементов больше не будет. while (!q.empty()) {
cout << "Got " << q.front()
<< " from queue.\n"; q.pop();
}
}
}
7. В функции main
int main()
{
thread t1 {producer, 10};
thread t2 {consumer};
t1.join();
t2.join();
cout << "finished!\n";
}
8. Компиляция и запуск программы дадут следующий результат. При выполнении программы можно увидеть, что между появлением каждой строки проходит какое-то время (100 миллисекунд), поскольку создание элементов занимает время:
$ ./producer_consumer
Got 0 from queue.
Got 1 from queue.
Got 2 from queue.
Got 3 from queue.
Got 4 from queue.
Got 5 from queue.
Got 6 from queue.
Got 7 from queue.
Got 8 from queue.
Got 9 from queue.
finished!
Как это работает
В данном примере мы просто запустили два потока. Первый создает элементы и помещает их в очередь. Второй извлекает их из очереди. При взаимодействии с очередью один из этих потоков блокирует общий мьютекс mut
Помимо очереди и мьютекса мы объявили четыре переменные, которые были включены во взаимодействие производителя/потребителя:
queue
mutex mut;
condition_variable cv;
bool finished {false};
Переменную finished
true
, когда производитель создал ограниченное количество элементов. Когда потребитель видит, что значение этой переменной равно true
, он использует последние элементы и завершает работу. Но для чего нужна переменная condition_variable cv
? Мы использовали cv
в двух разных контекстах. Один из контекстов