ACID 2

发布时间 2023-09-08 18:05:55作者: yytarget
# 无侵入式序列化——反射 ## 引言 由于 cpp 还未提供反射,所以一般项目里序列化里需要实现对应类的序列化,不仅繁琐还容易出错,使用宏也并没有本质差别,都是侵入式的序列化。 最近看 [yalantinglibs](https://github.com/alibaba/yalantinglibs) 库中 struct_pack 的反射非常有意思,很简单的一些代码就可以实现反射。 另外这个库的很多实现很 tricky 可以仔细阅读。 为了便于理解本文将简化一下实现。 ## 反射的作用 对于一个类 ```cpp struct Foo { int n; string str; }; ``` 如果我们想对该类的对象进行序列化,在没有反射的情况下,需要由用户来手动来遍历该类的成员,例如 ```cpp struct Foo { int n; string str; friend Serializer& operator >> (Serializer& s, Foo& f) { s >> f.n >> f.str; return s; } friend Serializer& operator << (Serializer& out, Foo& f) { s << f.n << f.str; return s; } }; ``` 这样就可以实现 Foo 类的序列化和反序列化。但正如前文所说,这样不仅繁琐还容易出错。 我们希望实现一个非侵入式的序列化,用户只要定义类就行,由框架来完成遍历类的成员并完成序列化和反序列化,直接做到以下效果,这就是反射的作用 ```cpp Foo foo; Serializer s; // 序列化 s << foo; // 反序列化 s >> foo; ``` ## 遍历类的成员 首先最为核心的地方,要想获取类的全部成员,在 cpp 17 里有个简单的方法,就是结构化绑定(structured binding)。 ```cpp Foo foo; auto &&[a1, a2] = foo; ``` 现在 a1 就是对 foo.n 的引用,a2 就是对 foo.str 的引用。 简单封装一下,我们需要定义一个高阶函数 VisitMembers,实现 Vistor 模式,其接受两个参数: 1. 反射的对象 auto&& object 2. 一个函数 visitor,对对象全部字段进行访问、操作,签名为 `void(auto &&...items)` ,其中参数为变参模板 ```cpp constexpr decltype(auto) VisitMembers(auto &&object, auto &&visitor) { using type = std::remove_cvref_t; constexpr auto Count = MemberCount(); ... if constexpr (Count == 0) { return visitor(); } else if constexpr (Count == 1) { auto &&[a1] = object; return visitor(a1); } else if constexpr (Count == 2) { auto &&[a1, a2] = object; return visitor(a1, a2); } else if constexpr (Count == 3) { auto &&[a1, a2, a3] = object; return visitor(a1, a2, a3); } ... } ``` 代码实现里一直暴力枚举下去。 VisitMembers 里先获取类的成员数量,然后利用 if constexpr 来编译期生成对应成员数量的结构化绑定,将全部成员转发给 visitor,这就完成了对对象成员的访问。 到目前为止都很简单,但有个问题,MemberCount 获取类的成员数量该如何实现,这也是最为魔法的地方。 ## 获取类的成员数量 MemberCount 的真正实现是 MemberCountImpl ```cpp template consteval std::size_t MemberCount() { ... return MemberCountImpl(); } ``` MemberCountImpl 实现如下 ```cpp struct UniversalType { template operator T(); }; template consteval std::size_t MemberCountImpl() { if constexpr (requires { T { {Args{}}..., {UniversalType{}} }; }) { return MemberCountImpl(); } else { return sizeof...(Args); } } ``` 要想理解这个函数必须先理解这个 concept 约束了什么。 这里涉及到了一个特性 ```cpp struct Foo { int a; int b; int c; }; ``` 对于一个聚合类 Foo,以下初始化方法都是合法的 ```cpp Foo a{1}; Foo a{1, 2}; Foo a{1, 2, 3}; ``` concept 里借助了一个万能类型 UniversalType,UniversalType 中有一个可以隐式转换成任意类的稻草人函数。然后将所有的变参 UniversalType 展开检查初始化类 T 时的参数个数是否合法。 第一个分支通过不断构造新的 UniversalType,当 concept 不满足时,说明当前参数的个数就等于类的成员数量。 ## 序列化实现 ```cpp template Serializer& operator << (const T& i){ using T = std::remove_cvref_t; static_assert(!std::is_pointer_v); if constexpr(std::is_same_v || std::is_same_v || std::is_same_v){ m_byteArray->writeFint8(t); } else if constexpr(std::is_same_v){ m_byteArray->writeFloat(t); } else if constexpr(std::is_same_v){ m_byteArray->writeDouble(t); } else if constexpr(std::is_same_v){ m_byteArray->writeFint8(t); } else if constexpr(std::is_same_v){ m_byteArray->writeFuint8(t); } else if constexpr(std::is_same_v){ m_byteArray->writeFint16(t); } else if constexpr(std::is_same_v){ m_byteArray->writeFuint16(t); } else if constexpr(std::is_same_v){ m_byteArray->writeInt32(t); } else if constexpr(std::is_same_v){ m_byteArray->writeUint32(t); } else if constexpr(std::is_same_v){ m_byteArray->writeInt64(t); } else if constexpr(std::is_same_v){ m_byteArray->writeUint64(t); } else if constexpr(std::is_same_v){ m_byteArray->writeStringVint(t); } else if constexpr(std::is_same_v){ m_byteArray->writeStringVint(std::string(t)); } else if constexpr(std::is_same_v){ m_byteArray->writeStringVint(std::string(t)); } else if constexpr(std::is_enum_v){ m_byteArray->writeInt32(static_cast(t)); } else if constexpr(std::is_class_v) { static_assert(std::is_aggregate_v); VisitMembers(t, [&](auto &&...items) { (void)((*this) << ... << items); }); } return *this; } ``` 在最后一个if constexpr 里,判断是否为聚合类,然后遍历所有的成员进行序列化。 当然,由于不是原生的反射,还是有许多缺陷,比如无法对一个非聚合类进行自动序列化,此时依旧可以通过模板特化来手动实现,例如 ```cpp class Foo { private: int a; int b; public: friend Serializer& operator >> (Serializer& s, Foo& f) { s >> f.n >> f.str; return s; } friend Serializer& operator << (Serializer& out, Foo& f) { s << f.n << f.str; return s; } }; ``` ## 最后 虽然通过模拟反射,我们完成了无侵入式序列化的目的,但毕竟是模拟出来的,能获取到的元数据及其匮乏,实现起来也蹩脚。 期望在不久的将来能看到 cpp 提供足够的反射信息。 # 无栈协程状态机解析HTTP 和普通的HTTP状态机一样,我们使用了一个主从状态机来解析。不同的是这里将从状态机改造成协程状态机,免去了繁琐的状态切换,使解析流程的可读性、可维护性都大大提高。由于无栈协程的性能相当好,解析也有着不错的性能。 关于cpp20协程的使用教程已经很多了,这里不过多介绍语法了。但目前标准库缺少协程组件,先简单写一个`promise_type`,我们主要使用到了`co_return`和`co_yeild`,所以实现了`promise_type`的`return_value`和`yield_value`。 ```cpp template struct Task { struct promise_type; using handle = std::coroutine_handle; struct promise_type { ... void return_value(T val) { m_val = val; } std::suspend_always yield_value(T val) { m_val = val; return {}; } ... T m_val; }; ... T get() { return m_handle.promise().m_val; } void resume() { m_handle.resume(); } ... private: handle m_handle = nullptr; }; ``` 虽然从状态机使用协程消除了显式的状态,但主状态机还是需要状态转移,定义了以下的主状态 ```cpp /// 主状态机的状态 enum CheckState{ NO_CHECK, // 初始状态 CHECK_LINE, // 正在解析请求行 CHECK_HEADER, // 正在解析请求头 CHECK_CHUNK // 解析chunk }; ``` 定义错误码 ```cpp /// 错误码 enum Error { NO_ERROR = 0, // 正常解析 INVALID_METHOD, // 无效的 Method INVALID_PATH, // 无效的 Path INVALID_VERSION, // 无效的 Version INVALID_LINE, // 无效的请求行 INVALID_HEADER, // 无效的请求头 INVALID_CODE, // 无效的 Code INVALID_REASON, // 无效的 Reason INVALID_CHUNK // 无效的 Chunk }; ``` 由于Request和Response的格式一样都是请求行,请求头,正文,所以这两个主状态机一样,靠parse_line和parse_header两个虚函数多态来实现不同的从状态机。 ```cpp // 主状态机 size_t HttpParser::execute(char *data, size_t len, bool chunk) { ... size_t offset = 0; size_t i = 0; switch (m_checkState) { case NO_CHECK: { m_checkState = CHECK_LINE; m_parser = parse_line(); } // 解析请求行 case CHECK_LINE: { // 对请求行的每个字符解析 for(; i < len; ++i){ m_cur = data + i; m_parser.resume(); m_error = m_parser.get(); ++offset; if(isFinished()){ memmove(data, data + offset, (len - offset)); return offset; } if(m_checkState == CHECK_HEADER){ ++i; m_parser = parse_header(); break; } } if(m_checkState != CHECK_HEADER) { memmove(data, data + offset, (len - offset)); return offset; } } // 解析请求头 case CHECK_HEADER: { // 对请求头的每个字符解析 for(; i < len; ++i){ m_cur = data + i; m_parser.resume(); m_error = m_parser.get(); ++offset; if(isFinished()){ memmove(data, data + offset, (len - offset)); return offset; } } break; } ... } memmove(data, data + offset, (len - offset)); return offset; } ``` 这是解析的回调函数,结合下面的从状态机来看 ```cpp // Request回调 void on_request_method(const std::string& str); void on_request_path(const std::string& str); void on_request_query(const std::string& str); void on_request_fragment(const std::string& str) ; void on_request_version(const std::string& str); void on_request_header(const std::string& key, const std::string& val); void on_request_header_done(); // Response回调 void on_response_version(const std::string& str); void on_response_status(const std::string& str); void on_response_reason(const std::string& str); void on_response_header(const std::string& key, const std::string& val); void on_response_header_done(); void on_response_chunk(const std::string& str); ``` 接下来就是协程状态机的主要部分,这个是解析Request的请求行的从状态机,可以看到解析的流程非常清晰,每解析完一部分就会触发对应的回调来获取请求内容。重点就在于parse_line()每次resume后都是在上一次解析过程的下一步,因为协程本身就是一个状态机,利用协程就消除了显式状态转移,使得解析过程连贯,逻辑清晰。 ```cpp /** * @brief 解析 HTTP 请求的请求行 */ Task HttpRequestParser::parse_line() { std::string buff; // 读取method while(isalpha(*m_cur)) { buff.push_back(*m_cur); co_yield NO_ERROR; } if(buff.empty()) { co_return INVALID_METHOD; } if(*m_cur != ' ') { // method之后不是空格,格式错误。 co_return INVALID_METHOD; } else { // 读完method, 触发回调函数 on_request_method(buff); if (m_error) { co_return INVALID_METHOD; } buff = ""; co_yield NO_ERROR; } // 读取路径 while(std::isprint(*m_cur) && strchr(" ?", *m_cur) == nullptr) { buff.push_back(*m_cur); co_yield NO_ERROR; } if(buff.empty()) { // path为空,格式错误 co_return INVALID_PATH; } if(*m_cur == '?'){ on_request_path(buff); buff = ""; co_yield NO_ERROR; // 读取query while(std::isprint(*m_cur) && strchr(" #", *m_cur) == nullptr) { buff.push_back(*m_cur); co_yield NO_ERROR; } if(*m_cur == '#'){ on_request_query(buff); buff = ""; // 读取fragment while(std::isprint(*m_cur) && strchr(" ", *m_cur) == nullptr) { buff.push_back(*m_cur); co_yield NO_ERROR; } if(*m_cur != ' '){ // fragment之后不是空格,格式错误。 co_return INVALID_PATH; } else { on_request_fragment(buff); buff = ""; co_yield NO_ERROR; } } else if(*m_cur != ' '){ // query之后不是空格,格式错误。 co_return INVALID_PATH; } else { on_request_query(buff); buff = ""; co_yield NO_ERROR; } } else if(*m_cur != ' '){ // path之后不是空格,格式错误。 co_return INVALID_PATH; } else { on_request_path(buff); buff = ""; co_yield NO_ERROR; } const char* version = "HTTP/1."; while (*version) { if(*m_cur != *version) { co_return INVALID_VERSION; } version++; co_yield NO_ERROR; } if(*m_cur != '1' && *m_cur != '0') { co_return INVALID_VERSION; } else { buff="1."; buff.push_back(*m_cur); on_request_version(buff); if (m_error) { co_return INVALID_VERSION; } buff = ""; co_yield NO_ERROR; } if(*m_cur != '\r') { co_return INVALID_LINE; } co_yield NO_ERROR; if(*m_cur != '\n') { co_return INVALID_LINE; } // 状态转移 m_checkState = CHECK_HEADER; co_return NO_ERROR; } ``` 解析Request的请求头 ```cpp /** * @brief 解析 HTTP 请求的请求头 */ Task HttpRequestParser::parse_header() { std::string key, val; // 循环读取header,直到读取到\r\n\r\n时完成 while (!isFinished()){ // 读取key while(std::isprint(*m_cur) && *m_cur != ':') { // 读取所有连续的字符存储入key中。 key.push_back(*m_cur); co_yield NO_ERROR; } // 读取: if(*m_cur != ':') { co_return INVALID_HEADER; } else { co_yield NO_ERROR; } // 读取空格 while(*m_cur == ' ') { co_yield NO_ERROR; } // 读取value while (std::isprint(*m_cur)) { val.push_back(*m_cur); co_yield NO_ERROR; } if(*m_cur != '\r') { co_return INVALID_HEADER; } co_yield NO_ERROR; if(*m_cur != '\n') { co_return INVALID_HEADER; } co_yield NO_ERROR; if(*m_cur == '\r') { co_yield NO_ERROR; // 判断是不是 \r\n\r\n if(*m_cur == '\n'){ on_request_header(key, val); on_request_header_done(); m_finish = true; // 状态转移 m_checkState = NO_CHECK; //yield; //会退出循环 } else { co_return INVALID_HEADER; } } else { on_request_header(key, val); key.clear(); val.clear(); key.push_back(*m_cur); co_yield NO_ERROR; } } // 状态转移 m_checkState = NO_CHECK; co_return NO_ERROR; } ``` 解析Response的请求行 ```cpp /** * @brief 解析 HTTP 响应的请求行 */ Task HttpResponseParser::parse_line() { std::string buff; const char* version = "HTTP/1."; // 判断HTTP版本是否合法 while (*version) { if(*m_cur != *version) { co_return INVALID_VERSION; } version++; co_yield NO_ERROR; } if(*m_cur != '1' && *m_cur != '0') { co_return INVALID_VERSION; } else { buff="1."; buff.push_back(*m_cur); on_response_version(buff); if (m_error) { co_return INVALID_VERSION; } buff = ""; co_yield NO_ERROR; } // 跳过空格 while(*m_cur == ' ') { co_yield NO_ERROR; } // 读取status while(isdigit(*m_cur)) { // 读取所有连续的字符存储入method_中。 buff.push_back(*m_cur); co_yield NO_ERROR; } // 读完了method if(buff.empty()) { co_return INVALID_CODE; } // 读取空格 if(*m_cur != ' ') { // status之后不是空格,格式错误。 co_return INVALID_CODE; } else { // 读完status, 触发回调函数 on_response_status(buff); buff = ""; co_yield NO_ERROR; } // 跳过空格 while(*m_cur == ' ') { co_yield NO_ERROR; } // 读取Reason while(std::isalpha(*m_cur) || *m_cur == ' ') { buff.push_back(*m_cur); co_yield NO_ERROR; } if(buff.empty()) { // path为空,格式错误 co_return INVALID_REASON; } if(*m_cur != '\r') { co_return INVALID_LINE; } co_yield NO_ERROR; if(*m_cur != '\n') { co_return INVALID_LINE; } on_response_reason(buff); // 状态转移 m_checkState = CHECK_HEADER; co_return NO_ERROR; } ``` 解析Response的请求头 ```cpp /** * @brief 解析 HTTP 响应的请求头 */ Task HttpResponseParser::parse_header() { std::string key, val; // 循环读取header,直到读取到\r\n\r\n时完成 while (!isFinished()){ // 读取key while(std::isprint(*m_cur) && strchr(":", *m_cur) == nullptr) { key.push_back(*m_cur); co_yield NO_ERROR; } // 读取: if(*m_cur != ':') { co_return INVALID_HEADER; } else { co_yield NO_ERROR; } // 读取空格 while(*m_cur == ' ') { co_yield NO_ERROR; } // 读取value // 读取所有连续的字符存储入val中。 while (std::isprint(*m_cur)) { val.push_back(*m_cur); co_yield NO_ERROR; } if(*m_cur != '\r') { co_return INVALID_HEADER; } co_yield NO_ERROR; if(*m_cur != '\n') { co_return INVALID_HEADER; } co_yield NO_ERROR; if(*m_cur == '\r') { co_yield NO_ERROR; // 判断是不是 \r\n\r\n if(*m_cur == '\n'){ on_response_header(key, val); on_response_header_done(); m_finish = true; // 状态转移 m_checkState = NO_CHECK; //co_yield NO_ERROR; //会退出循环 } else { co_return INVALID_HEADER; } } else { on_response_header(key, val); key.clear(); val.clear(); key.push_back(*m_cur); co_yield NO_ERROR; } } // 状态转移 m_checkState = NO_CHECK; co_return NO_ERROR; } ``` 以上的解析基本都大同小异,只要理解了一个协程状态机,你就会发现任何的状态机基本都可以改写成协程形式的。 读完头部信息后,就可以通过ContentLength来读取正文的内容了,完整的协议接收如下。 ```cpp HttpRequest::ptr HttpSession::recvRequest() { HttpRequestParser::ptr parser(new HttpRequestParser); uint64_t buff_size = HttpRequestParser::GetHttpRequestBufferSize(); std::string buffer; buffer.resize(buff_size); char* data = &buffer[0]; size_t offset = 0; while (!parser->isFinished()) { ssize_t len = read(data + offset, buff_size - offset); if (len <= 0) { close(); return nullptr; } len += offset; size_t nparse = parser->execute(data, len); if (parser->hasError() || nparse == 0) { ACID_LOG_DEBUG(g_logger) << "parser error code:" << parser->hasError(); close(); return nullptr; } offset = len - nparse; } uint64_t length = parser->getContentLength(); if (length >= 0) { std::string body; body.resize(length); size_t len = 0; if (length >= offset) { memcpy(&body[0], data, offset); len = offset; } else { memcpy(&body[0], data, length); len = length; } length -= len; if(length > 0) { if(readFixSize(&body[len], length) <= 0) { close(); return nullptr; } } parser->getData()->setBody(body); } return parser->getData(); } ``` ## 最后 透过现象看本质,协程底层通过协程帧来保存了上下文,状态机也隐藏在了里面,使得我们不用显式定义状态,免去了繁琐的状态转换。如同大佬所说,协程之于状态机就像递归之于栈。 协程的使用场合不止异步,这是一个强大的利器,值得我们更深入去了解。 # 分布式 KV 存储 ------------------- ## 引言 注册中心作为服务治理框架的核心,负责所有服务的注册,服务之间的调用也都通过注册中心来请求服务发现。 注册中心重要性不言而喻,一旦宕机,全部的服务都会出现问题,所以我们需要多个注册中心组成集群来提供服务。 本项目中,通过 raft 分布式共识算法,简单实现了分布式一致性的 KV 存储系统,对接口进行了封装,并且提供了 HTTP 接口和 RPC 接口。 为以后注册中心集群的实现打下了基础。 项目链接:https://github.com/zavier-wong/acid ## 用例 在深入源码之前,先简单了解一下使用样例。在 `acid/example/kvhttp` 下提供了一个基于 kvraft 模块实现的简单分布式 kv 存储的前后端。 ``` shell kvhttp ├── CMakeLists.txt # cmakelists ├── KVHttpServer接口文档.md # 接口文档 ├── index.html # 前端 └── kvhttp_server.cpp # 后端 ``` 先用 cmake 构建 kvhttp,然后使用 `./kvhttp_server 1`,`./kvhttp_server 2`,`./kvhttp_server 3` 来启动三个节点组成一个 kv 集群。 现在双击 index.html 启动前端,就可以通过 web 来于 kv 集群交互。 每个节点都会创建一个 `./kvhttp-x` 目录来存储状态,`./kvhttp-x/raft-state` 存储的是 raft 的持久化数据,当日志的长度大于阈值, 节点会全量序列化当前时刻的数据以快照的形式存储在 `./kvhttp-x/snapshot/` 目录下,并以 raft 的 term 和 index 来命名快照。 这里建议读者先完整运行一遍用例,再继续接下来的学习。 ## 设计思路 用例 kvhttp_server 通过 `acid::http::KVStoreServlet` 来处理 http 请求,KVStoreServlet 转发请求给 `acid::kvraft::KVServer` 暴露出来操作 KV 的接口,KVServer 可以看成是一个持有所有已提交的键值对的 map, 并且封装了 `acid::raft::RaftNode`,KVServer 将所有的 KV 操作提交给了 RaftNode,等待操作在集群间达成共识后更新自己的 map。 如下图,KVServer 为 HttpServer 和 RaftNode 之间的通信建立起了桥梁。 ``` 1 2 3 ┌───────────────┐ ┌─────────────────────┐ ┌─────────────────┐ │ │ │ ┌───────────────┐ │ │ │ │ ◄──────┼────┼──┤ ├──┼───┼─────────► │ │ │ │ │ RaftNode │ │ │ │ │ ────────┼────┼──┤ ├──┼───┼────────── │ │ │ │ └───┬──────▲────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌───▼──────┴────┐ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ KVServer │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └───┬──────▲────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌───▼──────┴────┐ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ HttpServer │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └───────────────┘ │ │ │ └───────────────┘ └─────────────────────┘ └─────────────────┘ ``` ## KVStoreServlet `acid::http::KVStoreServlet` 是 Http 服务器的实现。这并不是我们关注的重点。我们需要关注的只是其通过`acid::kvraft::KVServer`中的哪些方法来提供服务。 ```cpp class KVStoreServlet : public Servlet { public: using ptr = std::shared_ptr; KVStoreServlet(std::shared_ptr store); /** * request 和 response 都以 json 来交互 * request json 格式: * { * "command": "put", * "key": "name", * "value": "zavier" * } * response json 格式: * { * "msg": "OK", * "value": "" // 如果 request 的 command 是 get 请求返回对应 key 的 value, 否则为空 * "data": { // 如果 request 的 command 是 dump 请求返回数据库的全部键值对 * "key1": "value1", * "key2": "value2", * ... * } * } */ int32_t handle(HttpRequest::ptr request, HttpResponse::ptr response, HttpSession::ptr session) override; private: // KV 存储服务器 std::shared_ptr m_store; }; ``` ```cpp int32_t KVStoreServlet::handle(HttpRequest::ptr request, HttpResponse::ptr response, HttpSession::ptr session) { nlohmann::json req = request->getJson(); nlohmann::json resp; co_defer_scope { response->setJson(resp); }; ... Params params(req); ... const std::string& command = *params.command; if (command == "dump") { const auto& data = m_store->getData(); ... return 0; } else if (command == "clear") { kvraft::CommandResponse commandResponse = m_store->Clear(); ... return 0; } ... const std::string& key = *params.key; kvraft::CommandResponse commandResponse; if (command == "get") { commandResponse = m_store->Get(key); resp["msg"] = kvraft::toString(commandResponse.error); resp["value"] = commandResponse.value; } else if (command == "delete") { commandResponse = m_store->Delete(key); resp["msg"] = kvraft::toString(commandResponse.error); } else if (command == "put") { ... const std::string& value = *params.value; commandResponse = m_store->Put(key, value); resp["msg"] = kvraft::toString(commandResponse.error); } else if (command == "append") { ... const std::string& value = *params.value; commandResponse = m_store->Append(key, value); resp["msg"] = kvraft::toString(commandResponse.error); } else { resp["msg"] = "command not allowed"; } return 0; } ``` 可以看到 KVStoreServlet 只是将请求简单转发给 KVServer。 ## KVServer KVServer 是连接 raft 服务器与 http 服务器的桥梁,是实现键值存储功能的重要组件,但是其实现很简单。 ```cpp class KVServer { public: ... using KVMap = std::map; KVServer(std::map& servers, int64_t id, Persister::ptr persister, int64_t maxRaftState = 1000); ... void start(); CommandResponse handleCommand(CommandRequest request); CommandResponse Get(const std::string& key); CommandResponse Put(const std::string& key, const std::string& value); CommandResponse Append(const std::string& key, const std::string& value); CommandResponse Delete(const std::string& key); CommandResponse Clear(); const KVMap& getData() const { return m_data;} private: void applier(); void saveSnapshot(int64_t index); void readSnapshot(Snapshot::ptr snap); ... CommandResponse applyLogToStateMachine(const CommandRequest& request); private: ... KVMap m_data; Persister::ptr m_persister; std::unique_ptr m_raft; co::co_chan m_applychan; ... int64_t m_maxRaftState = -1; }; ``` 先看几个字段,m_data 是一个由 map 实现的键值存储,m_persister 是一个持久化管理模块,m_raft 指向了一个 raft 服务器, m_applychan 是接收 raft 达成共识消息的 channel, m_maxRaftState 是一个阈值,超过之后 KVServer 会生成快照替换日志。 从简单的 start 函数开始看 ```cpp void KVServer::start() { readSnapshot(m_persister->loadSnapshot()); go [this] { applier(); }; m_raft->start(); } ``` star 里会通过 m_persister 从本地加载一个最近的快照,并调用 readSnapshot 来恢复之前的状态, 然后启动一个协程执行 applier 函数,最后启动 raft 服务器并阻塞在这里。 再看一下在协程里执行的 applier 函数,不断从 m_applychan 里接收 raft 达成共识的消息,再根据消息类型进行对应的操作。 ```cpp void KVServer::applier() { ApplyMsg msg{}; while (m_applychan.pop(msg)) { std::unique_lock lock(m_mutex); SPDLOG_LOGGER_DEBUG(g_logger, "Node[{}] tries to apply message {}", m_id, msg.toString()); if (msg.type == ApplyMsg::SNAPSHOT) { auto snap = std::make_shared(); snap->metadata.index = msg.index; snap->metadata.term = msg.term; snap->data = std::move(msg.data); m_raft->persistSnapshot(snap); readSnapshot(snap); ... continue; } else if (msg.type == ApplyMsg::ENTRY) { // Leader 选出后提交的空日志 if (msg.data.empty()) { continue; } int64_t msg_idx = msg.index; ... m_lastApplied = msg_idx; auto request = msg.as(); CommandResponse response; if (request.operation != GET && isDuplicateRequest(request.clientId, request.commandId)) { SPDLOG_LOGGER_DEBUG(g_logger, "Node[{}] doesn't apply duplicated message {} to stateMachine because maxAppliedCommandId is {} for client {}", m_id, request.toString(), m_lastOperation[request.clientId].second.toString(), request.clientId); response = m_lastOperation[request.clientId].second; } else { response = applyLogToStateMachine(request); if (request.operation != GET) { m_lastOperation[request.clientId] = {request.commandId, response}; } } auto [term, isLeader] = m_raft->getState(); if (isLeader && msg.term == term) { m_nofiyChans[msg.index] << response; } if (needSnapshot()) { saveSnapshot(msg.index); } } else { SPDLOG_LOGGER_CRITICAL(g_logger, "unexpected ApplyMsg type: {}, index: {}, term: {}, data: {}", (int)msg.type, msg.index, msg.term, msg.data); exit(EXIT_FAILURE); } } } ``` 如果接收的消息是 `ApplyMsg::SNAPSHOT`,KVServer 会恢复快照数据。如果接收的消息是 `ApplyMsg::ENTRY`, 说明这条日志已经在集群达成共识,KVServer 将日志反序列化回请求,并判断请求是否写请求且根据 clientId 和 commandId 判断是否在上次执行过,执行过则直接返回上次结果,否则调用 applyLogToStateMachine 函数来 apply 到状态机。 最后通过 `m_nofiyChans[msg.index] << response;` 唤醒等待的协程。 ```cpp CommandResponse KVServer::applyLogToStateMachine(const CommandRequest& request) { CommandResponse response; KVMap::iterator it; switch (request.operation) { case GET: it = m_data.find(request.key); if (it == m_data.end()) { response.error = NO_KEY; } else { response.value = it->second; } break; case PUT: m_data[request.key] = request.value; break; case APPEND: m_data[request.key] += request.value; break; case DELETE: it = m_data.find(request.key); if (it == m_data.end()) { response.error = NO_KEY; } else { m_data.erase(it); } break; case CLEAR: m_data.clear(); break; default: SPDLOG_LOGGER_CRITICAL(g_logger, "unexpect operation {}", (int)request.operation); exit(EXIT_FAILURE); } return response; } ``` applyLogToStateMachine 函数里我们看到了 KV 存储的庐山真面目,就是对一个 map 进行增删改查。 回顾成员函数,会发现这几个相似的函数 ```cpp CommandResponse handleCommand(CommandRequest request); CommandResponse Get(const std::string& key); CommandResponse Put(const std::string& key, const std::string& value); CommandResponse Append(const std::string& key, const std::string& value); CommandResponse Delete(const std::string& key); CommandResponse Clear(); ``` 看一下具体实现 ```cpp CommandResponse KVServer::Get(const std::string& key) { CommandRequest request{.operation = GET, .key = key, .commandId = GetRandom()}; return handleCommand(request); } CommandResponse KVServer::Put(const std::string& key, const std::string& value) { CommandRequest request{.operation = PUT, .key = key, .value = value, .commandId = GetRandom()}; return handleCommand(request); } CommandResponse KVServer::Append(const std::string& key, const std::string& value) { CommandRequest request{.operation = APPEND, .key = key, .value = value, .commandId = GetRandom()}; return handleCommand(request); } CommandResponse KVServer::Delete(const std::string& key) { CommandRequest request{.operation = DELETE, .key = key, .commandId = GetRandom()}; return handleCommand(request); } CommandResponse KVServer::Clear() { CommandRequest request{.operation = CLEAR, .commandId = GetRandom()}; return handleCommand(request); } ``` 很容易看出这里为了复用代码减少冗余将所有操作封装成一个 handleCommand 函数 ```cpp CommandResponse KVServer::handleCommand(CommandRequest request) { CommandResponse response; ... if (request.operation != GET && isDuplicateRequest(request.clientId, request.commandId)) { response = m_lastOperation[request.clientId].second; return response; } ... auto entry = m_raft->propose(request); if (!entry) { response.error = WRONG_LEADER; response.leaderId = m_raft->getLeaderId(); return response; } ... auto chan = m_nofiyChans[entry->index]; ... if (!chan.TimedPop(response, ...)) { response.error = TIMEOUT; } ... m_nofiyChans.erase(entry->index); return response; } ``` 在这个函数中会先简单判断一下请求是否重复出现,是的话返回上次结果,否则就提交给了 raft 节点。提交后会判断当前节点是否是集群 leader,不是的话返回错误。 然后通过一个 channel 来等待消息在 raft 间达成共识,如果超时返回错误。 我们可以看到,KVServer 中基本上没有多少与 raft 相关的处理逻辑,大部分代码是对键值存储抽象本身的实现。 ## RaftNode 最后,我们来学习 raft 服务器的实现,这也是最重要的部分。 ### RaftNode 结构 先来看看结构体中的字段,注释描述了其用途 ```cpp /** * @brief Raft 节点,处理 rpc 请求,并改变状态,通过 RaftPeer 调用远端 Raft 节点 */ class RaftNode : public acid::rpc::RpcServer { ... private: MutexType m_mutex; // 节点状态,初始为 Follower RaftState m_state = Follower; // 该 raft 节点的唯一id int64_t m_id; // 任期内的 leader id,用于 follower 返回给客户端让客户端重定向请求到leader,-1表示无leader int64_t m_leaderId = -1; // 服务器已知最新的任期(在服务器首次启动时初始化为0,单调递增) int64_t m_currentTerm = 0; // 当前任期内收到选票的 candidateId,如果没有投给任何候选人 则为-1 int64_t m_votedFor = -1; // 日志条目,每个条目包含了用于状态机的命令,以及领导人接收到该条目时的任期(初始索引为1) RaftLog m_logs; // 远端的 raft 节点,id 到节点的映射 std::map m_peers; // 对于每一台服务器,发送到该服务器的下一个日志条目的索引(初始值为领导人最后的日志条目的索引+1) std::map m_nextIndex; // 对于每一台服务器,已知的已经复制到该服务器的最高日志条目的索引(初始值为0,单调递增) std::map m_matchIndex; // 选举定时器,超时后节点将转换为候选人发起投票 CycleTimerTocken m_electionTimer; // 心跳定时器,领导者定时发送日志维持心跳,和同步日志 CycleTimerTocken m_heartbeatTimer; // 持久化 Persister::ptr m_persister; // 提交已经达成共识的消息 co::co_condition_variable m_applyCond; // 用来已通过raft达成共识的已提交的提议通知给其它组件的信道。 co::co_chan m_applyChan; }; ``` ### RaftNode的创建与启动 在创建 RaftNode 时,需要提供节点 id、持久化管理器 persister、要 apply 到状态机的通道 applyChan这些参数,节点会注册三个 raft 服务的 rpc 调用然后添加其他节点。 ```cpp RaftNode::RaftNode(std::map& servers, int64_t id, Persister::ptr persister, co::co_chan applyChan) : m_id(id) , m_logs(persister, 1000) , m_persister(persister) , m_applyChan(std::move(applyChan)) { rpc::RpcServer::setName("Raft-Node[" + std::to_string(id) + "]"); // 注册服务 RequestVote registerMethod(REQUEST_VOTE,[this](RequestVoteArgs args) { return handleRequestVote(std::move(args)); }); // 注册服务 AppendEntries registerMethod(APPEND_ENTRIES, [this](AppendEntriesArgs args) { return handleAppendEntries(std::move(args)); }); // 注册服务 InstallSnapshot registerMethod(INSTALL_SNAPSHOT, [this](InstallSnapshotArgs args) { return handleInstallSnapshot(std::move(args)); }); for (auto peer: servers) { if (peer.first == id) continue; Address::ptr address = Address::LookupAny(peer.second); // 添加节点 addPeer(peer.first, address); } } ``` 在构造函数中,仅初始化了部分参数和注册了一些服务,并没有做过多的操作。现在看一下 start 函数。 ```cpp void RaftNode::start() { auto hs = m_persister->loadHardState(); if (hs) { // 恢复崩溃前的状态 m_currentTerm = hs->term; m_votedFor = hs->vote; SPDLOG_LOGGER_INFO(g_logger, "initialize from state persisted before a crash, term {}, vote {}, commit {}", hs->term, hs->vote, hs->commit); } else { becomeFollower(0); } rescheduleElection(); go [this] { applier(); }; RpcServer::start(); } ``` start 函数也很简单。首先,尝试从持久化的数据里获取 raft 服务器崩溃前的状态,然后恢复状态,如果没有则重新成为任期为 0 的 follower。 恢复状态后重置选举定时器,开启一个协程执行 applier 函数,然后调用 RpcServer::start 阻塞在这里。 在 applier 协程里,raft 阻塞获取需要 apply 的日志,然后发送到 m_applyChan 里通知上层服务日志可以 apply,最后将日志提交到该 index。 ```cpp void RaftNode::applier() { while (!isStop()) { std::unique_lock lock(m_mutex); // 如果没有需要apply的日志则等待 while (!m_logs.hasNextEntries()) { m_applyCond.wait(lock); } auto last_commit = m_logs.committed(); auto ents = m_logs.nextEntries(); std::vector msg; for (auto& ent: ents) { msg.emplace_back(ent); } ... for (auto& m: msg) { m_applyChan << m; } ... // 1. push applyCh 结束之后更新 lastApplied 的时候一定得用之前的 commitIndex ,因为很可能在 push channel 期间发生了改变。 // 2. 防止与 installSnapshot 并发导致 lastApplied 回退:需要注意到,applier 协程在 push channel 时,中间可能夹杂有 snapshot // 也在 push channel。 如果该 snapshot 有效,那么上层状态机和 raft 模块就会原子性的发生替换,即上层状态机更新为 snapshot 的状态, // raft 模块更新 log, commitIndex, lastApplied 等等,此时如果这个 snapshot 之后还有一批旧的 entry 在 push channel, // 那上层服务需要能够知道这些 entry 已经过时,不能再 apply,同时 applier 这里也应该加一个 Max 自身的函数来防止 lastApplied 出现回退。 m_logs.appliedTo(std::max(m_logs.applied(), last_commit)); } } ``` ### Raft Leader 选举 在 leader 选举方面有许多种优化算法,目前精力有限并没有进行优化,但以后会慢慢加上。 在 star 函数里我们重置了选举定时器,超时后将会调用 startElection 向全部节点发起选举投票。 要注意的是,为了防止节点间超时时间相同一起发起选举然后瓜分选票,这里需要从 GetRandomizedElectionTimeout 获取随机超时。 ```cpp void RaftNode::rescheduleElection() { ... m_electionTimer = CycleTimer(GetRandomizedElectionTimeout(), [this] { ... if (m_state != RaftState::Leader) { becomeCandidate(); // 异步投票,不阻塞选举定时器 startElection(); } }); } ``` startElection 是选举的核心,这里的 requestVote 是对 rpc 的封装,节点使用协程发起异步投票,不阻塞选举定时器,这样在选举超时后能发起新的选举。 在协程间通过 shared_ptr 来共享本轮票数,如果票数过半则成功竞选为 leader,否则重新等待选举定时器超时。 注意,在投票前需要先持久化状态。 ```cpp void RaftNode::startElection() { RequestVoteArgs request{}; request.term = m_currentTerm; request.candidateId = m_id; request.lastLogIndex = m_logs.lastIndex(); request.lastLogTerm = m_logs.lastTerm(); ... m_votedFor = m_id; persist(); // 统计投票结果,自己开始有一票 std::shared_ptr grantedVotes = std::make_shared(1); for (auto& peer: m_peers) { // 使用协程发起异步投票,不阻塞选举定时器,才能在选举超时后发起新的选举 go [grantedVotes, request, peer, this] { auto reply = peer.second->requestVote(request); ... // 检查自己状态是否改变 if (m_currentTerm == request.term && m_state == RaftState::Candidate) { if (reply->voteGranted) { ++(*grantedVotes); // 赢得选举 if (*grantedVotes > ((int64_t)m_peers.size() + 1) / 2) { ... becomeLeader(); } } else if (reply->term > m_currentTerm) { ... becomeFollower(reply->term, reply->leaderId); rescheduleElection(); } } }; } } ``` 当前节点在 handleRequestVote 里处理别的节点发起的投票请求。 首先会进行一系列检查,只有通过检查的才能给节点投票,然后重置选举定时器。 重点,成功投票后才重置选举定时器,这样有助于网络不稳定条件下选主的 liveness 问题。 例如:极端情况下,Candidate 只能发送消息而收不到 Follower 的消息,Candidate 会不断发起新 一轮的选举,如果当前节点在 becomeFollower 后马上重置选举定时器,会导致节点无法发起选举,该集群 无法选出新的 Leader。 只是重置定时器的时机不一样,就导致了集群无法容忍仅仅 1 个节点故障 ```cpp RequestVoteReply RaftNode::handleRequestVote(RequestVoteArgs request) { std::unique_lock lock(m_mutex); RequestVoteReply reply{}; co_defer_scope { persist(); ... }; // 拒绝给任期小于自己的候选人投票 if (request.term < m_currentTerm || // 多个候选人发起选举的情况 (request.term == m_currentTerm && m_votedFor != -1 && m_votedFor != request.candidateId)) { ... return reply; } // 任期比自己大,变成追随者 if (request.term > m_currentTerm) { becomeFollower(request.term); } // 拒绝掉那些日志没有自己新的投票请求 if (!m_logs.isUpToDate(request.lastLogIndex, request.lastLogTerm)) { ... return reply; } // 投票给候选人 m_votedFor = request.candidateId; ... rescheduleElection(); // 返回给候选人的结果 ... return reply; } ``` ### Raft 日志复制与快照安装 raft 的快照复制与快照安装都是由 leader 发起的。 当节点 becomeLeader 后,会开启心跳定时器 ```cpp void RaftNode::becomeLeader() { ... m_state = RaftState::Leader; m_leaderId = m_id; // 成为领导者后,领导者并不知道其它节点的日志情况,因此与其它节点需要同步那么日志,领导者并不知道集群其他节点状态, // 因此他选择了不断尝试。nextIndex、matchIndex 分别用来保存其他节点的下一个待同步日志index、已匹配的日志index。 // nextIndex初始化值为lastIndex+1,即领导者最后一个日志序号+1,因此其实这个日志序号是不存在的,显然领导者也不 // 指望一次能够同步成功,而是拿出一个值来试探。matchIndex初始化值为0,这个很好理解,因为他还未与任何节点同步成功过, // 所以直接为0。 for (auto& peer : m_peers) { m_nextIndex[peer.first] = m_logs.lastIndex() + 1; m_matchIndex[peer.first] = 0; } persist(); ... // note: 即使日志已经被同步到了大多数个节点上,依然不能认为是已经提交了 // 所以 leader 上任后应该提交一条空日志来提交之前的日志 Propose(""); // 成为领导者,发起一轮心跳 broadcastHeartbeat(); // 开启心跳定时器 resetHeartbeatTimer(); } ``` 心跳定时器超时后会调用 broadcastHeartbeat 广播心跳。 ```cpp void RaftNode::resetHeartbeatTimer() { ... m_heartbeatTimer = CycleTimer(GetStableHeartbeatTimeout(), [this] { ... if (m_state == RaftState::Leader) { // broadcast broadcastHeartbeat(); } }); } ``` broadcastHeartbeat 里对每个节点开启一个协程调用 replicateOneRound 来日志复制或快照安装。 ```cpp void RaftNode::broadcastHeartbeat() { for (auto& peer: m_peers) { go [id = peer.first, this]{ replicateOneRound(id); }; } } ``` replicateOneRound 分成两部分来看。 首先获取该节点的下一条日志 index,如果小于自己快照的最后 index,说明该节点的进度远远落后,直接发送快照同步。 ```cpp void RaftNode::replicateOneRound(int64_t peer) { ... int64_t prev_index = m_nextIndex[peer] - 1; // 该节点的进度远远落后,直接发送快照同步 if (prev_index < m_logs.lastSnapshotIndex()) { InstallSnapshotArgs request{}; Snapshot::ptr snapshot = m_persister->loadSnapshot(); ... request.snapshot = std::move(*snapshot); ... auto reply = m_peers[peer]->installSnapshot(request); ... if (m_currentTerm != request.term || m_state != Leader) { return; } // 如果因为网络原因集群选举出新的leader则自己变成follower if (reply->term > m_currentTerm) { becomeFollower(reply->term, reply->leaderId); return; } if (request.snapshot.metadata.index > m_matchIndex[peer]) { m_matchIndex[peer] = request.snapshot.metadata.index; } if (request.snapshot.metadata.index > m_nextIndex[peer]) { m_nextIndex[peer] = request.snapshot.metadata.index + 1; } } else { ... } } ``` 节点收到安装快照的请求会调用 handleInstallSnapshot 来处理。 先进行快照的合法性检查,通过检查后压缩自己的日志,并持久化快照。同时会发起一个协程向 m_applyChan 发送快照消息,让上层应用来决定快照如何使用。 ```cpp InstallSnapshotReply RaftNode::handleInstallSnapshot(InstallSnapshotArgs request) { std::unique_lock lock(m_mutex); InstallSnapshotReply reply{}; ... if (request.term < m_currentTerm) { return reply; } if (request.term > m_currentTerm) { becomeFollower(request.term, request.leaderId); } rescheduleElection(); reply.leaderId = m_leaderId; const int64_t snap_index = request.snapshot.metadata.index; const int64_t snap_term = request.snapshot.metadata.term; // 过时的快照 if (snap_index <= m_logs.committed()) { ... return reply; } // 快速提交到快照的index if (m_logs.matchLog(snap_index, snap_term)) { ... m_logs.commitTo(snap_index); m_applyCond.notify_one(); return reply; } if (request.snapshot.metadata.index > m_logs.lastIndex()) { // 如果自己的日志太旧了就全部清除了 m_logs.clearEntries(request.snapshot.metadata.index, request.snapshot.metadata.term); } else { // 压缩一部分日志 m_logs.compact(request.snapshot.metadata.index); } go [snap = request.snapshot, this] { m_applyChan << ApplyMsg{snap}; }; persist(std::make_shared(std::move(request.snapshot))); return reply; } ``` replicateOneRound 的另一部分就是日志复制,对该节点发起日志追加请求后,会收到节点的 nextIndex,这是用来日志优化的,可以快速回退到该 index。 在最后会进行检查日志是否已经复制到大部分节点上,然后提交该 index。 ```cpp void RaftNode::replicateOneRound(int64_t peer) { ... int64_t prev_index = m_nextIndex[peer] - 1; // 该节点的进度远远落后,直接发送快照同步 if (prev_index < m_logs.lastSnapshotIndex()) { ... } else { auto ents = m_logs.entries(m_nextIndex[peer]); AppendEntriesArgs request{}; request.term = m_currentTerm; request.leaderId = m_id; request.leaderCommit = m_logs.committed(); request.prevLogIndex = prev_index; request.prevLogTerm = m_logs.term(prev_index); request.entries = ents; ... auto reply = m_peers[peer]->appendEntries(request); ... if (m_state != RaftState::Leader) { return; } // 如果因为网络原因集群选举出新的leader则自己变成follower if (reply->term > m_currentTerm) { becomeFollower(reply->term, reply->leaderId); return; } // 过期的响应 if (reply->term < m_currentTerm) { return; } // 日志追加失败,根据 nextIndex 更新 m_nextIndex 和 m_matchIndex if (!reply->success) { if (reply->nextIndex) { m_nextIndex[peer] = reply->nextIndex; m_matchIndex[peer] = std::min(m_matchIndex[peer], reply->nextIndex - 1); } return; } // 日志追加成功,则更新 m_nextIndex 和 m_matchIndex if (reply->nextIndex > m_nextIndex[peer]) { m_nextIndex[peer] = reply->nextIndex; m_matchIndex[peer] = m_nextIndex[peer] - 1; } int64_t last_index = m_matchIndex[peer]; // 计算副本数目大于节点数量的一半才提交一个当前任期内的日志 int64_t vote = 1; for (auto match: m_matchIndex) { if (match.second >= last_index) { ++vote; } // 假设存在 N 满足N > commitIndex,使得大多数的 matchIndex[i] ≥ N以及log[N].term == currentTerm 成立, // 则令 commitIndex = N(5.3 和 5.4 节) if (vote > ((int64_t)m_peers.size() + 1) / 2) { // 只有领导人当前任期里的日志条目可以被提交 if (m_logs.maybeCommit(last_index, m_currentTerm)) { m_applyCond.notify_one(); } } } } } ``` 节点收到日志追加的请求会调用 handleAppendEntries 来处理。 所有与日志有关的操作都封装到 RaftLog 里,这里会调用 maybeAppend 尝试追加日志, 追加失败会查找冲突日志来快速回退。最后会提交日志并唤醒 applier 协程。 ```cpp AppendEntriesReply RaftNode::handleAppendEntries(AppendEntriesArgs request) { std::unique_lock lock(m_mutex); AppendEntriesReply reply{}; co_defer_scope { persist(); if (reply.success && m_logs.committed() < request.leaderCommit) { // 更新提交索引,为什么取了个最小?committed是leader发来的,是全局状态,但是当前节点 // 可能落后于全局状态,所以取了最小值。last_index 是这个节点最新的索引, 不是最大的可靠索引, // 如果此时节点异常了,会不会出现commit index以前的日志已经被apply,但是有些日志还没有被持久化? // 这里需要解释一下,raft更新了commit index,raft会把commit index以前的日志交给使用者apply同时 // 会把不可靠日志也交给使用者持久化,所以这要求必须先持久化日志再apply日志,否则就会出现刚刚提到的问题。 m_logs.commitTo(std::min(request.leaderCommit, m_logs.lastIndex())); m_applyCond.notify_one(); } ... }; // 拒绝任期小于自己的 leader 的日志复制请求 if (request.term < m_currentTerm) { ... return reply; } // 对方任期大于自己或者自己为同一任期内败选的候选人则转变为 follower // 已经是该任期内的follower就不用变 if (request.term > m_currentTerm || (request.term == m_currentTerm && m_state == RaftState::Candidate)) { becomeFollower(request.term, request.leaderId); } if (m_leaderId < 0) { m_leaderId = request.leaderId; } // 自己为同一任期内的follower,更新选举定时器就行 rescheduleElection(); // 拒绝错误的日志追加请求 if (request.prevLogIndex < m_logs.lastSnapshotIndex()) { ... return reply; } int64_t last_index = m_logs.maybeAppend(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, request.entries); if (last_index < 0) { // 尝试查找冲突的日志 int64_t conflict = m_logs.findConflict(request.prevLogIndex, request.prevLogTerm); reply.term = m_currentTerm; reply.leaderId = m_leaderId; reply.success = false; reply.nextIndex = conflict; return reply; } reply.term = m_currentTerm; reply.leaderId = m_leaderId; reply.success = true; reply.nextIndex = last_index + 1; return reply; } ``` ## 最后 本文简单介绍了 kv 的整体结构,忽略了很多细节,旨在帮助读者理清思路更好地阅读源码。