ACID 6.824
发布时间 2023-09-08 18:05:55作者: yytarget
# go style协程
## 先看一下使用样例
```cpp
int main() {
// 创建一个 Channel
Channel chan(1);
// 开启一个协程往 Channel 里发送数据
Go {
for (int i = 0; i < 10; ++i) {
chan << i;
LOG_DEBUG << "send: " << i;
}
chan.close();
};
// 开启一个协程从 Channel 里读取数据
Go {
int i;
while (chan >> i) {
LOG_DEBUG << "recv: " << i;
}
};
}
```
可以看到非常容易上手,使用go关键字就能起一个协程。
现在看看具体该如何实现。
最原始调度器提交协程任务的方法为
```cpp
template
Scheduler* submit(FiberOrCb&& fc) {
...
}
```
按照这样提交,可以看到过程比较冗余,也不方便与宏配合。
```cpp
Scheduler::GetThis()->submit(fiber);
```
所以再重载一个运算符来简化一下
```cpp
// 重载+运算符,将任务转发给submit
template
Scheduler* operator+(FiberOrCb&& fc) {
return submit(std::move(fc));
}
```
就可以这样提交协程了
```cpp
(*Scheduler::GetThis()) + fiber
```
由于前面部分都是固定的,可以用宏了简化一下
```cpp
#define go (*acid::IOManager::GetThis()) +
```
现在就能用go来启动函数或者协程了
```cpp
// 普通函数
go normal;
// 协程
go fiber;
// 函数对象
go [] {
LOG_DEBUG << "Lambda";
};
std::string str = "hello";
// 但有个问题,捕获后变量是const,所以这个操作是错误,无法编译
go [str] {
str += "hello";
};
// 要想改变变量,得加上mutable
go [str]() mutable {
str += "world";
};
```
注意到最后一种情况有很强的通用性,即捕获变量进行修改,于是再设计一个宏来简化这种情况
```cpp
// Go 关键字默认按值捕获变量
// 等价于 go [=]() mutable {}
#define Go (*acid::IOManager::GetThis()) + [=]()mutable
```
现在如果要捕获变量到协程里修改,就可以用Go来启动
```cpp
std::string str = "hello";
Go {
str += "world";
};
```
注意Go默认按值捕获全部局部变量,所以使用起来要注意,如果变量太多就用go来按需捕获。
提供了配置默认调度器线程数量和调度器名字的方法,在还没启动go之前,可以使用如下方法设置
```cpp
// 设置默认调度器的线程数量
Config::Lookup("scheduler.threads")->setValue(2);
// 设置默认调度器的名字
Config::Lookup("scheduler.name")->setValue("default");
```
# 协程同步原语
一旦协程阻塞后整个协程所在的线程都将阻塞,这也就失去了协程的优势。编写协程程序时难免会对一些数据进行同步,而Linux下常见的同步原语互斥量、条件变量、信号量等基本都会堵塞整个线程,使用原生同步原语协程性能将大幅下降,甚至发生死锁的概率大大增加!
重新实现一套用户态协程同步原语能解决这个问题。
在开始实现之前我们先简单介绍一下原理。原生同步对象由内核维护,当互斥量获取锁失败,条件变量wait,信号量wait获取失败时,内核将条件不满足的线程加入一个由内核维护的等待队列,然后阻塞线程,等待条件满足时将线程重新加入调度。
如同协程之于线程,我们很容易得到一个启示,既然内核维护等待队列会阻塞线程,那可不可以由用户态来维护等待队列呢。当获取协程同步对象失败时,用户将条件不满足的协程加入一个由用户维护的协程等待队列,然后让出协程,等待条件满足时将协程重新加入协程调度器调度。看,我们解决了线程同步问题,而且没有阻塞线程!
介绍完了原理,我们来看看实现,框架实现了一下以下几种协程同步原语
* CoMutex 协程锁
* CoCondvar 协程条件变量
* CoSemaphore 协程信号量
* Channel 消息通道
依赖关系如下:
```cpp
CoMutex CoCondVar CoMutex CoCondVar
| | | |
----------- -----------
| |
V V
CoSemaphore Channel
```
## SpinLock 自旋锁
在此之前不得不提一下自旋锁。不管你是用TAS实现还是直接封装posix spin lock他们都有一个共同特点,就是不阻塞线程。我们的同步原语可以说都是基于自旋锁来实现,这里简单封装了一下posix自旋锁。
```cpp
/**
* @brief 自旋锁
*/
class SpinLock : Noncopyable {
public:
using Lock = ScopedLock;
SpinLock(){
pthread_spin_init(&m_mutex,0);
}
~SpinLock(){
pthread_spin_destroy(&m_mutex);
}
void lock(){
pthread_spin_lock(&m_mutex);
}
bool tryLock() {
return !pthread_spin_trylock(&m_mutex);
}
void unlock(){
pthread_spin_unlock(&m_mutex);
}
private:
pthread_spinlock_t m_mutex;
};
```
## CoMutex 协程锁
`CoMutex`的定义如下
```cpp
/**
* @brief 协程锁
*/
class CoMutex : Noncopyable {
public:
using Lock = ScopedLock;
bool tryLock();
void lock();
void unlock();
private:
// 协程所持有的锁
SpinLock m_mutex;
// 保护等待队列的锁
SpinLock m_gaurd;
// 持有锁的协程id
uint64_t m_fiberId = 0;
// 协程等待队列
std::queue> m_waitQueue;
};
```
成员`m_waitQueue`就是用户态维护的等待队列,维护等待这个锁的协程。
成员函数`lock`的主要代码如下
```cpp
void lock() {
...
// 第一次尝试获取锁
while (!tryLock()) {
// 由于进入等待队列和出队的代价比较大,所以再次尝试获取锁,
// 成功获取锁就返回
if(tryLock()){
...
return;
}
// 获取所在的协程
auto self = GetTHisFiber();
// 将自己加入协程等待队列
m_waitQueue.push(self);
// 让出协程
Yield;
}
...
}
```
我们尝试获取锁,如果获取失败就把自己放入等待队列并让出协程。
成员函数`unlock`的主要代码如下
```cpp
void unlock() {
...
auto Fiber = m_waitQueue.front();
...
// 释放协程锁
m_mutex.unlock();
...
// 将等待的协程重新加入调度
Schedule(fiber);
...
}
```
我们取出等待这个锁的协程,释放锁后将协程重新加入调度器。
通过一个很简单方式,我们在用户空间实现了互斥量。
使用样例
```cpp
CoMutex mutex;
void a() {
for (int i = 0; i < 100000; ++i) {
CoMutex::Lock lock(mutex);
++n;
}
}
void b() {
for (int i = 0; i < 100000; ++i) {
CoMutex::Lock lock(mutex);
++n;
}
}
```
## CoCondVar 协程条件变量
`CoCondVar`的定义如下
```cpp
/**
* @brief 协程条件变量
*/
class CoCondVar : Noncopyable {
public:
using MutexType = SpinLock;
/**
* @brief 唤醒一个等待的协程
*/
void notify();
/**
* @brief 唤醒全部等待的协程
*/
void notifyAll();
...
/**
* @brief 等待唤醒
*/
void wait(CoMutex::Lock& lock);
private:
// 协程等待队列
std::queue> m_waitQueue;
// 保护协程等待队列
MutexType m_mutex;
...
};
```
和`CoMutex`一样,协程条件变量也维护了一个等待队列。
成员函数`notify`的主要代码如下
```cpp
void notify() {
...
Fiber::ptr fiber;
// 减小锁的粒度
{
// 获取一个等待的协程
MutexType::Lock lock(m_mutex);
fiber = m_waitQueue.front();
m_waitQueue.pop();
}
// 将等待的协程重新加入调度
Schedule(fiber);
}
```
与协程锁的解锁类似,获取一个在等待队列里的协程重新加入调度器。
成员函数`notifyAll`则是将全部等待的协程加入调度器。
成员函数`wait`的主要代码如下
```cpp
void wait(CoMutex::Lock& lock) {
// 获取本协程对象
auto self = GetThisFiber();
{
MutexType::Lock lock1(m_mutex);
// 将自己加入等待队列
m_waitQueue.push(self);
...
}
// 先解锁
lock.unlock();
// 让出协程
Yield;
// 重新获取锁
lock.lock();
}
```
注意,只有先将协程锁解锁了才能加入到等待队列,否则别的协程无法获取锁,被唤醒后要重新获取锁。
至此我们已经实现了两个重要的同步原语。
使用样例
```cpp
CoMutex mutex;
CoCondVar condVar;
void cond_a() {
CoMutex::Lock lock(mutex);
LOG_INFO() << "cond a wait";
condVar.wait(lock);
LOG_INFO() << "cond a notify";
}
void cond_b() {
CoMutex::Lock lock(mutex);
LOG_INFO() << "cond b wait";
condVar.wait(lock);
LOG_INFO() << "cond b notify";
}
void cond_c() {
sleep(2);
LOG_INFO() << "notify cone";
condVar.notify();
sleep(2);
LOG_INFO() << "notify cone";
condVar.notify();
}
```
## CoSemaphore 协程信号量
`CoSemaphore`的定义如下
```cpp
/**
* @brief 协程信号量
*/
class CoSemaphore : Noncopyable {
public:
CoSemaphore(uint32_t num) {
m_num = num;
m_used = 0;
}
void wait();
void notify();
private:
// 信号量的数量
uint32_t m_num;
// 已经获取的信号量的数量
uint32_t m_used;
// 协程条件变量
CoCondVar m_condvar;
// 协程锁
CoMutex m_mutex;
};
```
协程信号量是基于协程锁和协程条件变量的。
成员函数`wait`的主要代码如下
```cpp
void wait() {
CoMutex::Lock lock(m_mutex);
// 如果已经获取的信号量大于等于信号量数量则等待
while (m_used >= m_num) {
m_condvar.wait(lock);
}
++m_used;
}
```
在条件变量的wait里让出协程等待。
成员函数`notify`的主要代码如下
```cpp
void notify() {
CoMutex::Lock lock(m_mutex);
if (m_used > 0) {
--m_used;
}
// 通知一个等待的协程
m_condvar.notify();
}
```
有了协程锁和协程条件变量,协程信号量实现起来十分简单。
使用样例
```cpp
CoSemaphore sem(5);
void sem_a() {
for (int i = 0; i < 5; ++i) {
sem.wait();
}
sleep(2);
for (int i = 0; i < 5; ++i) {
sem.notify();
}
}
void sem_b() {
sleep(1);
for (int i = 0; i < 5; ++i) {
sem.wait();
}
for (int i = 0; i < 5; ++i) {
sem.notify();
}
}
```
## Channel 消息通道
`Channel`主要是用于协程之间的通信,属于更高级层次的抽象。
在类的实现上采用了 PIMPL 设计模式,将具体操作转发给实现类
`Channel `对象可随意复制,通过智能指针指向同一个 `ChannelImpl`
`Channel`的定义如下
```cpp
template
class Channel {
public:
Channel(size_t capacity) {
m_channel = std::make_shared>(capacity);
}
Channel(const Channel& chan) {
m_channel = chan.m_channel;
}
void close() {
m_channel->close();
}
operator bool() const {
return *m_channel;
}
bool push(const T& t) {
return m_channel->push(t);
}
bool pop(T& t) {
return m_channel->pop(t);
}
Channel& operator>>(T& t) {
(*m_channel) >> t;
return *this;
}
Channel& operator<<(const T& t) {
(*m_channel) << t;
return *this;
}
size_t capacity() const {
return m_channel->capacity();
}
size_t size() {
return m_channel->size();
}
bool empty() {
return m_channel->empty();
}
bool unique() const {
return m_channel.unique();
}
private:
std::shared_ptr> m_channel;
};
```
`ChannelImpl`的定义如下
```cpp
/**
* @brief Channel 的具体实现
*/
template
class ChannelImpl : Noncopyable {
public:
ChannelImpl(size_t capacity)
: m_isClose(false)
, m_capacity(capacity){
}
/**
* @brief 发送数据到 Channel
* @param[in] t 发送的数据
* @return 返回调用结果
*/
bool push(const T& t);
/**
* @brief 从 Channel 读取数据
* @param[in] t 读取到 t
* @return 返回调用结果
*/
bool pop(T& t);
ChannelImpl& operator>>(T& t) {
pop(t);
return *this;
}
ChannelImpl& operator<<(const T& t) {
push(t);
return *this;
}
/**
* @brief 关闭 Channel
*/
void close();
operator bool() {
return !m_isClose;
}
size_t capacity() const {
return m_capacity;
}
size_t size() {
CoMutex::Lock lock(m_mutex);
return m_queue.size();
}
bool empty() {
return !size();
}
private:
bool m_isClose;
// Channel 缓冲区大小
size_t m_capacity;
// 协程锁和协程条件变量配合使用保护消息队列
CoMutex m_mutex;
// 入队条件变量
CoCondVar m_pushCv;
// 出队条件变量
CoCondVar m_popCv;
// 消息队列
std::queue m_queue;
};
```
成员函数`push`的主要代码如下
```cpp
bool push(const T& t) {
CoMutex::Lock lock(m_mutex);
if (m_isClose) {
return false;
}
// 如果缓冲区已满,等待m_pushCv唤醒
while (m_queue.size() >= m_capacity) {
m_pushCv.wait(lock);
if (m_isClose) {
return false;
}
}
m_queue.push(t);
// 唤醒m_popCv
m_popCv.notify();
return true;
}
```
成员函数`pop`的主要代码如下
```cpp
bool pop(T& t) {
CoMutex::Lock lock(m_mutex);
if (m_isClose) {
return false;
}
// 如果缓冲区为空,等待m_pushCv唤醒
while (m_queue.empty()) {
m_popCv.wait(lock);
if (m_isClose) {
return false;
}
}
t = m_queue.front();
m_queue.pop();
// 唤醒 m_pushCv
m_pushCv.notify();
return true;
}
```
成员函数`close`的主要代码如下
```cpp
void close() {
CoMutex::Lock lock(m_mutex);
if (m_isClose) {
return;
}
m_isClose = true;
// 唤醒等待的协程
m_pushCv.notify();
m_popCv.notify();
std::queue q;
std::swap(m_queue, q);
}
```
通过`Channel`我们很容易实现一个生产者消费者的样例
```cpp
void chan_a(Channel chan) {
for (int i = 0; i < 10; ++i) {
chan << i;
ACID_LOG_INFO(g_logger) << "provider " << i;
}
ACID_LOG_INFO(g_logger) << "close";
chan.close();
}
void chan_b(Channel chan) {
int i;
while (chan >> i) {
ACID_LOG_INFO(g_logger) << "consumer " << i;
}
ACID_LOG_INFO(g_logger) << "close";
}
void test_channel() {
IOManager loop{};
Channel chan(5);
loop.submit(std::bind(chan_a, chan));
loop.submit(std::bind(chan_b, chan));
}
```
## 总结
整套协程同步原语的核心其实就是协程队列,我们通过在用户态模拟了等待队列达到了原生同步原语的效果。并对之进行更高层次的抽象,得到了Channel,它使代码变得简洁优雅,不用考虑协程间的同步问题。
# RPC连接复用
## 问题分析
对于短连接来说,每次发起rpc调用就创建一条连接,由于没有竞争实现起来比较容易,但开销太大。所以本框架实现了rpc连接复用来支持更高的并发。
连接复用的问题在于,在一条连接上可以有多个并发的调用请求,由于服务器也是并发处理这些请求的,所以导致了服务器返回的响应顺序与请求顺序不一致。为了识别请求,我们很容易想到一个方法,就是给每个连接每个请求加上一个唯一的序列号,本框架的做法是在协议头加上序列号字段,具体结构如下
```c
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
| BYTE | | | | | | | | | | | ........ |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
| magic | version| type | sequence id | content length | content byte[] |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
```
第四个字段就是一个32位的序列号,用来识别请求顺序。
解决了请求标识的问题,剩下的问题就是如何收集并发的调用请求,按串行的顺序发送给服务提供方,以及如何将收到的调用结果转发给等待的调用者。即连接的多路复用与多路分解。
## 多路复用
![](https://img2023.cnblogs.com/blog/2390290/202309/2390290-20230908175747051-1326227161.jpg)
先看一下 `RpcClient` 的大致结构
```cpp
class RpcClient : public std::enable_shared_from_this
using MutexType = CoMutex;
private:
...
/// 序列号
uint32_t m_sequenceId = 0;
/// 序列号到对应调用者协程的 Channel 映射
std::map> m_responseHandle;
/// 保护 m_responseHandle 的 mutex
MutexType m_mutex;
/// 消息发送通道
Channel m_chan;
}
```
每个 `RpcClient` 连接对象都有一个不断自增的序列号,一个 `Channel`,一个序列号到对应调用者协程的 `Channel` 映射。
在每个对象连接到服务器时,我们开启了一个 `handleSend` 协程,这个协程的作用是不断从` Channel` 里读取调用请求,转发给服务器。通过上篇所讲的协程同步原语设计,我们知道 `Channel` 内部封装了锁和协程的 `yield`、`resume`。所以我们不用进行加锁就能优雅地收集了调用请求,在 `Channel` 没有消息时会自动挂起,等待请求到达。
```cpp
void RpcClient::handleSend() {
Protocol::ptr request;
// 通过 Channel 收集调用请求,如果没有消息时 Channel 内部会挂起该协程等待消息到达
// Channel 被关闭时会退出循环
while (m_chan >> request) {
// 发送请求
m_session->sendProtocol(request);
}
}
```
现在看一下比较重要的 `call` 方法也就是调用者使用的方法,`call` 里会开启一个 `Channel` 用于接收调用结果,将请求序列号与 `Channel` 关联起来放入 `m_responseHandle`。然后创建调用请求通过 `Channel` 向 ` handleSend` 协程发送请求。之后就通过自己的 `Channel` 挂起协程,等待调用结果。
```cpp
template
Result RpcClient::call(Serializer::ptr s) {
Result val;
...
// 开启一个 Channel 接收调用结果
Channel recvChan(1);
// 本次调用的序列号
uint32_t id = 0;
{
MutexType::Lock lock(m_mutex);
id = m_sequenceId;
// 将请求序列号与接收 Channel 关联
m_responseHandle.emplace(m_sequenceId, recvChan);
++m_sequenceId;
}
// 创建请求协议,附带上请求 id
Protocol::ptr request =
Protocol::Create(Protocol::MsgType::RPC_METHOD_REQUEST,s->toString(), id);
// 向 send 协程的 Channel 发送消息
m_chan << request;
...
Protocol::ptr response;
// 等待 response,Channel内部会挂起协程,如果有消息到达或者被关闭则会被唤醒
recvChan >> response;
...
Serializer serializer(response->getContent());
serializer >> val;
return val;
}
```
这就是多路复用的设计,并发的调用请求通过 `Channel` 不用显式进行同步操作就能向 `handleSend` 协程发送请求, `handleSend`协程不断收集请求转发给服务器。
## 多路分解
接着讲讲多路分解。多路分解和多路复用就是一个相反的过程,具体就是如何将服务器的响应解析,转发给对应的调用者。
同样的,在每个rpc对象连接到服务器时,我们也开启了一个 `handleRecv` 协程用于接收服务器的消息,并且从中解析出响应类型进行对应的处理。
```cpp
void RpcClient::handleRecv() {
while (true) {
// 接收响应
Protocol::ptr response = m_session->recvProtocol();
...
// 获取响应类型
Protocol::MsgType type = response->getMsgType();
// 判断响应类型进行对应的处理
switch (type) {
// 心跳处理
case Protocol::MsgType::HEARTBEAT_PACKET:
...
break;
// 调用结果处理
case Protocol::MsgType::RPC_METHOD_RESPONSE:
handleMethodResponse(response);
break;
...
default:
...
break;
}
}
}
```
我们看一下对服务器返回调用结果的处理。我们先获取该调用结果的序列号,这个序列号标识着一个之前已经发过的调用请求。然后查找该序列号对应的 `Channel` 是否还存在,如果调用超时到达,或者之前的调用请求已经被处理,则忽略本次调用结果。通过序列号获取等待该结果的 `Channel` ,并发送调用结果唤醒调用者,完成多路分解。
```cpp
void RpcClient::handleMethodResponse(Protocol::ptr response) {
// 获取该调用结果的序列号
uint32_t id = response->getSequenceId();
std::map>::iterator it;
MutexType::Lock lock(m_mutex);
// 查找该序列号的 Channel 是否还存在,如果不存在直接返回
it = m_responseHandle.find(id);
if (it == m_responseHandle.end()) {
return;
}
// 通过序列号获取等待该结果的 Channel
Channel chan = it->second;
// 对该 Channel 发送调用结果唤醒调用者
chan << response;
}
```
## 最后
虽然单连接的rpc实现起来相对复杂一些,要在应用层面要实现多路复用的功能。但资源的利用率远远大于短连接,就性能而言,可发送和处理的消息数也比短连接多得多,这对一个高性能的rpc框架是必备的。
# 服务订阅与通知
本篇将讲解框架的发布/订阅功能,这是框架的核心功能之一。发布者可对订阅相同主题的消费者主动推送消息,实现了系统解耦,易于维护。并且通过实时的发布/订阅模式实现自动维护服务列表,当订阅的服务发生了变化,同时更新自己的服务地址缓存。
## 接口介绍
客户端可发起对 key 的订阅,这样在服务端发布消息时可以及时收到消息,并且执行回调函数。回调函数的签名为`void(Serializer)`,在回调函数里可以反序列化服务器发布的数据并处理。
```cpp
/**
* @brief 订阅消息
* @param[in] key 订阅的key
* @param[in] func 回调函数
*/
template
void RpcConnectionPool::subscribe(const std::string& key, Func func)
```
至于为什么不用`std::function`而采用模板,是因为使用lambda作为入参时,如果捕获了太多的变量会导致``std::function``的内存动态分配,使用模板然后在函数里``std::move``就可以避免内存动态分配。
服务端的发布接口比较简单,发布订阅消息,所有订阅了 key 的客户端都可以获得消息。
```cpp
/**
* @brief 发布消息
* @param[in] key 发布的key
* @param[in] data 支持 Serializer 的都可以发布
*/
template
void RpcServiceRegistry::publish(const std::string& key, T data)
```
## 简单用例
实现了 Serializer 序列化的类型都可以直接发布,比如我们想发布一个 vector 直接 publish 就可以,所有订阅了data的客户端都会收到vector。
```cpp
std::vector vec = { 1, 1, 4, 5, 1, 4};
// 发布订阅消息,所有订阅了 data 的客户端都可以获得消息
// 框架实现了 STL 容器的序列化,所有可以直接发布
server->publish("data", vec);
```
客户端对 data 进行订阅,收到消息时在回调函数里将数据反序列化回 vector 并打印出来。
```cpp
// 订阅data,服务端发布消息时会调用回调函数来处理
client->subscribe("data",[](Serializer s){
std::vector vec;
// 因为vector的序列化框架已经实现了,所以可以直接反序列化
s >> vec;
std::string str;
std::for_each(vec.begin(), vec.end(),[&str](int i) mutable { str += std::to_string(i);});
LOG_DEBUG << "recv publish: " << str;
});
```
熟悉完简单的使用方法,接下来就是本文的重点了
## 推拉结合的服务列表维护
当一个已有服务提供者节点下线, 或者一个新的服务提供者节点加入时,订阅对应接口的消费者能及时收到注册中心的通知, 并更新本地的服务地址缓存。 这样后续的服务调用就能避免调用已经下线的节点, 并且能调用到新的服务节点。
订阅通常有 pull(拉)和 push(推)两种方式。第一种是客户端定时轮询注册中心拉取开放服务的节点,另一种是注册中心主动推送数据给客户端。 这两种方式各有利弊,本框架则是两种一起使用,采用了推拉结合的方式来维护服务列表。
客户端第一次发起 RPC 调用时采用拉取的方式,将注册中心中本服务的所有提供者地址缓存到本地,并订阅了此服务节点的上下线通知。之后则是采用了注册中心主动推送的方式,推送服务节点的上下线以此维护服务列表。
下面看看具体的代码
我们用一个字符串前缀来区分服务订阅和普通订阅
```cpp
// 连接池向注册中心订阅的前缀
inline const char* RPC_SERVICE_SUBSCRIBE = "[[rpc service subscribe]]";
```
在注册中心处理服务注册的同时发布了服务上线的消息
```cpp
Protocol::ptr RpcServiceRegistry::handleRegisterService(Protocol::ptr p, Address::ptr address) {
std::string serviceAddress = address->toString();
std::string serviceName = p->getContent();
...
// 发布服务上线消息
std::tuple data { true, serviceAddress};
publish(RPC_SERVICE_SUBSCRIBE + serviceName, data);
...
return proto;
}
```
在注册中心处理服务下线的同时发布了服务下线的消息
```cpp
void RpcServiceRegistry::handleUnregisterService(Address::ptr address) {
...
for (auto& i: its) {
m_services.erase(i);
// 发布服务下线消息
std::tuple data { false, address->toString()};
publish(RPC_SERVICE_SUBSCRIBE + i->first, data);
}
...
}
```
在连接池第一次请求服务发现的同时,订阅了该服务的通知,动态维护服务列表。
```cpp
std::vector RpcConnectionPool::discover(const std::string& name) {
...
if (!m_subHandle.contains(RPC_SERVICE_SUBSCRIBE + name)) {
// 向注册中心订阅服务变化的消息
subscribe(RPC_SERVICE_SUBSCRIBE + name, [name, this](Serializer s){
// false 为服务下线,true 为新服务节点上线
bool isNewServer = false;
std::string addr;
s >> isNewServer >> addr;
MutexType::Lock lock(m_connMutex);
if (isNewServer) {
// 一个新的服务提供者节点加入,将服务地址加入服务列表缓存
LOG_DEBUG << "service [ " << name << " : " << addr << " ] join";
m_serviceCache[name].push_back(addr);
} else {
// 已有服务提供者节点下线
LOG_DEBUG << "service [ " << name << " : " << addr << " ] quit";
// 清理缓存中断开的连接地址
auto its = m_serviceCache.find(name);
if (its != m_serviceCache.end()) {
std::erase(its->second, addr);
}
}
});
}
...
}
```
实现的效果如下,控制台打印出来服务的变化
```
service [ add : 127.0.0.1:8080 ] quit
service [ add : 127.0.0.1:8080 ] join
```
## 最后
通过发布/订阅模式来实现推拉结合的服务列表维护是服务治理的重要手段。