死锁
条件变量和互斥量区别
互斥器是加锁原语,用来拍他性地访问共享数据,它不是等待原语。在使用 mutex 时,我们一般都会期望加锁不要阻塞,总是能立刻拿到锁,然后尽快访问数据,用完之后尽快解锁,这样才能不影响并发性和性能。
如果需要等待某个条件成立,我们应该使用条件变量。条件变量顾名思义是一个或多个线程等待某个布尔表达式为真,即等待别的线程唤醒它,条件变量学名叫管程。注意在等待条件变量时需要使用 while 来判断而不是 if,可能在当前线程刚刚被唤醒时资源就被别的线程改变了,需要通过 while 来再次判断是否仍然满足。
条件变量是非常底层的同步原语,很少直接使用,一般都是用它来实现高层的同步措施,例如 BlockingQueue
不要读写锁和信号量
读写锁和信号量完全可以通过互斥原语mutex
和 条件变量代替,后面有自己封装的 MutexLock
、MutexLockGuard
、Condition
。
封装 MutexLock、MutexLockGuard、Condition、CountDownLatch、线程安全的Singleton
MutexLock、MutexLockGuard:通过unix底层 pthread_mutex_t 实现
// 互斥器
#ifndef MYMUDUO_MUTEX_H
#define MYMUDUO_MUTEX_H
// #include <boost/core/noncopyable.hpp>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
class MutexLock
{
public:
MutexLock() : holder_(0)
{
pthread_mutex_init(&mutex_, NULL);
}
~MutexLock()
{
assert(holder_ == 0); // 表明没有被使用
pthread_mutex_destroy(&mutex_);
}
bool isLockedByThisThread()
{
return pthread_self() == holder_;
}
void assertLocked()
{
assert(isLockedByThisThread());
}
void lock() // 仅供MutexLockGuard调用,严禁用户代码调用
{
pthread_mutex_lock(&mutex_); // 这两行顺序不能反
holder_ = pthread_self();
}
void unlock() // 仅供MutexLockGuard调用,严禁用户代码调用
{
holder_ = 0; // 这两行顺序不能反
pthread_mutex_unlock(&mutex_);
}
pthread_mutex_t* getPthreadMutex() // 仅供Condition调用,严禁用户代码调用
{
return &mutex_;
}
private:
pthread_mutex_t mutex_;
pid_t holder_; // 进程id
};
class MutexLockGuard
{
public:
explicit MutexLockGuard(MutexLock mutex) : mutex_(mutex)
{
mutex.lock();
}
~MutexLockGuard()
{
mutex_.unlock();
}
private:
MutexLock& mutex_;
};
#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")
#endif //MYMUDUO_MUTEX_H
Condition :通过 pthread_mutex_t、pthread_cond_t 实现
#ifndef MYCONDITION_H__
#define MYCONDITION_H__
#include <boost/core/noncopyable.hpp>
#include "Mutex.h"
class Condition : boost::nocopyable
{
public:
explicit Condition(MutexLock& mutex) : mutex_(mutex)
{
pthread_cond_init(&cond_t, NULL);
}
~Condition()
{
pthread_cond_destroy(&cond_t);
}
void wait()
{
pthread_cond_wait(&cond_, mutex_.getPthreadMutex());
}
void notify()
{
pthread_cond_signal(&cond_);
}
void notifyAll()
{
pthread_cond_broadcast(&cond_);
}
private:
MutexLock& mutex_;
pthread_cond_t cond_;
};
#endif
CountDownLatch : 通过 Condition、MutexLock 实现
#ifndef MYCOUNTDOWNLATCH_H__
#define MYCOUNTDOWNLATCH_H__
#include <boost/core/noncopyable.hpp>
#include "Mutex.h"
#include "Condition.h"
class CountDownLatch : boost::nocopyable
{
public:
explicit CountDownLatch(int count)
: mutex_(),
condition_(mutex_),
count_(count){}
void wait();
void countDown();
private:
mutable MutexLock mutex_;
Condition condition_;
int count_;
};
void CountDownLatch::wait()
{
MutexLockGuard lock(mutex_);
while (count_ > 0)
condition_.wait();
}
void CountDownLatch::countDown()
{
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
condition_.notifyAll();
}
#endif
线程安全 Singleton : 利用 pthread_once_t 结合 template 实现
#ifndef MYSINGLETON_H__
#define MYSINGLETON_H__
#include <boost/core/noncopyable.hpp>
#include <pthread.h>
template<typename T>
class Singleton : boost::nocpyable
{
public:
static T& instance()
{
pthread_once(&ponce_, &Singleton::init);
return *value_;
}
private:
Singleton();
~Singleton();
static void init()
{
value_ = new T();
}
private:
static pthread_once_t ponce_;
static T* value_;
};
template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;
template<typename T>
T* Singleton<T>::value_ = NULL;
#endif
通过 shared_ptr 实现 copy-on-write
参考:https://blog.csdn.net/qiuguolu1108/article/details/115285574
如果不管对于读写操作都加锁,那么会造成很大程度的阻塞,其实对于读-读操作不需要加锁;对于读操作只需要保证在读过程中目标内容不会被其它写线程改变;对于写操作需要保证内容不被其它读/写线程访问。
用普通的 mutex
替换读写锁。解决思路都是通过 shared_ptr
来管理共享数据,原理如下:
shared_ptr
是引用计数型智能指针,如果当前只有一个观察者,那么引用计数的值为1;- 对于
read
端,在读之前把引用计数加1,读完之后减一,这样保证在读的期间其引用计数大于1,可以阻止并发写。 - 对于
write
端,如果发现引用计数为1,这时可以安全地修改共享对象,不必担心有人正在读它;如果此时引用计数不为1,则说明有其他读线程正在访问共享对象(其它线程已经释放了锁,但访问过程未结束),此时可以先拷贝一份新的结果,然后让共享对象指针指向这份新的结果,那么 write端和 read端就不会相互干扰了。
typedef std::vector<Foo> FooList;
typedef boost::shared_ptr<FooList> FooListPtr;
MutexLock mutex;
FooListPtr g_foos;
//read操作
void traverse()
{
FooListPtr foos; //读前加锁拷贝,读完释放锁
{
MutexLockGuard lock(mutex);
foos = g_foos;
assert(!g_foos.unique());
}
for (std::vector<Foo>::const_iterator it = foos->begin(); it != foos->end(); it++)
{
it->doit();
}
}
//write操作
void post(const Foo& f)
{
printf("post\n");
MutexLockGuard lock(mutex);
if (!g_foos.unique()) //当前计数不为1,有其他线程使用 g_foos
{
g_foos.reset(new FooList(*g_foos));
printf("copy the whole list\n");
}
assert(g_foos.unique());
g_foos->push_back(f);
}
网络通信同样可以通过 copy-on-write
模式提高并发度:
普通加锁模型:
class Connection
{
public:
//向客户端发送数据
void send(const string& message){...}
};
//存储所有建立的连接
set<Connection> connectionList;
//互斥锁
MutexLock mutex;
//模拟写操作
void onConnection(const Connection& conn) {
MutexLockGuard lock(mutex);
connectionList.insert(conn);
}
//模拟读操作
void onStringMessage(const string& message) {
MutexLockGuard lock(mutex);
for (auto c : connectionList) {
c.send(message);
}
}
copy-on-write模型:
typedef set<Connection> ConnectionList;
shared_ptr<ConnectionList> connectionListPtr(new ConnectionList); //智能指针管理共享对象
MutexLock mutex;
//模拟写操作
void onConnection(const Connection& conn) {
MutexLock lock(mutex);
if(!connectionListPtr.unique()) { //引用计数不为1
connectionListPtr.reset(new set<Connection>(*connectionListPtr));
}
connectionListPtr->insert(conn);
}
//模拟读操作
void onStringMessage(const string& message) {
shared_ptr<ConnectionList> tmp;
{
MutexLockGuard lock(mutex); //shared_ptr 线程不安全
tmp = connectionListPtr; //读引用计数加1
}
for (auto c : tmp) {
c.send(message);
}
}
书上解决 Request
、Inventory
死锁问题的方法:用 shared_ptr
管理 std::set
,在遍历的时候先增加引用计数,阻止并发修改。在读取的时候将待读取数据复制一份,然后在临界区外遍历这个副本。这样能最小程度限制临界区的范围,保证性能。
错误的写法:main()
线程先调用 Inventory::printAll()
再调用 Request::print()
,而 threadFunc()
线程是先调用 Request::~Request()
再调用 Inventory::remove()
,这两个调用序列对两个 mutex 的加锁刚好相反,造成了死锁。
/**
* Inventory、Request 死锁例子
*/
#include <iostream>
#include <memory>
#include <unistd.h>
#include <set>
#include <thread>
#include <mutex>
class Request {
public:
void process();
~Request();
void print() const;
private:
mutable std::mutex mutex_;
};
class Inventory {
public:
void add( Request* req) {
std::lock_guard<std::mutex> lock(mutex_);
requests_.insert(req);
}
void remove(Request* req) {
std::lock_guard<std::mutex> lock(mutex_);
requests_.erase(req);
}
void printAll() const;
private:
mutable std::mutex mutex_;
std::set<Request*> requests_;
};
void Inventory::printAll() const
{
std::lock_guard<std::mutex> lock(mutex_);
sleep(1);
for (auto it = requests_.begin(); it != requests_.end(); it++) {
(*it)->print();
}
std::cout << "Inventory::printAll() unlocked" << std::endl;
}
Inventory g_inventory;
void Request::process()
{
std::lock_guard<std::mutex> lock(mutex_);
g_inventory.add(this);
}
Request::~Request() {
std::lock_guard<std::mutex> lock(mutex_);
sleep(1);
g_inventory.remove(this);
}
void Request::print() const
{
std::lock_guard<std::mutex> lock(mutex_);
//.....
}
void threadFunc() {
Request* req = new Request;
req->process();
delete req;
}
int main() {
std::thread th(threadFunc);
usleep(500 * 1000);
g_inventory.printAll();
th.join();
}
死锁解决思路:用 shared_ptr
管理 set::std
,在遍历的时候先增加引用计数,阻止并发修改;在需要修改数据时,先判断是否有其它读线程,如果有就 swap/reset
拷贝一份新的数据,并让 shared_ptr
指向新分配的内存,此时新内存只有写线程一人持有,可以安全地进行修改。
#include <iostream>
#include <memory>
#include <set>
#include <stdio.h>
#include <thread>
#include "Mutex.h"
class Request;
typedef std::shared_ptr<Request> RequestPtr;
class Inventory {
public:
Inventory() : requests_(new RequestList)
{
}
void add(const RequestPtr& req)
{
MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
std::cout << "Inventory add() copy the whole list" << std::endl;
}
assert(requests_.unique());
requests_->insert(req);
}
void remove(const RequestPtr& req)
{
MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
std::cout << "Inventory remove() copy the whole list" << std::endl;
}
assert(requests_.unique());
requests_->erase(req);
}
void printAll() const;
private:
typedef std::set<RequestPtr> RequestList;
typedef std::shared_ptr<RequestList> RequestListPtr;
RequestListPtr requests_;
mutable MutexLock mutex_;
//拷贝 shared_ptr 并返回
RequestListPtr getData() const
{
MutexLockGuard lock(mutex_);
return requests_;
}
};
Inventory g_inventory;
class Request : public std::enable_shared_from_this<Request> {
public:
Request() : x_(0) {
}
~Request() {
x_ = -1;
}
void cancel()
{
MutexLockGuard lock(mutex_);
x_ = 1;
sleep(1);
std::cout << "cancel()" << std::endl;
g_inventory.remove(shared_from_this());
}
void process()
{
MutexLockGuard lock(mutex_);
g_inventory.add(shared_from_this());
}
void print()
{
MutexLockGuard lock(mutex_);
printf("print Request %p x=%d\n", this, x_);
}
private:
int x_;
mutable MutexLock mutex_;
};
void Inventory::printAll() const
{
//获取副本
RequestListPtr requests = getData();
printf("printAll()\n");
sleep(1);
//循环打印
for (std::set<RequestPtr>::const_iterator it = requests->begin(); it != requests->end(); ++it)
{
(*it)->print();
}
}
void threadFunc()
{
RequestPtr req(new Request);
req->process();
req->cancel();
}
int main()
{
std::thread thread(threadFunc);
usleep(500*1000);
g_inventory.printAll();
thread.join();
}
用普通 mutex
替换读写锁的一个例子:
#include <map>
#include <string>
#include <vector>
#include <boost/shared_ptr.hpp>
#include "../Mutex.h"
using std::string;
class CustomerData : boost::noncopyable
{
public:
CustomerData()
: data_(new Map)
{ }
int query(const string& customer, const string& stock) const;
private:
typedef std::pair<string, int> Entry;
typedef std::vector<Entry> EntryList;
typedef std::map<string, EntryList> Map;
typedef boost::shared_ptr<Map> MapPtr; //最终的数据一定要用 shared_ptr 包裹
void update(const string& customer, const EntryList& entries);
void update(const string& message);
static int findEntry(const EntryList& entries, const string& stock);
static MapPtr parseData(const string& message);
MapPtr getData() const
{
muduo::MutexLockGuard lock(mutex_);
return data_;
}
mutable muduo::MutexLock mutex_;
MapPtr data_;
};
//读操作先增加引用计数,然后读取
int CustomerData::query(const string& customer, const string& stock) const
{
MapPtr data = getData();
Map::const_iterator entries = data->find(customer);
if (entries != data->end())
return findEntry(entries->second, stock);
else
return -1;
}
//写操作先加锁判断是否有其它线程正在使用数据,然后写入
void CustomerData::update(const string& customer, const EntryList& entries)
{
muduo::MutexLockGuard lock(mutex_);
if (!data_.unique())
{
MapPtr newData(new Map(*data_));
data_.swap(newData);
}
assert(data_.unique());
(*data_)[customer] = entries;
}
void CustomerData::update(const string& message)
{
MapPtr newData = parseData(message);
if (newData)
{
muduo::MutexLockGuard lock(mutex_);
data_.swap(newData);
}
}
int main()
{
CustomerData data;
}