Chapter02_学习

发布时间 2023-05-26 00:31:22作者: Stitches

死锁

条件变量和互斥量区别

互斥器是加锁原语,用来拍他性地访问共享数据,它不是等待原语。在使用 mutex 时,我们一般都会期望加锁不要阻塞,总是能立刻拿到锁,然后尽快访问数据,用完之后尽快解锁,这样才能不影响并发性和性能。

如果需要等待某个条件成立,我们应该使用条件变量。条件变量顾名思义是一个或多个线程等待某个布尔表达式为真,即等待别的线程唤醒它,条件变量学名叫管程。注意在等待条件变量时需要使用 while 来判断而不是 if,可能在当前线程刚刚被唤醒时资源就被别的线程改变了,需要通过 while 来再次判断是否仍然满足。

条件变量是非常底层的同步原语,很少直接使用,一般都是用它来实现高层的同步措施,例如 BlockingQueue 或 CountDownLatch。

不要读写锁和信号量

读写锁和信号量完全可以通过互斥原语mutex 和 条件变量代替,后面有自己封装的 MutexLockMutexLockGuardCondition

封装 MutexLock、MutexLockGuard、Condition、CountDownLatch、线程安全的Singleton

MutexLock、MutexLockGuard:通过unix底层 pthread_mutex_t 实现

// 互斥器

#ifndef MYMUDUO_MUTEX_H
#define MYMUDUO_MUTEX_H

// #include <boost/core/noncopyable.hpp>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>

class MutexLock
{
public:
    MutexLock() : holder_(0)
    {
        pthread_mutex_init(&mutex_, NULL);
    }

    ~MutexLock()
    {
        assert(holder_ == 0);   // 表明没有被使用
        pthread_mutex_destroy(&mutex_);
    }

    bool isLockedByThisThread()
    {
        return pthread_self() == holder_;
    }

    void assertLocked()
    {
        assert(isLockedByThisThread());
    }

    void lock()     // 仅供MutexLockGuard调用,严禁用户代码调用
    {
        pthread_mutex_lock(&mutex_);    // 这两行顺序不能反
        holder_ = pthread_self();
    }

    void unlock()   // 仅供MutexLockGuard调用,严禁用户代码调用
    {
        holder_ = 0;                    // 这两行顺序不能反
        pthread_mutex_unlock(&mutex_);
    }

    pthread_mutex_t* getPthreadMutex()  // 仅供Condition调用,严禁用户代码调用
    {
        return &mutex_;
    }

private:
    pthread_mutex_t mutex_;
    pid_t holder_;  // 进程id
};


class MutexLockGuard
{
public:
    explicit MutexLockGuard(MutexLock mutex) : mutex_(mutex)
    {
        mutex.lock();
    }

    ~MutexLockGuard()
    {
        mutex_.unlock();
    }

private:
    MutexLock& mutex_;
};

#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")

#endif //MYMUDUO_MUTEX_H

Condition :通过 pthread_mutex_t、pthread_cond_t 实现

#ifndef MYCONDITION_H__
#define MYCONDITION_H__

#include <boost/core/noncopyable.hpp>
#include "Mutex.h"

class Condition : boost::nocopyable
{
public:
    explicit Condition(MutexLock& mutex) : mutex_(mutex) 
    {
        pthread_cond_init(&cond_t, NULL);
    }

    ~Condition()
    {
        pthread_cond_destroy(&cond_t);
    }

    void wait()
    {
        pthread_cond_wait(&cond_, mutex_.getPthreadMutex());
    }

    void notify()
    {
        pthread_cond_signal(&cond_);
    }

    void notifyAll()
    {
        pthread_cond_broadcast(&cond_);
    }
private:
    MutexLock& mutex_;
    pthread_cond_t cond_;
};

#endif

CountDownLatch : 通过 Condition、MutexLock 实现

#ifndef MYCOUNTDOWNLATCH_H__
#define MYCOUNTDOWNLATCH_H__

#include <boost/core/noncopyable.hpp>
#include "Mutex.h"
#include "Condition.h"

class CountDownLatch : boost::nocopyable
{
public:
    explicit CountDownLatch(int count)
        : mutex_(),
          condition_(mutex_),
          count_(count){}
    void wait();
    void countDown();
private:
    mutable MutexLock mutex_;
    Condition condition_;
    int count_;
};

void CountDownLatch::wait()
{
    MutexLockGuard lock(mutex_);
    while (count_ > 0)
        condition_.wait();
}

void CountDownLatch::countDown()
{
    MutexLockGuard lock(mutex_);
    --count_;
    if (count_ == 0)
        condition_.notifyAll();
}

#endif

线程安全 Singleton : 利用 pthread_once_t 结合 template 实现

#ifndef MYSINGLETON_H__
#define MYSINGLETON_H__

#include <boost/core/noncopyable.hpp>
#include <pthread.h>

template<typename T>
class Singleton : boost::nocpyable
{
public:
    static T& instance() 
    {
        pthread_once(&ponce_, &Singleton::init);
        return *value_;
    }
private:
    Singleton();
    ~Singleton();
    static void init()
    {
        value_ = new T();
    }
private:
    static pthread_once_t ponce_;
    static T* value_;
};

template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;

template<typename T>
T* Singleton<T>::value_ = NULL;

#endif

通过 shared_ptr 实现 copy-on-write

参考:https://blog.csdn.net/qiuguolu1108/article/details/115285574

如果不管对于读写操作都加锁,那么会造成很大程度的阻塞,其实对于读-读操作不需要加锁;对于读操作只需要保证在读过程中目标内容不会被其它写线程改变;对于写操作需要保证内容不被其它读/写线程访问。

用普通的 mutex 替换读写锁。解决思路都是通过 shared_ptr 来管理共享数据,原理如下:

  • shared_ptr 是引用计数型智能指针,如果当前只有一个观察者,那么引用计数的值为1;
  • 对于 read 端,在读之前把引用计数加1,读完之后减一,这样保证在读的期间其引用计数大于1,可以阻止并发写。
  • 对于 write 端,如果发现引用计数为1,这时可以安全地修改共享对象,不必担心有人正在读它;如果此时引用计数不为1,则说明有其他读线程正在访问共享对象(其它线程已经释放了锁,但访问过程未结束),此时可以先拷贝一份新的结果,然后让共享对象指针指向这份新的结果,那么 write端和 read端就不会相互干扰了。
typedef std::vector<Foo> FooList;
typedef boost::shared_ptr<FooList> FooListPtr;
MutexLock mutex;
FooListPtr g_foos;

//read操作
void traverse()
{
    FooListPtr foos;   //读前加锁拷贝,读完释放锁
    {
        MutexLockGuard lock(mutex);
        foos = g_foos;
        assert(!g_foos.unique());
    }

    for (std::vector<Foo>::const_iterator it = foos->begin(); it != foos->end(); it++)
    {
        it->doit();
    }
}


//write操作
void post(const Foo& f)
{
    printf("post\n");
    MutexLockGuard lock(mutex);
    if (!g_foos.unique()) //当前计数不为1,有其他线程使用 g_foos
    {
        g_foos.reset(new FooList(*g_foos));
        printf("copy the whole list\n");
    }
    assert(g_foos.unique());
    g_foos->push_back(f);
}

网络通信同样可以通过 copy-on-write 模式提高并发度:

普通加锁模型:

class Connection
{
public:
    //向客户端发送数据
    void send(const string& message){...}
};

//存储所有建立的连接
set<Connection> connectionList;

//互斥锁
MutexLock mutex;

//模拟写操作
void onConnection(const Connection& conn) {
    MutexLockGuard lock(mutex);
    connectionList.insert(conn);
}

//模拟读操作
void onStringMessage(const string& message) {
    MutexLockGuard lock(mutex);
    for (auto c : connectionList) {
        c.send(message);
    }
}

copy-on-write模型:

typedef set<Connection> ConnectionList;
shared_ptr<ConnectionList> connectionListPtr(new ConnectionList);  //智能指针管理共享对象

MutexLock mutex;

//模拟写操作
void onConnection(const Connection& conn) {
    MutexLock lock(mutex);
    if(!connectionListPtr.unique()) {  //引用计数不为1
        connectionListPtr.reset(new set<Connection>(*connectionListPtr));
    }
    connectionListPtr->insert(conn);
}

//模拟读操作
void onStringMessage(const string& message) {
    shared_ptr<ConnectionList> tmp;
    {
        MutexLockGuard lock(mutex);   //shared_ptr 线程不安全
        tmp = connectionListPtr;      //读引用计数加1
    }
    for (auto c : tmp) {
        c.send(message);
    }
}

书上解决 RequestInventory 死锁问题的方法:用 shared_ptr 管理 std::set,在遍历的时候先增加引用计数,阻止并发修改。在读取的时候将待读取数据复制一份,然后在临界区外遍历这个副本。这样能最小程度限制临界区的范围,保证性能。

错误的写法:main() 线程先调用 Inventory::printAll() 再调用 Request::print(),而 threadFunc() 线程是先调用 Request::~Request() 再调用 Inventory::remove() ,这两个调用序列对两个 mutex 的加锁刚好相反,造成了死锁。

/**
 *  Inventory、Request 死锁例子
 */

#include <iostream>
#include <memory>
#include <unistd.h>
#include <set>
#include <thread>
#include <mutex>

class Request {
public:
	void process();
	~Request();
	void print() const;
private:
	mutable std::mutex mutex_;
};

class Inventory {
public:
	void add( Request* req) {
		std::lock_guard<std::mutex> lock(mutex_);
		requests_.insert(req);
	}
	void remove(Request* req) {
		std::lock_guard<std::mutex> lock(mutex_);
		requests_.erase(req);
	}
	void printAll() const;
private:
	mutable std::mutex mutex_;
	std::set<Request*> requests_;
};

void Inventory::printAll() const
{
	std::lock_guard<std::mutex> lock(mutex_);
	sleep(1);
	for (auto it = requests_.begin(); it != requests_.end(); it++) {
		(*it)->print();
	}
	std::cout << "Inventory::printAll() unlocked" << std::endl;
}

Inventory g_inventory;

void Request::process()
{
	std::lock_guard<std::mutex> lock(mutex_);
	g_inventory.add(this);
}
Request::~Request() {
	std::lock_guard<std::mutex> lock(mutex_);
	sleep(1);
	g_inventory.remove(this);
}
void Request::print() const
{
	std::lock_guard<std::mutex> lock(mutex_);
	//.....
}

void threadFunc() {
	Request* req = new Request;
	req->process();
	delete req;
}

int main() {
	std::thread th(threadFunc);
	usleep(500 * 1000);
	g_inventory.printAll();
	th.join();
}

死锁解决思路:用 shared_ptr 管理 set::std,在遍历的时候先增加引用计数,阻止并发修改;在需要修改数据时,先判断是否有其它读线程,如果有就 swap/reset 拷贝一份新的数据,并让 shared_ptr 指向新分配的内存,此时新内存只有写线程一人持有,可以安全地进行修改。

#include <iostream>
#include <memory>
#include <set>
#include <stdio.h>
#include <thread>
#include "Mutex.h"

class Request;

typedef std::shared_ptr<Request> RequestPtr;

class Inventory {
 public:
    Inventory() : requests_(new RequestList) 
    {

    }

    void add(const RequestPtr& req)
    {
        MutexLockGuard lock(mutex_);
        if (!requests_.unique())
        {
            requests_.reset(new RequestList(*requests_));
            std::cout << "Inventory add() copy the whole list" << std::endl;
        }
        assert(requests_.unique());
        requests_->insert(req);
    }

    void remove(const RequestPtr& req)
    {
        MutexLockGuard lock(mutex_);
        if (!requests_.unique())
        {
            requests_.reset(new RequestList(*requests_));
            std::cout << "Inventory remove() copy the whole list" << std::endl;
        }
        assert(requests_.unique());
        requests_->erase(req);
    }

    void printAll() const;
 private:
    typedef std::set<RequestPtr> RequestList;
    typedef std::shared_ptr<RequestList> RequestListPtr;
    RequestListPtr requests_;
    mutable MutexLock mutex_;

    //拷贝 shared_ptr 并返回
    RequestListPtr getData() const 
    {
        MutexLockGuard lock(mutex_);
        return requests_;
    }
};

Inventory g_inventory;

class Request : public std::enable_shared_from_this<Request> {
 public:
    Request() : x_(0) {

    }

    ~Request() {
        x_ = -1;
    }

    void cancel()
    {
        MutexLockGuard lock(mutex_);
        x_ = 1;
        sleep(1);
        std::cout << "cancel()" << std::endl;
        g_inventory.remove(shared_from_this());
    }

    void process()
    {
        MutexLockGuard lock(mutex_);
        g_inventory.add(shared_from_this());
    }

    void print()
    {
        MutexLockGuard lock(mutex_);
        printf("print Request %p x=%d\n", this, x_);
    }
 private:
    int x_;
    mutable MutexLock mutex_;
};


void Inventory::printAll() const
{
    //获取副本
    RequestListPtr requests = getData();
    printf("printAll()\n");
    sleep(1);

    //循环打印
    for (std::set<RequestPtr>::const_iterator it = requests->begin(); it != requests->end(); ++it)
    {
        (*it)->print();
    }
}


void threadFunc()
{
  RequestPtr req(new Request);
  req->process();
  req->cancel();
}

int main()
{
  std::thread thread(threadFunc);
  usleep(500*1000);
  g_inventory.printAll();
  thread.join();
}

用普通 mutex 替换读写锁的一个例子:

#include <map>
#include <string>
#include <vector>

#include <boost/shared_ptr.hpp>

#include "../Mutex.h"

using std::string;

class CustomerData : boost::noncopyable
{
 public:
  CustomerData()
    : data_(new Map)
  { }

  int query(const string& customer, const string& stock) const;

 private:
  typedef std::pair<string, int> Entry;
  typedef std::vector<Entry> EntryList;
  typedef std::map<string, EntryList> Map;
  typedef boost::shared_ptr<Map> MapPtr;            //最终的数据一定要用 shared_ptr 包裹
  void update(const string& customer, const EntryList& entries);
  void update(const string& message);

  static int findEntry(const EntryList& entries, const string& stock);
  static MapPtr parseData(const string& message);

  MapPtr getData() const
  {
    muduo::MutexLockGuard lock(mutex_);
    return data_;
  }

  mutable muduo::MutexLock mutex_;
  MapPtr data_;
};

//读操作先增加引用计数,然后读取
int CustomerData::query(const string& customer, const string& stock) const  
{
  MapPtr data = getData();

  Map::const_iterator entries = data->find(customer);
  if (entries != data->end())
    return findEntry(entries->second, stock);
  else
    return -1;
}

//写操作先加锁判断是否有其它线程正在使用数据,然后写入
void CustomerData::update(const string& customer, const EntryList& entries)
{
  muduo::MutexLockGuard lock(mutex_);
  if (!data_.unique())
  {
    MapPtr newData(new Map(*data_));
    data_.swap(newData);
  }
  assert(data_.unique());
  (*data_)[customer] = entries;
}

void CustomerData::update(const string& message)
{
  MapPtr newData = parseData(message);
  if (newData)
  {
    muduo::MutexLockGuard lock(mutex_);
    data_.swap(newData);
  }
}

int main()
{
  CustomerData data;
}