std::condition_variable
(Live Hacking Multithreaded Queue)¶
Overview¶
#include <condition_variable>
Generic communication device
Communicate changes in shared state ⟶ notification/wakeup
Used together with a mutex
Operations
Operation |
Description |
---|---|
wait |
|
signal (notify) |
Signal the condition variable (i.e., notify waiter) |
Sample communication scenarios
Semaphores
Events
Message queues
Overview: Operations¶
Operation |
Description |
---|---|
Constructor |
Default only (cannot move when someone’s waiting) |
|
|
|
Timeouts of various sorts |
|
Notify (and wakeup) one waiting thread |
|
Notify (and wakeup) all waiting threads |
Note
Broadcasting (notify_all()
) a condition variable wakes up all
waiting threads. If only one can handle the event, then waking all
is a waste of resources - in this case notify_one()
should
rather be used.
Communication, Polling: Thread-Safe Queue¶
Double-ended queue (
std::deque
)Producer adds to tail
Consumer removes from head
Protected by a mutex
Corner cases ⟶ conditions
Queue is full
Queue is empty
⟶ spin on either side if condition holds
#include <thread>
#include <mutex>
#include <deque>
#include <chrono>
#include <iostream>
template <typename T>
class ThreadSafeQueue
{
public:
ThreadSafeQueue(unsigned maxelem) : _maxelem(maxelem) {}
template<typename dur>
void push(T elem, dur d)
{
while (true) {
{
std::scoped_lock guard(_lock);
if (_queue.size() <= _maxelem) {
_queue.push_back(elem);
return;
}
}
std::this_thread::sleep_for(d);
}
}
template <typename dur>
T pop(dur d)
{
while (true) {
{
std::scoped_lock guard(_lock);
if (_queue.size() > 0) {
T elem = _queue.front();
_queue.pop_front();
return elem;
}
}
std::this_thread::sleep_for(d);
}
};
private:
std::deque<T> _queue;
unsigned _maxelem;
std::mutex _lock;
};
using namespace std::chrono_literals;
int main()
{
ThreadSafeQueue<int> queue(10);
std::thread producer([&queue](){
int i = 0;
while (true) {
queue.push(i++, 2ms);
std::this_thread::sleep_for(1ms);
}
});
std::thread consumer1([&queue](){
while (true)
std::cout << "1: " << queue.pop(1ms) << std::endl;
});
std::thread consumer2([&queue](){
while (true)
std::cout << "2: " << queue.pop(2ms) << std::endl;
});
producer.join();
consumer1.join();
consumer2.join();
return 0;
}
Anti-Polling: Thread-Safe Queue, And POSIX Condition Variables¶
#include <thread>
#include <deque>
#include <iostream>
#include <pthread.h>
template <typename T>
class ThreadSafeQueue
{
public:
ThreadSafeQueue(unsigned maxelem)
: _maxelem(maxelem)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_not_empty, nullptr);
pthread_cond_init(&_not_full, nullptr);
}
void push(T elem)
{
pthread_mutex_lock(&_lock);
while (_queue.size() == _maxelem)
pthread_cond_wait(&_not_full, &_lock);
_queue.push_back(elem);
pthread_mutex_unlock(&_lock);
pthread_cond_signal(&_not_empty);
}
T pop()
{
pthread_mutex_lock(&_lock);
while (_queue.size() == 0)
pthread_cond_wait(&_not_empty, &_lock);
T elem = _queue.front();
_queue.pop_front();
pthread_mutex_unlock(&_lock);
pthread_cond_signal(&_not_full);
return elem;
};
private:
std::deque<T> _queue;
unsigned _maxelem;
pthread_mutex_t _lock;
pthread_cond_t _not_empty;
pthread_cond_t _not_full;
};
using namespace std::chrono_literals;
int main()
{
ThreadSafeQueue<int> queue(10);
std::thread producer([&queue](){
int i = 0;
while (true) {
queue.push(i++);
std::this_thread::sleep_for(1ms);
}
});
std::thread consumer1([&queue](){
while (true)
std::cout << "1: " << queue.pop() << std::endl;
});
std::thread consumer2([&queue](){
while (true)
std::cout << "2: " << queue.pop() << std::endl;
});
producer.join();
consumer1.join();
consumer2.join();
return 0;
}
Discussion: Signalling And Waiting, Predicates: Separation Of Concerns¶
Waiting - and atomically releasing a mutex - is one thing
Necessary to implement any kind of parallel handshake
⟶ OS building block
The condition itself is part of the concrete problem
Here:
Full:
_queue.size() == _maxelem
⟶ wait until not emptyEmpty:
_queue.size() == 0
⟶ wait until not full
⟶ predicate
Signalling wakes a waiter (possibly immediately, depending on realtime attributes, and whatnot)
Signalling while holding the mutex ⟶ waiter has to go to sleep waiting for the mutex again
⟶ best done when not holding the mutex
Mutex: scoped locking would be great also
That being said …
Thread-Safe Queue, And C++ Condition Variables¶
Wish list
Scoped locking:
std::unique_lock
(see here)Predicate: parameter (usually a lambda) to
condvar.wait()
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <chrono>
#include <iostream>
template <typename T>
class ThreadSafeQueue
{
public:
ThreadSafeQueue(unsigned maxelem)
: _maxelem(maxelem)
{}
void push(T elem)
{
{
std::unique_lock<std::mutex> guard(_lock);
_not_full.wait(guard, [this](){return _queue.size() < _maxelem;});
_queue.push_back(elem);
} // scoped locking: unlock before signal
_not_empty.notify_one();
}
T pop()
{
T elem;
{
std::unique_lock<std::mutex> guard(_lock);
_not_empty.wait(guard, [this](){return _queue.size() > 0;});
elem = _queue.front();
_queue.pop_front();
}
_not_full.notify_one();
return elem;
};
private:
std::deque<T> _queue;
unsigned _maxelem;
std::mutex _lock;
std::condition_variable _not_empty;
std::condition_variable _not_full;
};
using namespace std::chrono_literals;
int main()
{
ThreadSafeQueue<int> queue(10);
std::thread producer([&queue](){
int i = 0;
while (true) {
queue.push(i++);
std::this_thread::sleep_for(1ms);
}
});
std::thread consumer1([&queue](){
while (true)
std::cout << "1: " << queue.pop() << std::endl;
});
std::thread consumer2([&queue](){
while (true)
std::cout << "2: " << queue.pop() << std::endl;
});
producer.join();
consumer1.join();
consumer2.join();
return 0;
}