C++并发学习笔记(三)(std::mutex 详解)

Mutex又称为互斥量,C++ 11中与 Mutex 相关的类(包括锁类型)和函数都声明在 <mutex> 头文件中,所以如果你需要使用 std::mutex,就必须包含 <mutex> 头文件。

std::mutex 是C++11 中最基本的互斥量,std::mutex 对象提供了独占所有权的特性——即不支持递归地对 std::mutex 对象上锁,而 std::recursive_lock 则可以递归地对互斥量对象上锁。

mutex头文件介绍

互斥量基本作用: 互斥占有一个变量,一段时间内仅一个线程可以访问。即该类可以限制对某物的访问,只有先获得许可才可访问某物,否则一般可设为阻塞等待。能有效避免资源竞争问题。

Mutexes类(四种)

  • std::mutex,最基本的 Mutex 类。
  • std::recursive_mutex,递归 Mutex 类。
  • std::time_mutex,定时 Mutex 类。
  • std::recursive_timed_mutex,定时递归 Mutex 类。

##Lock类(两种)

  • std::lock_guard,与 Mutex RAII 相关,方便线程对互斥量上锁。
  • std::unique_lock,与 Mutex RAII 相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。

##其他类型

##函数

  • std::try_lock,尝试同时对多个互斥量上锁。
  • std::lock,可以同时对多个互斥量上锁。
  • std::call_once,如果多个线程需要同时调用某个函数,call_once 可以保证多个线程对该函数只调用一次。

std::mutex 介绍

  • 构造函数,std::mutex不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于 unlocked 状态的。

  • **lock()**,调用线程将锁住该互斥量。线程调用该函数会发生下面 3 种情况:

    (1). 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。

    (2). 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。

    (3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。

  • **unlock()**, 解锁,释放对互斥量的所有权。

  • **try_lock()**,尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该函数也会出现下面 3 种情况,

    (1). 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。

    (2). 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。

    (3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。

**mutex::lock **Example(reference):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// mutex::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex

std::mutex mtx; // mutex for critical section

void print_thread_id (int id) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
std::cout << "thread #" << id << '\n';
mtx.unlock();
}

int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);

for (auto& th : threads) th.join();

return 0;
}

Output:

1
2
3
4
5
6
7
8
9
10
thread #1
thread #2
thread #3
thread #4
thread #5
thread #6
thread #7
thread #8
thread #9
thread #10

**mutex::try_lock **Example(reference):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// mutex::try_lock example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex

volatile int counter (0); // non-atomic counter
std::mutex mtx; // locks access to counter

void attempt_10k_increases () {
for (int i=0; i<10000; ++i) {
if (mtx.try_lock()) { // only increase if currently not locked:
++counter;
mtx.unlock();
}
}
}

int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(attempt_10k_increases);

for (auto& th : threads) th.join();
std::cout << counter << " successful increases of the counter.\n";

return 0;
}

Possible output (any count between 1 and 100000 possible):

1
17987 successful increases of the counter.

#std::recursive_mutex 介绍

std::recursive_mutex std::mutex 一样,也是一种可以被上锁的对象,但和 std::mutex 不同的是,std::recursive_mutex 允许同一个线程对互斥量多次上锁(即递归上锁),来获得对互斥量对象的多层所有权std::recursive_mutex 释放互斥量时需要调用与该锁层次深度相同次数的 unlock(),可理解为 lock() 次数和unlock()次数相同,除此之外,std::recursive_mutex 的特性和 std::mutex 大致相同。

如果一个线程中可能在执行中需要再次获得锁的情况,按常规的做法会出现死锁。此时就需要使用递归式互斥量std::recursive_mutex来避免这个问题。std::recursive_mutex不会产生上述的死锁问题,只是是增加锁的计数,但必须确保你unlock和lock的次数相同,其他线程才可能锁这个mutex。

std::time_mutex 介绍

std::time_mutexstd::mutex 多了两个成员函数,try_lock_for(),try_lock_until()

try_lock_for()

try_lock_for 函数接受一个时间范围,表示在这一段时间范围之内线程如果没有获得锁则被阻塞住(与 std::mutex 的 try_lock() 不同,try_lock 如果被调用时没有获得锁则直接返回 false),如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。

**try_lock_for() —**Example(reference)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>       // std::cout
#include <chrono> // std::chrono::milliseconds
#include <thread> // std::thread
#include <mutex> // std::timed_mutex

std::timed_mutex mtx;

void fireworks () {
// waiting to get a lock: each thread prints "-" every 200ms:
while (!mtx.try_lock_for(std::chrono::milliseconds(200))) {
std::cout << "-";
}
// got a lock! - wait for 1s, then this thread prints "*"
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "*\n";
mtx.unlock();
}

int main ()
{
std::thread threads[5];
// spawn 5 threads:
for (int i=0; i<5; ++i)
threads[i] = std::thread(fireworks);

for (auto& th : threads) th.join();

return 0;
}

Output:

1
2
3
4
5
----------------*
---------------*
----------*
-----*
*

try_lock_until()

try_lock_until 函数则接受一个时间点作为参数,在指定时间点未到来之前线程如果没有获得锁则被阻塞住,如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。

std::lock_guard 介绍

与 Mutex RAII 相关,方便线程对互斥量上锁。

Example(reference):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <stdexcept> // std::logic_error

std::mutex mtx;

void print_even (int x) {
if (x%2==0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}

void print_thread_id (int id) {
try {
// using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
std::lock_guard<std::mutex> lck (mtx);
print_even(id);
}
catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}

int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);

for (auto& th : threads) th.join();

return 0;
}

Output:

1
2
3
4
5
6
7
8
9
10
[exception caught]
2 is even
[exception caught]
4 is even
[exception caught]
6 is even
[exception caught]
8 is even
[exception caught]
10 is even

std::unique_lock 介绍

与 Mutex RAII 相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。

Example(reference):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock

std::mutex mtx; // mutex for critical section

void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by lifetime of lck):
std::unique_lock<std::mutex> lck (mtx);
for (int i=0; i<n; ++i) {
std::cout << c;
}
std::cout << '\n';
}

int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');

th1.join();
th2.join();

return 0;
}
1
2
**************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

std::lock 介绍

Example(reference):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// std::lock example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock

std::mutex foo,bar;

void task_a () {
// foo.lock(); bar.lock(); // replaced by:
std::lock (foo,bar);
std::cout << "task a\n";
foo.unlock();
bar.unlock();
}

void task_b () {
// bar.lock(); foo.lock(); // replaced by:
std::lock (bar,foo);
std::cout << "task b\n";
bar.unlock();
foo.unlock();
}

int main ()
{
std::thread th1 (task_a);
std::thread th2 (task_b);

th1.join();
th2.join();

return 0;
}

Possible output (order of lines may vary):

1
2
task a
task b

std::try_lock 介绍

尝试lock,若线程不可被lock返回false,否则进行lock。

Example(reference):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::try_lock

std::mutex foo,bar;

void task_a () {
foo.lock();
std::cout << "task a\n";
bar.lock();
// ...
foo.unlock();
bar.unlock();
}

void task_b () {
int x = try_lock(bar,foo);
if (x==-1) {
std::cout << "task b\n";
// ...
bar.unlock();
foo.unlock();
}
else {
std::cout << "[task b failed: mutex " << (x?"foo":"bar") << " locked]\n";
}
}

int main ()
{
std::thread th1 (task_a);
std::thread th2 (task_b);

th1.join();
th2.join();

return 0;
}

Output:

1
2
task a
[task b failed: mutex foo locked]

std::call_once 介绍

call_once可以很好的满足,某些场景下,我们需要代码只被执行一次,比如单例类的初始化,考虑到多线程安全,需要进行加锁控制。

1
2
template <class Fn, class... Args>
void call_once (once_flag& flag, Fn&& fn, Args&&... args);
  • 第一个参数是std::once_flag的对象(once_flag是不允许修改的,其拷贝构造函数和operator=函数都声明为delete),

  • 第二个参数可调用实体,即要求只执行一次的代码,后面可变参数是其参数列表。

call_once保证函数fn只被执行一次,如果有多个线程同时执行函数fn调用,则只有一个活动线程(active call)会执行函数,其他的线程在这个线程执行返回之前会处于”passive execution”(被动执行状态)——不会直接返回,直到活动线程对fn调用结束才返回。对于所有调用函数fn的并发线程,数据可见性都是同步的(一致的)。

如果活动线程在执行fn时抛出异常,则会从处于”passive execution”状态的线程中挑一个线程成为活动线程继续执行fn,依此类推。一旦活动线程返回,所有”passive execution”状态的线程也返回,不会成为活动线程。(实际上once_flag相当于一个锁,使用它的线程都会在上面等待,只有一个线程允许执行。如果该线程抛出异常,那么从等待中的线程中选择一个,重复上面的流程)。

还有一个要注意的地方是 once_flag的生命周期,它必须要比使用它的线程的生命周期要长。所以通常定义成全局变量比较好。

Example(reference)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// call_once example
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for
#include <chrono> // std::chrono::milliseconds
#include <mutex> // std::call_once, std::once_flag

int winner;
void set_winner (int x) { winner = x; }
std::once_flag winner_flag;

void wait_1000ms (int id) {
// count to 1000, waiting 1ms between increments:
for (int i=0; i<1000; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// claim to be the winner (only the first such call is executed):
std::call_once (winner_flag,set_winner,id);
}

int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(wait_1000ms,i+1);

std::cout << "waiting for the first among 10 threads to count 1000 ms...\n";

for (auto& th : threads) th.join();
std::cout << "winner thread: " << winner << '\n';

return 0;
}

Output:

1
2
waiting for the first among 10 threads to count 1000 ms...
winner thread: 2