The C++11 standard library provides several mechanisms to support concurrency. The first is std::thread
, which together with sync objects (std::mutex
, std::lock_guards
and std::condition_variables
, etc) offers a thread-based approach to achieve concurrency.
However, working at the level of threads and locks can be quite tricky, and thus a higher-level of abstraction, task-based concurrency, is also supported in C++11, in the form of promises and futures. std::promise<T>
and std::future<T>
work in pairs to separate the act of calling a func from the act of waiting for the call rsts.
Class std::packaged_task<T>
makes codes more readable; it is a container for a task and its promise. The template type is the type of the task func, and it automatically creates and manages a std::promise<T>
for use.
Things become much simpiler if we use the std::async()
func, which hides all the implementation, platform specific details. It takes as input a callable object and returns a future that will contain the return value.
A std::promise<T>
obj represents a result in the callee-side of the asynchronous call, and it is the channel for passing the result asynchronously to the caller. When the task completes, it puts its result into a promise object calling promise::set_value
.
When the caller finally needs to access the result, it will call the blocking future::get
to retrieve it. If the task has already completed, the result will be immediately available, ow, the caller thread will suspend until the result becomes available.
This shared state can be associated to a future object by calling member get_future
. After the call, both objects share the same shared state:
- The promise object is the asynchronous provider and is expected to set a value for the shared state at some point.
- The future object is an asynchronous return object that can retrieve the value of the shared state, waiting for it to be ready, if necessary.
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise) {
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}
int main() {
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
//create promise
std::promise<int> accumulate_promise;
//engagement with future
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(),
numbers.end(), std::move(accumulate_promise));
accumulate_future.wait(); // wait for result
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion
}
The class template std::promise
provides a facility to store a value or an exception that is later acquired asyn via a std::future
obj created by the std::promise
obj. Each promise is associated with a shared state, which contains some state info and a result which may be not yet evaluated, to be a value or an exception.
The promise is the "push" end of promise-future communication channel: the operation that stores a value in the shared state synchronizes-with the successful return from any func that is waiting on the shared state (e.g., std::future::get
).
get_future
: returns a future obj associated with the promised result;set_value
: atomically stores the result into the shared state and makes the state ready;set_value_at_thread_exit
: stores the value into the shared state without making the state ready immediately. The state is made ready when the current thread exits, after all variables with thread-local storage duration have been destroyed;set_exception
: atomically stores the exception ptr into the shared state and makes the state ready;
std::promise<int> result;
std::thread t([&]{
try {
// code that may throw
throw std::runtime_error("Example");
} catch(...) {
try {
// store anything thrown in the promise
result.set_exception(std::current_exception());
} catch(...) {} // set_exception() may throw too
}
});
try {
std::cout << result.get_future().get();
} catch(const std::exception& e) {
std::cout << "Exception from the thread: " << e.what() << '\n';
}
t.join();
set_exception_at_thread_exit
: similar to that of value.
The class template std::future
provides a mechanism to access the result of asynchronous operations:
std::async
, std::packaged_task
, or std::promise
) can provide a std::future
obj to the creator of that asyn operation;std::future
. These methods may block if the asyn operation has not yet provided a value;std::promise::set_value
) that is linked to the creator's std::future
.Member functions:
get()
for only once, because get()
invalidates the future's state;share(): transfers the shared state from *this
to a shared_future
and returns it;
int get_value() { return 10; }
std::future<int> fut = std::async(get_value);
std::shared_future<int> shfut = fut.share();
//shared futures can be accessed multi times
std::cout << "value: " << shfut.get() << '\n';
std::cout << "its double: " << shfut.get()*2 << '\n';
valid(): checks if the future has a shared state;
wait(): waits for the result to become available;
wait_for(timeout_duration): waits for teh result to become available. Blocks until specified timeout_duration
has elapsed or the result becomes available, whichever comes first. Returns value (deferred
, ready
or timeout
) identifies the state of the result.
std::future<int> future = std::async(std::launch::async,
[](){ std::this_thread::sleep_for(std::chrono::seconds(3));
return 8; });
std::cout << "waiting ...\n";
std::future_status status;
do{
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);
std::cout << "result is " << future.get() << '\n';
wait_util(timeout_time): waits for the rst to become available. Similar to wait_for
.
//try to call func asynchronously
std::future<...> f(std::async(func));
...
//wait-1
f.wait(); //wait for func to be done (might start bkgd task)
//wait-2: wait for a limited time
f.wait_for(std::chrono::seconds(10)); //wait for at most 10 secs for func
//wait-3: wait until a specific timepoint has reached
f.wait_until(std::system::now()+std::chrono::minutes(1));
std::package_task
wraps any callable (func, lambda, bind expr, or another func obj) so that it can be invoked asyn. Its return value or exception thrown is stored in a shared state which can be accessed through std::future
objs.
get_future
can be called only once for each packaged_task;// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }
void task_lambda() {
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b); });
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
void task_bind() {
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();
task();
std::cout << "task_bind:\t" << result.get() << '\n';
}
void task_thread() {
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
int main() {
task_lambda();
task_bind();
task_thread();
}
With packaged_task
, we still have to manually create the threads to run the task, and decide on which thread the task will run. Things can be much simpler using the high-level std::async()
interface.
async()
provides an interface to let a piece of functionality, a callable obj, run in the background as a separate thread, if psbl;future<>
allows you to wait for the thread to be finished and provides access to its outcome: return value of exception, if any.int doSomething (char c) {
//...
return c;
}
int func1 () {
return doSomething('.');
}
int func2 () {
return doSomething('+');
}
int main() {
//start func1 asynchronously (now or later or never)
std::future<int> result1(std::async(func1));
//call func2 synchronously (here and now)
int result2 = func2();
//print result (wait for func1() to finish and add its result to result2
int result = result1.get() + result2;
cout << result << endl;
}
instead of calling: int result = func1() + func2();
, we call:
std::future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result2;
func1()
is tried to start in the background, using std::async
, and assign the result to an obj of class std::future
.
With the call of get()
, one of the three things might happen:
func1()
was started with async()
in a separate thread and has already finished, we immediately get its result;func1()
was started but has not finished yet, get()
blocks and waits for its end and yields the result;func1()
was not started yet, it will be forced to start now and, like a synchronous func call, get()
will block until it yields the result.Without calling get()
, there is no guarantee that func1()
will ever be called. We have to ensure that we ask for the result of a functionality started with async()
no earlier than necessary:
std::future<int> result1(std::async(func1));
//might call func2() after func1() ends
int result = func2() + result1.get();
To have the best effect, in general, we should maximize the distance between calling async()
and calling get
, i.e., call early and return late.
The object passed to async
may be any type of callable object: function, member func, func object, or lambda (std::async([]{ ... })
).
The exact behavior of async()
is complex and highly depends on the lanunch policy, which can be passed as the first optional argument.
async (launch policy, Fn&& fn, Args&& ...args
launch::async
, launches a new thread to call fn
(as if a thread obj is constructed with fn
and args
as arguments, and accessing the shared state of the returned future
joins it); void print_ten (char c, int ms) {
for (int i=0; i<10; ++i) {
std::this_thread::sleep_for (std::chrono::milliseconds(ms));
std::cout << c;
}
}
int main ()
{
std::cout << "with launch::async:\n";
std::future<void> foo = std::async (std::launch::async,print_ten,'*',100);
std::future<void> bar = std::async (std::launch::async,print_ten,'@',200);
// async "get" (wait for foo and bar to be ready):
foo.get();
bar.get();
std::cout << "\n\n";
launch::deferred
, the call to fn
is deferred until the shared state of the returned future
is accessed (with wait
or get
). At that point, fn
is called and the func is no longer considered deferred. When this call returns, the shared state of the returned future
is made ready;launch::async|launch::deferred
, the func chooses the policy auto (at some point). void print_ten (char c, int ms) {
for (int i=0; i<10; ++i) {
std::this_thread::sleep_for (std::chrono::milliseconds(ms));
std::cout << c;
}
}
int main ()
{
std::cout << "with launch::async: ";
std::future<void> foo = std::async (std::launch::async,print_ten,'*',100);
std::future<void> bar = std::async (std::launch::async,print_ten,'@',200);
// async "get" (wait for foo and bar to be ready):
foo.get(); bar.get();
std::cout << "\n\n";
std::cout << "with launch::deferred: ";
foo = std::async (std::launch::deferred,print_ten,'*',100);
bar = std::async (std::launch::deferred,print_ten,'@',200);
// deferred "get" (perform the actual calls):
foo.get(); bar.get();
std::cout << '\n';
return 0;
}
possible output:
with launch::async: **@**@**@*@**@*@@@@@
with launch::deferred: **********@@@@@@@@@@
the class thread represents a single thread of execution. Threads allow multi pieces of code to run asynchronously and simultaneously.
Constructors:
thread()
: createa a new thread obj which does not represent a thread;thread(thread&& other)
: move cstr. Constructs the thread obj to represent the thread of execution that was represented by other. After this call other no longer represents a thread of execution;thread(Function&& f, Args&&... args)
: creates a new std::thread
obj and associates it with a thread of execution. First the cstr copies/moves all arguments (both the func obj f and all args ...) to thread-accessible storage;thread(const thread&)=delete
: the copy cstr is deleted; threads are not copyable. No two std::thread
objs may represent the same thread of execution.void f1(int n) {
}
void f2(int& n) {
}
int main() {
int n = 0;
std::thread t1; //t1 is not a thread
std::thread t2(f1, n+1); //pass by value
std::thread t3(f2, std::ref(n)); //pass by ref
std::thread t4(std::move(t3)); //t4 is now running f2()
//t3 is no longer a thread
t2.join();
t4.join();
}
Observers:
joinable
: checks if the thread obj identifies an active thread of execution. Specially, returns true
if get_id() != std::thread:id()
. So, a default constructed thread is not joinable.
A thread that has finished executing code, but has not yet been joined is still considered an active thread of execution and is therefore joinable.
void foo() {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main() {
std::thread t;
std::cout << "before starting, joinable: " << t.joinable() << '\n';
t = std::thread(foo);
std::cout << "after starting, joinable: " << t.joinable() << '\n';
t.join();
std::cout << "after joining, joinable: " << t.joinable() << '\n';
}
-------output: 0 1 0
get_id
: returns a value of std::thread::id
identifying the thread associated with *this
;
native_handle
: returns the impl defined underlying thread handle;
hardware_concurrency
: returns the #concurrent threads supported by the impl. The value should be considered only a hint.
Operations:
join
: blocks the current thread until the thread identified by *this
finishes its execution;detach
: separates the thread of executioin from the thread object, allowing execution to continue independently. Any allocated resources will be freed once the thread exits;swap(thread& other)
: exchanges the underlying handles of two thread objects.For any thread, including the main thread, <thread>
declares namespace std::this_thread
, which provides the thread_specific global funcs:
get_id()
: return the id of the current thread;sleep_for(dur)
: blocks the execution of the current thread for at least the specified duration;sleep_until(tim)
: blocks the execution of the current thread until specified sleep_time has been reached.yield()
: provides a hint to the imple to reschedule the execution of threads, allowing other threads to run. to start a thread, we simply have to declare an obj of class std::thread
and pass the desired task as initial argument, and then either wait for its end or detach it:
void doSomething();
std::thread t(doSomething); //start doSomething() in the background
...
t.join(); //wait for t to finish (block until doSomething() ends)
As for async()
, we can pass anything that's a callable object (function, member func, func obj, lambda) together with psbl additional arguments. Unless you really you what you are doing, you should pass all objs necessary to process the passed functionality by value so that the thread uses only local copies.
void doSomething(int num, char c) {
//any uncaught exception would cause the prog to terminate
try {
...
}
//make sure no exception leaves the thread and terminates the program
catch(const exception& e) {
cerr << "thread-exception (thread "
<< this_thread::get_id() << "): " << e.what << endl;
}
catch(...) {
cerr << "thread-exception (thread "
<< this_thread::get_id() << ")" << endl;
}
}
int main() {
//creating a thread might throw a std::system_error
try{
thread t1(doSomething, 5, '.'); //print 5 dots in separate thread
cout << "- started fg thread " << t1.get_id() << endl;
//print other chars in other bg threads
for(int i=0; i<5; ++i) {
thread t(doSomething, 10 'a'+i); //print 10 chars in separate thread
cout << "- detach started bg thread " << t.get_id() << endl;
t.detach(); //detach thread into the bg
}
cin.get(); wait for any input (return)
cout << "- join fg thread " << t1.get_id() << endl;
t1.join(); //wait for t1 to finish
}
catch (const exception& e) {
cerr << "exception: " << e.what() << endl;
}
}
Detached threads can easily become a problem if they use nonlocal resources. Passing variables and objs to a thread by ref is always a risk, and passing by value is strongly recommended.
And, the lifetime problem also applies to global and static objs, because when the program exits, the detached thread might still run, which means that it might access global or static obj that are already destroyed or under construction. Thus, we should ensure that these global/static objs are not destroyed before all detached threads accessing them are finished. Approaches can be:
quick_exit()
, which is to end a program without calling the dstrs for global and static objs.Because std::cin
, std::cout
and std::cerr
and the other global stream objs are not destroyed during program execution, access to these objs in detached threads should introduce no undefined behavior. However, other problems, such as interleaved chars, might occur.
The only safe way to terminate a detached thread is with one of the "...atthreadexit()" functions, which force the main thread to wait for the detached thread to truly finish.
thread IDs
This ID is a special type std::thread::id, which is guaranteed to be unique for each thread. Threads IDs can be obtained by the thread obj or inside a threas using namespace this_thread
.
std::thread t1(doSomething, 5, '.');
std::thread t2(doSomething, 5, '+');
std::thread t3(doSomething, 5, '*');
std::cout << "t3 ID: " << t3.get_id() << endl;
std::cout << "main ID: " << std::this_thread::get_id() << endl;
std::cout << "nothread ID: " << std::thread::id() << endl;
The only operations allowed for thread IDs are comparisons and calling the output operator for a stream. We cannot make any further assumptions, such as "no thread" has ID 0 or the main thread has ID 1.
std::thread::id masterThreadID;
void doSomething(){
if(std::this_thread::get_id() == masterThreadID) {
...
}
}
std::thread master(doSomething);
masterThreadID = master.get_id();
...
std::thread slave(doSomething);
...
Each compiler can optimize code as long as the behavior of the program visible from the outside behaves the same (as-if rule). Hence, both compiler and hardware vendors can reorder the code to speed the program, as long as the observable behavior remains stable. E.g., compilers might unroll loops, reorder statemenets, eliminate dead code, prefetch data, and in modern architecture, a hardware buffer might reorder loads or stores, etc.
To give compilers and hardware enough freedom to optimize code, C++ does NOT in general give a couple of guarantees, which might cost too much in performance. In C++, we might have the following problems:
Unsynchronized data access: when two threads running in parallel read and write the same data, it is open which statement comes first.
Half-written data: when one thread reads data, which another thread modifies, the reading thread might even read the data in the middle of the write of the other thread, thus reading neither the old nor the new value.
Reordered statements: statements and operations might be reordered so that the behavior of each single thread is correct, but in combination of all threads, expected behavior is broken.
//ORIG CODE | //GENERATED CODE
//- providing thread | //- consuming thread
long data; | while(!readyFlag) { //loop until data is ready
bool readyFlag = false; | ;
| }
data = 42; | foo(data);
readyFlag = true; |
|
//PSBL CODE |
//- providing thread | //- consuming thread
long data; | foo(data);
bool readyFlag = false; | while(!readyFlag) { //loop until data is ready
| ;
readyFlag = true; | }
data = 42; |
To solve the three major problems of concurrent data access, we need the following concepts:
C++ standard library provides different ways (from high-level to low-level) to deal with the concepts:
A mutex, or mutual exclusion, is a synchronization primitive that can be used to protect data from being simultaneously accessed by multi threads. mutex offers exclusive, non-recursive ownership semantics.
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
int g_num = 0; // protected by g_num_mutex
std::mutex g_num_mutex;
void slow_increment(int id) {
for (int i = 0; i < 3; ++i) {
g_num_mutex.lock();
++g_num;
std::cout << id << " => " << g_num << '\n';
g_num_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main() {
std::thread t1(slow_increment, 0);
std::thread t2(slow_increment, 1);
t1.join(); t2.join();
}
This simple lock-unlock
approach can, however, become pretty complicated. E.g., we should ensure that an exception, which ends an exclusive access, also unlocks the crspding mutex; ow, a rsc might become locked forever. Also, deadlock scenarios are psbl, with two thread waiting for a lock of the other thread before freeing their own lock.
To deal with exceptions (guarnatee exception safety), we shoud not lock and unlock by ourselves; instead, we should use the RAII principle (Resource Acquisition Is Initialization), whereby the cstr acquires a rsc so that the dstr, which is always called even when an exception causes the end of the lifetime, releases rsc automatically.
std::mutex
is usually not accessed directly: std::unique_lock
and std::lock_guard
are used to manage locking in exception-safe manner.
Note that the locks should be limited to the shortest period psbl because they block other code from running in parallel.
std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;
void save_page(const std::string &url) {
// simulate a long page fetch
std::this_thread::sleep_for(std::chrono::seconds(2));
std::string result = "fake content";
std::lock_guard<std::mutex> guard(g_pages_mutex);
g_pages[url] = result;
}//lock released here
int main() {
std::thread t1(save_page, "http://foo");
std::thread t2(save_page, "http://bar");
t1.join(); t2.join();
// safe to access g_pages without lock now, as the threads are joined
for (const auto &pair : g_pages) {
std::cout << pair.first << " => " << pair.second << '\n';
}
}
Sometimes, the ability to lock recursively is required. Typical examples are active objs or monitors, which contain a mutex and take lock inside every public method to protect data races corrupting the internal state of the obj. E.g., a db interface might look as follows:
class DatabaseAccess {
private:
std::mutex dbMutex;
... //state of database access
public:
void createTable (...) {
std::lock_guard<std::mutex> lg(dbMutex);
...
}
void insertData (...) {
std::lock_guard<std::mutex> lg(dbMutex);
...
}
...
};
When we introduce a public member func that might call other public member funcs, this can become complicated:
void createTableAndInsertData (...) {
std::lock_guard<std::mutex> lg(dbMutex);
...
createTable(...); //ERROR: deadlock because dbMutex is locked again
}
Calling createTableAndInsertData()
will result in a deadlock because after locking dbMutex
, the call of createTable()
will try to lock dbMutex
again, which will block until the lock of dbMutex
is available, which will never happen because createTableAndInsertData()
will block until createTable()
is done.
A recursive mutex is a lockable object, just like mutex, but allows the same thread to acquire multi levels of ownership over the mutex obj. The lock is released when the last crspding unlock()
is called.
class DatabaseAccess {
private:
std::recursive_mutex dbMutex;
... //state of database access
public:
void createTable (...) {
std::lock_guard<std::recursive_mutex> lg(dbMutex);
...
}
void insertData (...) {
std::lock_guard<std::recursive_mutex> lg(dbMutex);
...
}
void createTableAndInsertData (...) {
std::lock_guard<std::recursive_mutex> lg(dbMutex);
...
createTable(...); //OK: no deadlock
}
...
};
sometimes a program wants to acquire a lock but doesn't want to block (forever) if this is not psbl. For this case, mutexes provide a try_block()
member func that tries to acquire a lock.
std::mutex m;
//try to acquire a lock and do other stuff if not psbl
while (m.try_lock() == false) {
doSomeOtherStuff();
}
std::lock_guard<std::mutex> lb(m, std::adopt_lock);
...
To wait only for a particular amount of time, we can use a timed mutex, try_lock_for()
and try_lock_until
provided in std::timed_mutex
and std::recursive_timed_mutex
, respectively.
std::timed_mutex m;
//try for 1sec to acquire a lock
if (m.try_lock_for(std::chrono::seconds(1))) {
std::lock_guard<std::timed_mutex> lg(m, std::adopt_lock);
...
} else {
couldNotGetTheLock();
}
Mutex constants are used as tag argument for unique_lock
to select a specific constructor:
unique_lock
objs constructed with adopt_block
do not lock the mutex obj on construction, assuming instead that it is already locked by the current thread, and thus just adopts the ownership of the existing lock on the mutex rather than attempt to lock the mutex in the cstr.unique_lock
objs constructed with defer_lock
do not lock the mutex obj automatically on construction, initializing them as not owning a lock. And later, the lock can be acquired by passing the std::unique_lock
obj itself to std::lock
.try_to_lock
: unique_lock
objs constructed with try_to_lock
attempt to lock the mutex obj by calling its try_lock
member instead of its lock
member.C++ enables to lock multi mutexes, avoiding deadlock: std::lock()
locks all mutexes passed as arguments, blocking until all mutexes are locked or until an exception is thrown. In the latter case, it unlocks mutexes already successfully locked.
std::mutex m1, m2;
int idx = std::try_lock(m1, m2); //try to lock both mutexes
if(idx < 0) { //both locks succeeded
std::lock_guard<std::mutex> lockM1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lockM1(m2, std::adopt_lock);
...
} /*auto unlock all mutexes*/ else {
//idx has zero-based index of first failed lock
cerr << "could not lock mutex m" << idx+1 << endl;
}
unique_lock
Besides class lock_guard<>
, C++ standard library provides class unique_lock<>
, which is lot more flexible when dealing with locks for mutexes. It allows deferred locking, time-constrained attempts at locking, recursive locking, transfer of lock ownership, and use with condition variables.
std::lock_guard
keeps its associated mutex locked during the entire life time by acquiring the lock on construction and releasing the lock on destruction. std::unique_lock
is a lot more flexible when dealing with mutex locks. It has the same interface as std::lock_guard
but provide extra abilities to program explicitly when and how to lock or unlock its mutex. Thus, this lock obj may or may not have a mutex locked (also known as owning a mutex).std::unique_lock
can be transferred bt instances, and hence std::unique_lock
is movable whereas std::lock_guard
is not.std::mutex Mutex;
std::unique_lock<std::mutex> Foo() {
std::unique_lock<std::mutex> lock(Mutex);
return lock;
// mutex isn't unlocked here!
}
void Bar() {
auto lock = Foo();
} // mutex is unlocked when lock goes out of scope
Member functions:
lock()
: locks the associated mutes;
int counter = 0;
std::mutex counter_mutex;
std::vector<std::thread> threads;
auto worker_task = [&](int id) {
std::unique_lock<std::mutex> lock(counter_mutex);
++counter;
std::cout << id << ", initial counter: " << counter << '\n';
lock.unlock();
// don't hold the lock while we simulate an expensive operation
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock();
++counter;
std::cout << id << ", final counter: " << counter << '\n';
};
for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);
for (auto &thread : threads) thread.join();
try_lock()
: tries to lock the associated mutex, returns if the mutex is not available.
try_lock_for
: tries to lock the associated mutex. Returns if the mutex has been unavailable for the specified time duration.
try_lock_until
: tries to lock the associated mutex, returns if the mutex has been unavailable until specified time point has been reached.
unlock
: unlocks the associated mutex.
swap
: swaps state with another std::unique_lock
.
release
: disassociates the associated mutex without unlocking it.
mutex()
: returns a ptr to the associated mutex.
owns_lock
or operator bool
: tests whether the lock owns its associated mutex.
Sometimes multi threads might not need some functionality that should get processed whenever the first thread needs it. A typical example is lazy initialization: the first time one of the threads needs sth that has to get processed, you process it (but not before, because u want to save the time to process it if it is not needed).
For single-thread environment:
static std::vector<std::string> staticData;
void foo() {
if (staticData.empty()) {
staticData = initializeStaticData();
}
...
}
Such code doesn't work in multithreaded context, because of data races in checking. Instead of using mutex, we can use C++ standard library funcs std::once_flag
and std::call_once
:
std::once_flag oc; //global flag
...
std::call_once(os, initialize); //init if not inited yet
static std::vector<std::string> staticData;
void foo() {
static std::once_flag oc;
std::call_once(oc, []{
staticData = initializeStaticData();
});
...
}
The 1st argument passed to call_once()
must be the crspding once_flag
; further arguments are the usual arguments for callable objects: func, member func, func obj, or lambda, plus optional arguments for the func called. Thus, lazy initialization of an obj used in multi-threads might as follow:
class X {
private:
mutable std::once_flag initDataFlag;
void initData() const;
public:
data getData() const {
std::call_once(initDataFlag, &X::initData, this);
...
}
};
Sometimes, tasks performed by different threads have to wait for each other. Thus, we have to synchronize concurrent operations for other reasons than to access the same data.
Condition variables can be used to synchronize logical dependencies in data flow bt threads. A condition variable is a variable by which a thread can wake up one or multi other waiting threads.
In principle, a condition variable works as follows:
include both <mutex>
and <condition_variable>
to declare a mutex and a condition variable:
#include <mutex>
#include <condition_variable>
std::mutex readyMutex;
std::condition_variable readyCondVar;
the thread (or one of multi threads) that signals the fulfillment of a condition has to call
readyCondVar.notify_one(); //notify one of the waiting threads
//or
readyCondVar.notify_all(); //notify all the waiting threads
any thread that waits for the condition has to call
std::unique_lock<std::mutex> l(readyMutex);
readyCondVar.wait(l);
Thus, the thread providing or preparing sth simply calls notify_one()
or notify_all()
for the cond var, which for one or all the waiting threads is the moment to wake up.
Cond var in general might have so-called spurious wakeups, i.e., a wait on a cond var may return even if the cond var has not been noified. Thus, a wakeup does not necessarily mean that the required cond now holds. Rather, after a wakeup we still need some code to verify that the cond in fact has arrived.
bool readyFlag; //a flag signaling the cond is indeed satisfied
std::mutex readyMutex; //a mutex
std::condition_variable readyCondVar; //a cond var
//locks the mutex, updates the cond, unlocks the mutex and notifies the cond var
void thread1() {
//do sth thread2 needs as preparation
std::cout << "<return>" << std::endl;
std::cin.get();
//signal that thread1 has prepared a cond
{
std::lock_guard<std::mutex> lg(readyMutex);
readyFlag = true;
}//release lock
readyCondVar.notify_one();
}
void thread2() {
//wait until thread1 is ready (readyFlag is true)
{
std::unque_lock<std::mutex> ul(readyMutex);
readyCondVar.wait(ul, []{ return readyFlag; });
}//release lock
//do whatever shall happen after thread1 has prepared things
std::cout << "done" << std::endl'
}
int main() {
auto f1 = std::async(std::launch::async, thread1);
auto f2 = std::async(std::launch::async, thread2);
}
The waiting thread locks the mutex with a unique_lock
, waits for the notification while checking the condition and releases the lock:
{
std::unque_lock<std::mutex> ul(readyMutex);
readyCondVar.wait(ul, []{ return readyFlag; });
}//release lock
Here, a wait()
member for cond vars is used as follow: pass the lock ul
for the mutex readyMutex
as 1st argument and a lambda as callable object double checking the cond as second argument. The effect is that wait()
internally calls a loop until the passed callable returns true. Thus, the code has the same effect as the following code:
{
std::unque_lock<std::mutex> ul(readyMutex);
while (!readyFlag) {
readyCondVar.wait(ul);
}
}//release lock
three threads push values into a quque that two other threads read and process:
std::queue<int> queue;
std::mutex queueMutex;
std::condition_variable queueCondVar;
void provider (int val) {
//push different vals
for (int i=0; i<6; ++i) {
{
std::lock_guard<std::mutex> lg(queueMutex);
queue.push(val+i);
}//release lock
queueCondVar.notify_one();
std::this_thread::sleep_for(
std::chrono::milliseconds(val));
}
}
void consumer (int num) {
//pop vals if available (num identifies the consumer)
while(true) {
int val;
{
std::unique_lock<std::mutex> ul(queueMutex);
queueCondVar.wait(ul, []{ return !queue.empty(); });
val = queue.front();
queue.pop();
}//release lock
std::cout << "consumer " << num << ": " << val << endl;
}
}
int main() {
//start three providers for values 100+, 300+, 500+
auto p1 = std::async(std::launch::async, provider, 100);
auto p2 = std::async(std::launch::async, provider, 300);
auto p3 = std::async(std::launch::async, provider, 500);
//start two consumers printing the vals
auto c1 = std::async(std::launch::async, consumer, 1);
auto c2 = std::async(std::launch::async, consumer, 2);
}
Once a std::atomic<T>
object has been constructed, operations on it behave as if they were inside a mutex-protected critical section, but the operations are generally implemented using special machine instructions that are more efficient than would be the case if a mutex were employed.
std::atomic<int> ai(0); //initialize ai to 0
ai = 10; //atomically set ai to 10
std::cout << ai; //atomically read ai's value
++ai; //atomically increment ai to 11
--ai; //atomically decrement ai to 10
During execution of these statements, other therads reading ai
may see only values of 0
, 10
or 11
. No other values are psbl.
using lock:
#include <mutex>
...
bool readyFlag;
std::mutex readyFlagMutex;
void thread1() {
// do something thread2 needs as preparation
...
std::lock_guard<std::mutex> lg(readyFlagMutex);
readyFlag = true;
}
void thread2() {
// wait until readyFlag is true (thread1 is done)
{
std::unique_lock<std::mutex> ul(readyFlagMutex);
while (!readyFlag) {
ul.unlock();
std::this_thread::yield(); // hint to reschedule to the next thread
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ul.lock();
}
} // release lock
// do whatever shall happen after thread1 has prepared things
...
}
using atomic:
#include <atomic> // for atomic types ...
std::atomic<bool> readyFlag(false);
void thread1() {
// do something thread2 needs as preparation ...
readyFlag.store(true);
}
void thread2() {
// wait until readyFlag is true (thread1 is done)
while (!readyFlag.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// do whatever shall happen after thread1 has prepared things
...
}
load()
: performs a so-called acquire operation on the affected mem loc, which by default ensures that all following mem operations, whether atomic or not, become visible to other threads after the load operation.store()
: performs a so-called release operation on the affected mem loc, which by default ensures that all prior mem operations, whether atomic or not, become visible to other threads before the effect of the store operation.long data;
std::atomic<bool> readyFlag(false);
void provider() {
//after reading a char
std::cout << "<return>" << std::endl;
std::cin.get();
//provide some data
data = 42;
//add signal readiness
readyFlag.store(true);
}
void consumer() {
//wait for readiness and do sth else
while (!readyFlag.load()) {
std::cout.put('.').flush();
std:;this_thread::sleep_for(std::chrono::milliseconds(500));
}
//and process provided data
std::cout << "\nvalue : " << data << std::endl;
}
Because the setting of data happens before the provider()
stores true in the readyFlag
and the processing of data happens after the consumer()
has loaded true as value of the readyFlag
, the processing of data
is guaranteed to happen after the data was provided.
This guarantee is provided because in all atomic operations, we use a default memory order named memory_order_seq_cst (sequential consistent memory order).
std::atomic_flag
is a really simple Boolean flag, and operations on this type are required (the only one guaranteed to be lcok-free) to be lock-free; and, unlike std::atomic<bool>
, std::atomic_flag
does not provide load or store operations.
Once we have a simple lock-free Boolean flag, we can use it to implement a simple lock and thus implement all the the atomic types.
std::atomic_flag
is "really simple" on the following aspects:
std::atomic_flag
can be in one of two states: set or clear;std::atomic_flag
must be inited with ATOMIC_FLAG_INIT
, which sets the flag to clear state;clear()
), or set it and query the prev value (test_and_set()
).std::atomic_flag lock = ATOMIC_FLAG_INIT;
void f(int n) {
for (int cnt = 0; cnt < 100; ++cnt) {
while (lock.test_and_set(std::memory_order_acquire)) // acquire lock
; // spin
std::cout << "Output from thread " << n << '\n';
lock.clear(std::memory_order_release); // release lock
}
}
int main() {
std::vector<std::thread> v;
for (int n = 0; n < 10; ++n) {
v.emplace_back(f, n);
}
for (auto& t : v) {
t.join();
}
}
the remaining atomic types are all accessed through specializations of the std::atomic<>
class template and are a bit more full-featured but may not be lock-free. On most popular platforms, it's expected that the atomic variants of all the built-in types (e.g., std::atomic<int>
and std::atomic<void*>
) are indeed lock-free, but it isn't required.
the standard atomic types are not copyable or copy-assignable, and thus have no copy cstrs or copy asgnment operators. They do, however, support asgnment from and implicit conversion to the crspding built-in types as well as direct load()
/store()
, exchange()
, compare_exchange_weak()
and compare_exchange_strong()
. They also support compound asgnment operators where +=
, -=
, *=
, |=
, etc.
Unlike most assignment operators, the assignment operators for atomic types do not return a reference to their left-hand arguments. They return a copy of the stored value instead.
is_lock_free
: the atomic operations on the given type are done directly with atomic instructions (true
), or done by using a lock internal to the compiler and library (false
);std::atomic
std::atomic<bool>
is the most basic of the atomic integral type.
load()
: store()
: rather than using the restrictive clear()
func of std::atomic_flag
, writes of std::atomic<bool>
use store
;exchange(des)
: replace test_and_set
; exchange()
is a read-modify-write operation, allowing to replace the stored value with a new one of your choosing and atomically retrieve the original value;
std::atomic<bool> b;
bool x = b.load(std::memory_order_acquire);
b.store(true);
x = b.exchange(false, std::memory_order_acq_rel);
storing a new value (or not) depending on the current value:
the new operation is called compare/exchange, which compares the value of the atomic variable with a supplied expected value and stores the supplied desired value if they're equal; if the values are not equal, the expected value is updated with the actual value of the atomic variable. The return type is a bool, which is true
if the store was performed and false
ow.
compare_exchange_weak(exp, des)
: the store might not be successful even if the orig value was equal to the expected value, in which case the value of the variable is unchanged and the return value is false
; this is most likely happen on machines that lack a single compare-and-exchange instr.
because compare_exchange_weak()
can fail spuriously, it must typically be used in a loop:
bool expected = false;
extern atomic<bool> b; //set somewhere else
//keep looping as long as 'expected' is false, indicating suprious failure
while(!b.compare_exchange_weak(expected, true) && !expected);
compare_exchange_strong(exp, des)
: is guaranteed to return false
only if the actual value wasn't equal to the expected
value.
std::atomic<T*>
The interface of std::atomic<T>
is essentially the same with std::atomic<bool>
. The new opertionss are the ptr arithmetic ones:
fetch_add()/fetch_sub()
: do atomic addition and subtraction on the stored addr; fetch_add()
is also known as exchange-and-add, and it's and atomic read-modify-write operation.
operators +=/-=/++/--
are convenient wrappers;
if x
is std::stomic<Foo*>
to the 1st entry of an array of Foo
objs, then x+=3
changes it to point to the 4th entry and returns a plain Foo*
that also points to that 4th entry; fetch_add()/fetch_sub()
are slightly different in that they return the orig value (so x.fetch_add(3)
will update x
to point to the 4th value but return a ptr to the 1st value in the array).
class Foo{};
Foo some_array[5];
std::atomic<Foo*> p(some_array);
Foo* x = p.fetch_add(2); //add 2 to p, return old value
assert(x == some_array);
assert(p.load() == &some_array[2]);
x=(p-=1); //subtract 1 from p, return new value
assert(x == &some_array[1]);
assert(p.load() == &some_array[1]);
std::atomic<T>
the remaining basic atomic types are essentially all the same: they're all atomic integral types and have the same interface as each other, except that the associated built-in type is different.
as well as the usual set of operations (load()
, store()
, exchange()
, compare_exchange_weak()
and compare_exchange_strong()
), hte atomic integral types such as std::atomic<int>
and std::atomic<unsigned long long>
have quite a comprehensive set of operations available: fetch_add()
, fetch_sub()
, fetch_or
, fetch_xor
, compound-asgnment forms of these operations (+=, -=, &=, |=, and ^=), and ++/--. Only division, multiplication, and shift operators are missing. Because atomic integral values are typically used either as counters or as bitmasks, this isn't a particularly noticeable loss; additional operations can easily be done using compare_exchange_weak()
in a loop, if required.
the low-level interface of atomics means using the atomic operations in a way that we have no guaranteed sequential consistency. Thus, compilers and hardware might (partially) reorder access on atomics.
std::memory_order
specifies how regular, non-atomic mem accesses are to be ordered around an atomic operation. Absent any constraint on a multi-core system, when multi threads simultaneously read and write to several variables, one thread can observe the values change in an order different from the order another thread wrote them. Indeed, the apprent order of changes can even differ among multi reader threads. Some similar effects can occur even on uniprocessor systems due to compiler transformations allowed by the mem model.
The default behavior of all atomic operations in the library provides for sequential consistent ordering. The default can hurt performance, but the library's atomic operations can be given an additional std::memory_order
argument to specify the exact constraints, beyond atomicity, that the compiler and processor must enforce that operation.
memory_order_relaxed
: relaxed operations, there are no sync or ordering constraints, only atomicity is required of this operation;
memory_order_consume
: a load operation with this memory order performs a consume operation on the affected memory location: no reads in the current thread dependent on the value currently loaded can be reordered before this load. This ensures that writes to data-dependent variables in other threads that release the same atomic variables are visible in the current thread. On most platforms, this affects compiler optimizations only.
memory_order_acquire
: a load operation with this memory order performs the acquire operation on the affected mem loc: no mem accesses in the current thread can be reordered before this load. This ensures that all writes in other threads that release the same atomic variables are visible in the current thread.
memory_order_release
: a store operation with this mem order performs the release operation: no mem accesses in the current thread can be reordered after this store. This ensures that all writes in the current thread are visible in other threads that acquire the same atomic variables and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic.
memory_order_acq_rel
: a read-modify-write operation with this mem order is both acquire operation and a release operation. No mem accesses in the current thread can be reordered before this load, and no mem accesses in the current thread can be reordered after this store. It is ensured that all writes in other threads that release the same atomic variable are visible before the modification and the modification is visible in other threads that acquire the same atomic variable.
memory_order_seq_cst
: same as memory_order_acq_rel
, plus a single total order exists in which all threads observe all modification in the same order.
relaxed ordering
//x and y intially zero
// Thread 1:
r1 = y.load(memory_order_relaxed); // A
x.store(r1, memory_order_relaxed); // B
// Thread 2:
r2 = x.load(memory_order_relaxed); // C
y.store(42, memory_order_relaxed); // D
is allowed to produce r1==r2==42
(i.e., D-A-B-C
) because, although A
is sequenced-before B
within thread 1, and C
is sequenced-before D
within thread 2, nothing prevents D
from appearing before A
in the modification order of y, and B
before C
in the modification order of x. The side-effect of D
on y could be visible to the load A
in Thread 1 while the side-effect of B
on x could be visible to the load C
in Thread 2.
release-acquire ordering
if an atomic store in thread A is tagged memory_order_release
and an atomic load in thread B from the same variable is tagged memory_order_acquire
, all mem writes (non-atomic and relaxed atomic) that happend-before the atomic store from the point of view of thread A, become visible side-effect in thread B, that is, once the atomic load is completed, thread B is guaranteed to see everything thread A wrote to mem.