Synchronized
C++多线程编程的痛点 - 锁与数据是分开的
struct RequestHandler {
RequestQueue requestQueue_;
SharedMutex requestQueueMutex_;
std::map<std::string, Endpoint> requestEndpoints_;
SharedMutex requestEndpointsMutex_;
HandlerState workState_;
SharedMutex workStateMutex_;
void processRequest(const Request& request) {
stop_watch<> watch;
checkRequestValidity(request);
SharedMutex::WriteHolder lock(requestQueueMutex_);
requestQueue_.push_back(request);
stats_->addStatValue("requestEnqueueLatency", watch.elapsed());
LOG(INFO) << "enqueued request ID" << request.getID();
}
};
上述代码存在以下隐患:
- 不加锁可以直接访问数据,而且编译器不会报错
在上面的代码中,成员函数可以不加锁直接访问requestQueue_
,并且编译器不会直接报错。另外可能存在少加锁的情况,比如要访问requestQueue_
和requestEndpoints_
的,但是只把requestQueueMutex_
给锁上了,requestEndpointsMutex_
忘记加锁了 - 当存在多个锁保护多个数据的时候,存在上错锁的情况
成员函数需要访问requestQueue_
,但是可能错把requestEndpointsMutex_
这把锁锁上了,本来应该锁requestQueueMutex_
的。 - 上了读锁,但是却执行了写操作,编译器不会报错
Synchronized的解决方案 - 锁与数据包成一个整体,只有上了锁数据才能被访问
上面的代码可以重写为
struct RequestHandler {
Synchronized<RequestQueue> requestQueue_;
Synchronized<std::map<std::string, Endpoint>> requestEndpoints_;
Synchronized<HandlerState> workState_;
void processRequest(const Request& request) {
stop_watch<> watch;
checkRequestValidity(request);
requestQueue_.wlock()->push_back(request);
stats_->addStatValue("requestEnqueueLatency", watch.elapsed());
LOG(INFO) << "enqueued request ID" << request.getID();
}
};
Synchronized不提供直接访问数据的接口,而是提供了获取各种锁类型的接口(如上面的wlock()
接口),这些接口的返回值才能访问数据,并且返回值满足RAII
原语,对象析构时锁会自动释放.
Synchronized模板类
Synchronized
是个模板类Synchronized<T, Mutex>
,Mutex
默认为folly::SharedMutex
,std中的所有的mutex
类型都可以作为这个参数(mutex
, shared_mutex
, recursive_mutex
, timed_mutex
, recursive_timed_mutex
, shared_timed_mutex
),另外满足folly/synchronization/Lock.h
的类也都能传给Mutex
参数。
构造函数:
- 默认构造函数直接默认初始化数据对象
- 如果不支持默认构造函数,还提供了
in-place
构造,把参数透传给数据成员的构造器 - 如果数据成员支持复制构造或移动构造,还可以直接传
T
类型的参数进行构造
复制构造函数
- 先对源对象的上读锁,然后再复制源对象的数据成员,新建的Synchronized不需要上锁
移动构造函数
- 假设源对象是纯右值,因此不会上锁,直接复制源对象的数据成员
复制运算符
- 先对源对象加读锁,把数据成员复制到临时对象中,然后对目的对象加写锁,把临时对象移动赋值给目的对象的数据成员
移动赋值运算符
- 假设源对象为纯右值,不对源对象加锁,目的对象加写锁,把源对象的数据成员移动赋值给目的对象的数据成员
数据成员赋值运算符
- 目的对象加写锁,然后把参数赋值(复制/移动)给数据成员
swap
按照对象地址的递增顺序对两个对象加写锁,然后swap
俩对象的数据成员
数据成员的副本可以通过void copy(T*)
和T copy()
两个接口拿到,会对Synchronized对象加读锁
Synchronized拿锁接口
folly
把Mutex
的类型分为了Unique
、Shared
和Upgrade
,针对不同的Mutex
类型,Synchronized
提供了不同的拿锁接口
Unique
类型
LockedPtr lock();
ConstLockedPtr lock() const; // 只能访问数据的const成员函数,且不能修改数据的成员
TryLockedPtr tryLock(); // 获取锁失败返回null,应该使用operator bool() 或者 LockedPtr::isNull()来判断
ConstTryLockedPtr tryLock();
template <class Rep, class Period>
LockedPtr lock(const std::chrono::duration<Rep, Period>& timeout); // 当Mutex为timed的时候才能正常工作
template <class Rep, class Period>
ConstLockedPtr lock(const std::chrono::duration<Rep, Period>& timeout) const;
template <class Function> auto withLock(Function&& function); // function参数为const T&或者T&
template <class Function> auto withLock(Function&& function) const; // function参数只能为const T&
template <class Function> auto withLockPtr(Function&& function); // function参数为LockedPtr
template <class Function> auto withLockPtr(Function&& function) const; // function参数类型为ConstLockedPtr
Shared
类型
// 写锁
LockedPtr wlock();
ConstWLockedPtr wlock() const;
TryWLockedPtr tryWLock();
ConstTryWLockedPtr tryWLock() const;
template <class Rep, class Period>
LockedPtr wlock(const std::chrono::duration<Rep, Period>& timeout); // 当Mutex参数为shared_timed_mutex时才能正常工作
template <class Rep, class Period>
LockedPtr wlock(const std::chrono::duration<Rep, Period>& timeout) const;
template <class Function> auto withWLock(Function&& function);
template <class Function> auto withWLock(Function&& function) const;
template <class Function>
auto withWLockPtr(Function&& function);
template <class Function>
auto withWLockPtr(Function&& function) const;
// 读锁
RLockedPtr rlock();
ConstLockedPtr rlock() const;
TryRLockedPtr tryRLock();
ConstTryRLockedPtr tryRLock() const;
template <class Rep, class Period>
RLockedPtr rlock(const std::chrono::duration<Rep, Period>& timeout);
template <class Rep, class Period>
ConstRLockedPtr rlock(const std::chrono::duration<Rep, Period>& timeout) const;
template <class Function> auto withRLock(Function&& function);
template <class Function> auto withRLock(Function&& function) const;
template <class Function>
auto withRLockPtr(Function&& function);
template <class Function>
auto withRLockPtr(Function&& function) const;
Upgrade
类型
UpgradeLockedPtr ulock();
ConstUpgradeLockedPtr ulock() const;
TryUpgradeLockedPtr tryULock();
template <class Rep, class Period>
UpgradeLockedPtr ulock(const std::chrono::duration<Rep, Period>& timeout); // 当Mutex参数为shared_timed_mutex时才能正常工作
template <class Function> auto withULock(Function&& function);
template <class Function> auto withULock(Function&& function) const;
template <class Function>
auto withULockPtr(Function&& function);
template <class Function>
auto withULockPtr(Function&& function) const;
Synchronized
与std::condition_variable
当Mutex
为std::mutex
时,Synchronized<T, std::mutex>::lock()
返回的LockedPtr
对象有一个std::unique_lock<std::mutex> as_lock()
方法,可以将该方法的返回对象传入condition_variable
对象的wait
方法
Synchronized<std::vector<std::string>, std::mutex> vec;
std::condition_variable emptySignal;
auto locked = vec.lock();
emptySignal.wait(locked.as_lock(), [&]{ return !locked->empty(); });
当Mutex
为std::shared_mutex
时,wlock()
返回的LockedPtr
对象也有as_lock()
方法,也可以传给condition_variable
的wait
方法
升级锁
升级锁只允许读,不允许写。当需要写的时候,需要把升级锁升级为写锁,然后才能写。
升级锁可以和读锁共存,升级锁相互之间是互斥的,升级锁和写锁互斥。
升级锁适用的场景是:先读状态,然后根据状态按需更新。
struct MyObject {
bool isUpdateRequired() const;
void doUpdate();
};
struct MyContainingObject {
folly::Synchronized<MyObject> sync;
void mightHappenConcurrently() {
if (!sync.rlock()->isUpdateRequired()) {
return;
}
sync.withWLock([&](auto& state) {
if (!state.isUpdateRequired()) {
return;
}
state.doUpdate();
});
}
};
在上面的代码中,第二次判断state.isUpdateRequired()
时,其他线程都不能调用isUpdateRequired
,性能较差。如果后面加的是升级锁,就不存在这种问题。
void mightHappenConcurrently() {
if (!sync.rlock()->isUpdateRequired()) {
return;
}
sync.withULockPtr([&](auto ulock) {
if (!ulock->isUpdateRequired()) {
return;
}
auto wlock = ulock.moveFromUpgradeToWrite();
wlock->doUpdate();
});
}
通过ulock()
或withULockPtr()
得到的升级锁可以进行升级或降级,通过调用LockedPtr
对象的以下方法:
moveFromUpgradeToWrite()
moveFromWriteToUpgrade()
moveFromUpgradeToRead()
moveFromWriteToRead()
调用这些方法会返回一个新的LockedPtr
,原来的LockedPtr
会处于null
状态。升级或降级是原子性的,也就是说在锁的状态转换过程中Synchronized
对象不会处于无锁状态
auto ulock = obj.ulock();
if (ulock->needsUpdate()) {
auto wlock = ulock.moveFromUpgradeToWrite();
// ulock is now null
wlock->updateObj();
}
升级和降级可以在withULockPtr()
内部发生
auto newSize = obj.withULockPtr([](auto ulock) {
if (ulock->needsUpdate()) {
auto wlock = ulock.moveFromUpgradeToWrite();
wlock->updateObj();
auto rlock = wlock.moveFromWriteToRead();
return rlock->newSize();
} else {
auto rlock = ulock.moveFromUpgradeToRead();
return rlock->newSize();
}
});