简单实现线程安全的Observer模式

发布时间 2023-07-01 19:56:22作者: WYFC4

简单实现线程安全的Observer模式

最近开始看陈硕老师的《Linux多线程服务器编程》,刚好发现b站Up主啊起个名字不容易的总结视频,所以学习总结了一下。

什么是Observer模式

观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。

复习多态

class Father
{
    public:
      explicit Father() = default;
      explicit Father(const std::string &name): name_(name) {}
     
      inline virtual void call() const
      {
        std::cout<<"The name of father is "<<name_<<std::endl;
      }
    
    private:
       const std::string name_ ;
};

class Son:public Father
{

    public:
        explicit Son(const std::string &name): Father(name+ ". Son() is called."), name_(name) {}

        inline void call() const override
        {
            std::cout<<"The name of son is "<<name_<<std::endl;
        }
    private:
      const std::string name_;
};

Father *p = new Son("Teddy");
p->call();
delete p;

将Father类的call()函数声明为virtual,就可以让基类指针p指向一个派生类对象并调用派生类的call()函数。通过override显式让派生类重写基类中的同名函数。
多态发生的条件:
1)使用一个指向派生类的基类指针或者引用
2)访问虚函数

Observer0.1的实现

我们首先实现一个最简陋的观察者模式。

观察者模式需要一个观察者变量,在观察者变量变动后,需要各个观察者都可以自动更新内容。我们可以简单定义一个观察者抽象基类表示:

class Observer
{
    public:
        virtual void update() = 0;
}; // 观察者

同时,我们定义一个观察者变量,里面定义注册函数来加入新的观察者,通知函数通知观察者进行更新。

class Observable
{
    public:
        void notify();  // 通知所有观察者刷新
        void register_(Observer* observer); // 注册函数
    
    private:
        std::vector<Observer*> observers;
}; // 被观察者

一般为了线程安全,尽可能不适用this指针,不能把注册函数定义在构造函数内。我们用一个Observer*类型的vector来存储所有指向观察者变量的指针。

我们可以简单书写一下两个成员函数的实现。

void Observable::notify()
{
    for(auto observer: observers)
    {
        observer->update();
    }
    
}

void Observable::register_(Observer* observer)
{
    observers.push_back(observer);
}

我们现在可以构建一个继承于观察者变量的Data类

class Data:public Observable
{
    private:
        std::vector<int> data_;
    public:
        explicit Data() = default;
        explicit Data(const std::vector<int> &newData)
        : data_(newData) {}
        explicit Data(const std::vector<int> &&newData)
        : data_(newData) {}
       
        inline std::vector<int> returnData() const
        {
            return data_;
        }
        
        void update(const std::vector<int> &data)
        {
            data_ = data;
            notify();
        }
        void update(std::vector<int> &&data)
        {
            data_ = data;
            notify();
            data.clear();
        }
};

以及实现两个新的观察者:

class Observer1:public Observer
{
    public:
        Observer1() = delete;
        Observer1(const Data &newData): data_class(newData) {}
        void update() override
        {
            std::cout<<"Observer1() is called.\n";
            auto _data = data_class.returnData();
            for(auto x:_data)
            {
                std::cout<<" "<<x;
            }
            std::cout<<"\n";
        }
    private:
        const Data &data_class;

};
class Observer2:public Observer
{
    public:
        Observer2() = delete;
        Observer2(const Data &newData): data_class(newData) {}
        void update() override
        {
            std::cout<<"Observer2() is called.\n";
            auto _data = data_class.returnData();
            for(auto x:_data)
            {
                std::cout<<" "<<x;
            }
            std::cout<<"\n";
        }
    private:
        const Data &data_class;

};

这里需要把观察者的update()成员函数与观察者变量的进行区分。观察者变量的update()成员函数会更新数据,并利用之前基类定义的notify()成员函数通知各个观察者调用自己的update()成员函数进行相应的更新。

之后我们可以简单运行代码测试一下:

Data data{std::vector<int>{1, 2, 3, 4, 5, 6}};
Observer1 ob1{data};
Observer2 ob2(data);
data.register_(&ob1);
data.register_(&ob2);
int x{7};
auto arr = data.returnData();
int y{8};
arr.push_back(x);
data.update(arr);
arr.push_back(y);
data.update(arr);

我们首先初始化Data类,然后定义两个观察者变量,并依次注册到data对象中。之后我们很简陋地更新了数据,输出在下面所示:

Observer1() is called.
 1 2 3 4 5 6 7
Observer2() is called.
 1 2 3 4 5 6 7
Observer1() is called.
 1 2 3 4 5 6 7 8
Observer2() is called.
 1 2 3 4 5 6 7 8

Observer0.2的实现

之前我们是在观察者变量里用一个vector来存储所有观察者的指针,但是如果刚好该观察者已经析构了,我们希望程序会有相应的处理而不是继续访问指针指向的对象。

直观上,我们首先想到在Observer销毁时先删掉vector中指向自己的指针。我们可以在Observer类中存入一个指向观察者变量的指针Observable *variable_pointer;(这里为了方便暂时先是public成员变量)。然后我们通过写一个Observable类中的unregister()成员函数来取消注册。

void Observable::unregister_(Observer* observer)
{
    for(auto it=observers.begin(); it!=observers.end();)
    {
        if(*it == observer)
        {
            it = observers.erase(it);
            break;
        }
        else
            it++;
    }
}

当然,register()也应该相应的更改。

void Observable::register_(Observer* observer)
{
    observers.push_back(observer);
    observer->variable_pointer = this;
}

在观察者变量的析构函数中调用unregister(),使其销毁时自动取消注册。

 ~Observer()
{
    variable_pointer->unregister_(this);
}

为了考虑多线程访问临界资源的问题,我们可以尝试用RALL的思想封装Linux的多线程(先不考虑C++11的线程库)。

Mutex类的声明:

namespace rall_test
{
    class RallMutex
    {
    private:
        pthread_mutex_t mutex_;
    public:
        RallMutex();
        inline void lock();
        inline void unlcok();
        ~RallMutex();
    };
    
}

Mutex类的粗糙实现:

using namespace rall_test;

RallMutex::RallMutex()
{
    pthread_mutex_init(&mutex_, nullptr);
}
    
RallMutex::~RallMutex()
{
    pthread_mutex_destroy(&mutex_);
}

void RallMutex::lock()
{
    pthread_mutex_lock(&mutex_);
}

void RallMutex::unlcok()
{
    pthread_mutex_unlock(&mutex_);
}

不过我们希望继续用RALL手法封装互斥锁的创建销毁,即不需要手动lock()unlock()。于是根据原书的建议,我们希望封装一个MutexLoCKGuard类封装临界区的进入和退出,即加锁和解锁。

    class MutexLocKGuard
    {
    private:
        RallMutex &mutex_;
    public:
        explicit MutexLocKGuard(RallMutex &mutex_): mutex_(mutex_)
        {
            mutex_.lock();
        }
        ~MutexLocKGuard()
        {
            mutex_.unlock();
        }
    };

为了不让外界调用RallMutex类的上锁解锁方法,我们可以把上锁解锁的函数声明为私有成员函数,并把MutexLocKGuard类声明为友元类。

我们现在用C++11提供的线程库为我们的0.2版本加上锁:

class Observable
{
    public:
        void notify();  // 通知所有观察者刷新
        void register_(Observer* observer); // 注册函数
        void unregister_(Observer* observer);
    
    private:
        std::vector<Observer*> observers;
        mutable std::mutex mutex_;
}; // 被观察者

在成员函数里加上lock_guard:

void Observable::notify()
{
    const std::lock_guard<std::mutex> guard(mutex_);
    for(auto observer: observers)
    {
        observer->update();
    }
    
}

void Observable::register_(Observer* observer)
{
    const std::lock_guard<std::mutex> guard(mutex_);
    observers.push_back(observer);
    observer->variable_pointer = this;
}

void Observable::unregister_(Observer* observer)
{
    const std::lock_guard<std::mutex> guard(mutex_);
    for(auto it=observers.begin(); it!=observers.end();)
    {
        if(*it == observer)
        {
            it = observers.erase(it);
            break;
        }
        else
            it++;
    }
}

另一方面,我们也需要防止在unregister时观察者也已经死亡。同时,析构函数里加锁是没有意义的。
这会产生竞态条件:

// thread A
std::lock_guard<std::mutex> guard(mutex_);
// ...
delete x;
x = nullptr;

//thread B
std::lock_guard<std::mutex> guard(mutex_);
if(x)
{
    x->update();
}

比如书中这个例子,加入线程B在进入判断语句时刚好线程A销毁了x,那么直接调用x->update()会发生未定义行为。书上给的解决办法就是智能指针。

  • shared_ptr:带引用计数的智能指针
  • weak_ptr: 指向shared_ptr,防止循环引用
  • unique_ptr: 同一块内存只能被一个指针指向

使用智能指针的Observer0.3

我们可以尝试把之前存储观察者指针的vector改为存储智能指针


class Observable
{
    public:
        void notify();  // 通知所有观察者刷新
        void register_(const std::weak_ptr<Observer> &observer); // 注册函数
    
    private:
        std::vector<std::weak_ptr<Observer>> observers;             // 引用计数至少为1
        mutable std::mutex mutex_;
};  

如果使用shared_ptr,那么可能会有观察者全部销毁但是vector仍有一份的情况。 所以这里使用weak_ptr。


void Observable::notify()
{
    const std::lock_guard<std::mutex> guard(mutex_);
    for(auto iter=observers.begin(); iter!=observers.end(); )
    {
        auto p = iter->lock();
        if(p)
        {
            p->update();
            iter++;
        }
        else
        {
            iter = observers.erase(iter);
        }
    }
    
}

这里有个问题:我们可以用weak_ptr解决刚刚析构函数的解注册问题吗?我们可以在观察者基类中加入一个std::weak_ptr<Observable> variable_pointer;来进行解注册吗?如果我们这样做,我们就会在register()里写入类似这样的写法:

  auto p = observer.lock();
  if(p)
    {
        p->variable_pointer = std::shared_ptr<Observable>(this); // FATAL
    }

这种写法肯定是有问题的。我们使用了一个带隐式类型转化的拷贝初始化。我们将this指针绑定在一个shared_ptr上。那么如果该智能指针生命周期结束,观察者变量的this很可能也被销毁。这带来的后果是不可以预计的。

为了解决这一问题,我们需要让观察者变量继承C++11标准提供的std::enable_shared_from_this<Observable>。于是我们就可以将注册函数改写为:

void Observable::register_(const std::weak_ptr<Observer> &observer)
{
    const std::lock_guard<std::mutex> guard(mutex_);
    observers.push_back(observer);
    auto p = observer.lock();
    if(p)
    {
        // p->variable_pointer = std::shared_ptr<Observable>(this); // FATAL
        p->variable_pointer = shared_from_this();
    }
}

我们通过引入智能指针解决了裸指针的一些问题。可是依旧有一些问题。一方面,我们广泛加锁。另一方面,观察者变量调用观察者的update()行为不可知,如果该更新函数调用了注册或者解注册函数,就会导致多次加锁甚至死锁。
为此,我们可以尝试使用互斥锁加智能指针模拟的读写锁。可是根据原书中所说,不推荐在程序中使用读写锁。
1、读写锁性能不如互斥锁
2、读锁中不小心修改数据导致程序崩溃
3、 读锁可重入,写锁不可重入。如果一个读锁在重入的过程中被写锁抢占,可能会导致死锁。

class Cow
{
   private:
     //std::vector<int> arr_;
     std::shared_ptr<std::vector<int>> arr_;
     mutable std::mutex mutex_;
   public:
     Cow(): arr_(new std::vector<int>()) {}
      
     void read() const;
     void append(int x);
};

void Cow::read() const
{
    decltype(arr_) new_arr_; 
    const std::lock_guard<std::mutex> guard(mutex_);
    new_arr_ = arr_; // 引用计数+1
    for(auto i:*new_arr_)
    {
        std::cout<<i<<" ";
    }
    std::cout<<std::endl;
}

void Cow::append(int x)
{
    const std::lock_guard<std::mutex> guard(mutex_);
    if(!arr_.unique())
    {
        arr_.reset(new std::vector<int>(*arr_) );
    }
    arr_->push_back(x);
    // 判断是否有人在用arr_
}

这样就通过读时共享,写时复制的方式来实现一个简单的读写锁。