Synchronized

发布时间 2024-01-06 19:54:26作者: SchemaL

Synchronized

C++多线程编程的痛点 - 锁与数据是分开的

struct RequestHandler {
  RequestQueue requestQueue_;
  SharedMutex requestQueueMutex_;
  
  std::map<std::string, Endpoint> requestEndpoints_;
  SharedMutex requestEndpointsMutex_;

  HandlerState workState_;
  SharedMutex workStateMutex_;
    
  void processRequest(const Request& request) {
      stop_watch<> watch;
      checkRequestValidity(request);
      SharedMutex::WriteHolder lock(requestQueueMutex_);
      requestQueue_.push_back(request);
      stats_->addStatValue("requestEnqueueLatency", watch.elapsed());
      LOG(INFO) << "enqueued request ID" << request.getID();
  }
};

上述代码存在以下隐患:

  1. 不加锁可以直接访问数据,而且编译器不会报错
    在上面的代码中,成员函数可以不加锁直接访问requestQueue_,并且编译器不会直接报错。另外可能存在少加锁的情况,比如要访问requestQueue_requestEndpoints_的,但是只把requestQueueMutex_给锁上了,requestEndpointsMutex_忘记加锁了
  2. 当存在多个锁保护多个数据的时候,存在上错锁的情况
    成员函数需要访问requestQueue_,但是可能错把requestEndpointsMutex_这把锁锁上了,本来应该锁requestQueueMutex_的。
  3. 上了读锁,但是却执行了写操作,编译器不会报错

Synchronized的解决方案 - 锁与数据包成一个整体,只有上了锁数据才能被访问

上面的代码可以重写为

struct RequestHandler {
  Synchronized<RequestQueue> requestQueue_;
  Synchronized<std::map<std::string, Endpoint>> requestEndpoints_;
  Synchronized<HandlerState> workState_;
    
  void processRequest(const Request& request) {
      stop_watch<> watch;
      checkRequestValidity(request);
      requestQueue_.wlock()->push_back(request);
      stats_->addStatValue("requestEnqueueLatency", watch.elapsed());
      LOG(INFO) << "enqueued request ID" << request.getID();
  }
};

Synchronized不提供直接访问数据的接口,而是提供了获取各种锁类型的接口(如上面的wlock()接口),这些接口的返回值才能访问数据,并且返回值满足RAII原语,对象析构时锁会自动释放.

Synchronized模板类

Synchronized是个模板类Synchronized<T, Mutex>Mutex默认为folly::SharedMutex,std中的所有的mutex类型都可以作为这个参数(mutex, shared_mutex, recursive_mutex, timed_mutex, recursive_timed_mutex, shared_timed_mutex),另外满足folly/synchronization/Lock.h的类也都能传给Mutex参数。

构造函数:

  • 默认构造函数直接默认初始化数据对象
  • 如果不支持默认构造函数,还提供了in-place构造,把参数透传给数据成员的构造器
  • 如果数据成员支持复制构造或移动构造,还可以直接传T类型的参数进行构造

复制构造函数

  • 先对源对象的上读锁,然后再复制源对象的数据成员,新建的Synchronized不需要上锁

移动构造函数

  • 假设源对象是纯右值,因此不会上锁,直接复制源对象的数据成员

复制运算符

  • 先对源对象加读锁,把数据成员复制到临时对象中,然后对目的对象加写锁,把临时对象移动赋值给目的对象的数据成员

移动赋值运算符

  • 假设源对象为纯右值,不对源对象加锁,目的对象加写锁,把源对象的数据成员移动赋值给目的对象的数据成员

数据成员赋值运算符

  • 目的对象加写锁,然后把参数赋值(复制/移动)给数据成员

swap按照对象地址的递增顺序对两个对象加写锁,然后swap俩对象的数据成员

数据成员的副本可以通过void copy(T*)T copy()两个接口拿到,会对Synchronized对象加读锁

Synchronized拿锁接口

follyMutex的类型分为了UniqueSharedUpgrade,针对不同的Mutex类型,Synchronized提供了不同的拿锁接口

  • Unique类型
LockedPtr lock();
ConstLockedPtr lock() const;   // 只能访问数据的const成员函数,且不能修改数据的成员

TryLockedPtr tryLock();  // 获取锁失败返回null,应该使用operator bool() 或者 LockedPtr::isNull()来判断
ConstTryLockedPtr tryLock();

template <class Rep, class Period>
LockedPtr lock(const std::chrono::duration<Rep, Period>& timeout);  // 当Mutex为timed的时候才能正常工作
template <class Rep, class Period>
ConstLockedPtr lock(const std::chrono::duration<Rep, Period>& timeout) const;

template <class Function> auto withLock(Function&& function);  // function参数为const T&或者T&
template <class Function> auto withLock(Function&& function) const;  // function参数只能为const T&

template <class Function> auto withLockPtr(Function&& function);  // function参数为LockedPtr
template <class Function> auto withLockPtr(Function&& function) const; // function参数类型为ConstLockedPtr
  • Shared类型
// 写锁
LockedPtr wlock();
ConstWLockedPtr wlock() const;

TryWLockedPtr tryWLock();
ConstTryWLockedPtr tryWLock() const;

template <class Rep, class Period>
LockedPtr wlock(const std::chrono::duration<Rep, Period>& timeout);  // 当Mutex参数为shared_timed_mutex时才能正常工作
template <class Rep, class Period>
LockedPtr wlock(const std::chrono::duration<Rep, Period>& timeout) const;

template <class Function> auto withWLock(Function&& function);
template <class Function> auto withWLock(Function&& function) const;

template <class Function>
auto withWLockPtr(Function&& function);
template <class Function>
auto withWLockPtr(Function&& function) const;

// 读锁
RLockedPtr rlock();
ConstLockedPtr rlock() const;

TryRLockedPtr tryRLock();
ConstTryRLockedPtr tryRLock() const;

template <class Rep, class Period>
RLockedPtr rlock(const std::chrono::duration<Rep, Period>& timeout);
template <class Rep, class Period>
ConstRLockedPtr rlock(const std::chrono::duration<Rep, Period>& timeout) const;

template <class Function> auto withRLock(Function&& function);
template <class Function> auto withRLock(Function&& function) const;

template <class Function>
auto withRLockPtr(Function&& function);
template <class Function>
auto withRLockPtr(Function&& function) const;

  • Upgrade类型
UpgradeLockedPtr ulock();
ConstUpgradeLockedPtr ulock() const;

TryUpgradeLockedPtr tryULock();

template <class Rep, class Period>
UpgradeLockedPtr ulock(const std::chrono::duration<Rep, Period>& timeout);  // 当Mutex参数为shared_timed_mutex时才能正常工作

template <class Function> auto withULock(Function&& function);
template <class Function> auto withULock(Function&& function) const;

template <class Function>
auto withULockPtr(Function&& function);
template <class Function>
auto withULockPtr(Function&& function) const;

Synchronizedstd::condition_variable

Mutexstd::mutex时,Synchronized<T, std::mutex>::lock()返回的LockedPtr对象有一个std::unique_lock<std::mutex> as_lock()方法,可以将该方法的返回对象传入condition_variable对象的wait方法

Synchronized<std::vector<std::string>, std::mutex> vec;
std::condition_variable emptySignal;

auto locked = vec.lock();
emptySignal.wait(locked.as_lock(), [&]{ return !locked->empty(); });

Mutexstd::shared_mutex时,wlock()返回的LockedPtr对象也有as_lock()方法,也可以传给condition_variablewait方法

升级锁

升级锁只允许读,不允许写。当需要写的时候,需要把升级锁升级为写锁,然后才能写。

升级锁可以和读锁共存,升级锁相互之间是互斥的,升级锁和写锁互斥。

升级锁适用的场景是:先读状态,然后根据状态按需更新。

struct MyObject {
  bool isUpdateRequired() const;
  void doUpdate();
};

struct MyContainingObject {
  folly::Synchronized<MyObject> sync;
  
  void mightHappenConcurrently() {
      if (!sync.rlock()->isUpdateRequired()) {
          return;
      }
      sync.withWLock([&](auto& state) {
          if (!state.isUpdateRequired()) {
              return;
          }
          state.doUpdate();
      });
  }
};

在上面的代码中,第二次判断state.isUpdateRequired()时,其他线程都不能调用isUpdateRequired,性能较差。如果后面加的是升级锁,就不存在这种问题。

void mightHappenConcurrently() {
    if (!sync.rlock()->isUpdateRequired()) {
        return;
    }
    sync.withULockPtr([&](auto ulock) {
        if (!ulock->isUpdateRequired()) {
            return;
        }
        auto wlock = ulock.moveFromUpgradeToWrite();
        wlock->doUpdate();
    });
}

通过ulock()withULockPtr()得到的升级锁可以进行升级或降级,通过调用LockedPtr对象的以下方法:

  • moveFromUpgradeToWrite()
  • moveFromWriteToUpgrade()
  • moveFromUpgradeToRead()
  • moveFromWriteToRead()

调用这些方法会返回一个新的LockedPtr,原来的LockedPtr会处于null状态。升级或降级是原子性的,也就是说在锁的状态转换过程中Synchronized对象不会处于无锁状态

auto ulock = obj.ulock();
if (ulock->needsUpdate()) {
    auto wlock = ulock.moveFromUpgradeToWrite();
    // ulock is now null
    wlock->updateObj();
}

升级和降级可以在withULockPtr()内部发生

auto newSize = obj.withULockPtr([](auto ulock) {
  if (ulock->needsUpdate()) {
      auto wlock = ulock.moveFromUpgradeToWrite();
      wlock->updateObj();
      auto rlock = wlock.moveFromWriteToRead();
      return rlock->newSize();
  } else {
      auto rlock = ulock.moveFromUpgradeToRead();
      return rlock->newSize();
  }
});