mongo-listener: 监听 mongo 数据变化的一个应用程序相关设计

发布时间 2023-03-23 16:38:30作者: 述不作

mongo-listener

为了开放一些数据库变更信息, 需要监听数据库的数据变更, 之后将信息投递到信息队列中, 开放对应的端口, 供其他系统订阅.

mongo 数据变更监听

对于数据库信息的变更, 或者说是数据异构, 也就是当数据源数据变更时, 另一个数据源可以接收到相关的变更信息进行处理, 从而达到两个可能数据存储并不一致的数据在逻辑上是相同的, 虽然这种相同的在时间上是存在延迟的, 但大多数情况下是可以接受的. 而对于常见数据库的数据变更信息的订阅, 一般都是通过数据库的 binlog 来实现的, 比如 mysql, pgsql 是提供 listen 方法, 用户基于连接通过配置相关的监听信息, 调用 listen 方法来获取到预期的数据变更, 但是对于 mongo 来说, 由于其没有 binlog, 所以需要通过监听数据库的 oplog 来实现. 而 mongo 本身也开放了对应的功能, 即 watch 方法, 通过 watch 方法可以监听数据库的数据变更. mysql 本身并不提供这样的 api 供用户调用, 而要实现相关的功能, 需要伪装成一个从节点, 接收到主节点的 binlog 信息同步, 通过解析 binlog 来获取变更的信息, 实现通常需要使用第三方的工具, 比如 canal, maxwell 等.

对比 mysql, pgsql, mongo 的数据异构以及获取数据变更信息的方式可以发现, pgsql 和 mongo 主要将变更信息的相关记录抽象出了一个 api, 而这个 api 实质上是对类似 binlog 功能的 oplog 的封装, 再加上相关操作上的 api, 这样就将订阅数据变化融入到了数据库基本的 api 中. 并不需要用户去引入更多的组件, 通过一种非规范化的组件来实现最终的功能. 当然, 这种方式可能对数据库本身来说也有一定的压力, 毕竟对于操作日志形入 oplog 和 binlog 的记录, 在主从同步的过程中, 只需要复制执行, 开放了这个功能意味着解析的工作下放到了数据库本身, 而原来可能是由第三方的组件来完成的, 这样对于数据库来说, 可能会有一定的压力. 这些都是需要考虑的. 在程序设计中, 对于功能的转移来说, 是值得权衡的, 因为功能不仅是实现的问题, 还可能因为实现的归属造成相关的利弊, 比如在数据库本身实现, 可能会对数据库本身造成一定的压力, 但是对于用户来说, 可能会更加方便, 如果不再数据库本身实现, 对于数据库本身相对压力比较小, 同样的对于用户来说需要做更多的工作. 也就是说, 对于功能的转移, 一定要考虑到相关的利弊, 以及是否值得转移, 以及转移后的功能是否能够满足需求.

单个服务好吗

下面我们来探讨关于 mongo-listener 的相关问题

首先这个功能是需要单独写成一个服务, 还是分散到各个服务中去. 如果是单独写成一个服务, 只需要保证这个服务高可用的, 整个 mongo-listener 的服务就是高可用的, 因为单独服务表达的就是整体 mongo-listener 功能, 此时的关注点完全在这个单独的服务本身. 如果是分散到各个服务中去, 就意味着如果想保证 mongo-listener 的高可用就需要保证其分散的各个宿主的高可用, 这是比较复杂的, 意味着整个 mongo-listener 的高可用需要多个宿主服务的高可用来保证. 为了只关注 mongo-listener 单个服务的可用性, 以及为了不在实现本身引入过多的复杂性, 这里选择了单独写成一个服务的方式.

部分和全部

接下来讨论 mongo-listener 的功能内容, 以及如何实现, 以及如何保证高可用. 我们使用一个简单的例子来说明, 对于一个集合中的文档的监控, 只监控数据增加. 相关的操作就是连接数据库, 拿到对应实体的操作实例, 进行 watch 监控. watch 会返回一个对应的流, 可以理解成一个网络连接, 流是对网络数据的一种抽象. 之后对流进行相关事件的绑定, 这里主要关注的是 change 事件, 该事件会返回对应的信息变更, 信息变更不仅有变更之后的内容, 还有一个比较重要的标记, 这个标记是一个唯一的 token, 而这个 token 在整个数据变更中是唯一的, 且有序的. 这意味着拿到任何一个 token 就知道这个 token 所在的时间点, 拿着这个 token 可以继续该时间点之后的消息订阅. 可以理解成这个 token 是对数据变更以时间变化的序列的标志. 这样不仅获取到了数据变更的实体信息, 还获取到了一个 token, 这个 token 官方文档叫做 resumeToken, 考虑到 resumeToken 可以拿到某个时刻的值在一定时间之后继续按照 resumeToken 的时间点消费堆积消息, 理想情况下当我们程序挂掉的时候把最新的 resumeToken 保存下来, 下次启动的时候检查这个值, 根据这个 resumeToken 来初始化 watch, 所以说这个值对于程序的高可用很重要. 获取到变更信息如何投递呢?

确定要引入 buffer

获取到变更信息投递到队列的过程中, 需要注意一些问题, 关注数据输入, 因为输入有不确定性, 对系统的稳定性带来不可控的因素, 外部的请求也算是一种出现的输入, 这是一个经典的生产消费模型, 比如接收消息和消费消息之间速率是否差距很大, 差距很大意味着消息堆积, 消息是有大小, 堆积的大小一定是在程序中去的, 表现的行为就是内存占用过大, 甚至是 OOM. 从消息生产和消费的速率来看, 消息生产的最终来源是 mongo, 虽然 watch 可以限制消息频率, 即 mongo 给订阅者发相关消息时速率下降, 但是此项的开启意味者可能程序处理消息的性能达不到最大化. 解决该问题最好的方式就是找到一个缓冲区来缓存这些消息, 缓冲区不能是程序本身, 因为消息的大量堆积会出现内存问题. 而且消息进入缓存的速度最好能持平生产速度, 大致要点如此, 这里不应该直接使用消息队列直接消费消息, 消息队列投递消息的速度很难与生产速度持平, 或者差距不大. 这里选择 redis 作为 buffer 来进行缓存, 之后再订阅 redis 的队列, 一点点投递到消息队列中去. 选择 redis 主要有两种考虑, 首先投递速度绝对比 kafka 快, kafka 的网络环境可能会影响投递速度, 而这里的 redis 完全在本地的, 不需要对外暴露任何端口, 作为一个内部的依赖性来参与其中. 其次就是 redis 的持久化, 本质上来说选择 redis 和程序内部的 buffer 都会成内存的升高, 但是 redis 可以通过系统隔离来限制资源, 大量的消息堆积面对程序突然挂掉是完全无能为力的, 大量的 buffer 会直接丢失. 而 redis 可以通过持久化来保证消息的堆积, 但是这里需要注意的是 redis 的持久化是异步的, 也就是说 redis 的持久化是在后台进. 借助 redis 作为缓冲的一种中间件, 对于现在的程序来说只剩下最后的消息投递到消息队列或者分布式队列中, 对于实现来说也是比较简单的, 但投递的方式和操作因为对系统可用性的要求, 业务正确性的要求而呈现不同, 可用性的核心围绕着持久化, 业务正确性的核心围绕着顺序性.

保存?

整体来说, mongo-listener 需要保证持久化 resumeToken 为了在下次启动时可以继续关闭之前时间点的消息订阅, 如何确定这个时间点究竟是什么, 这要求需要拿到没有持久化的第一条消息的 resumeToken 或者已经持久化的 resumeToken, 之后就可以完美的继续关闭之前时间点的消息订阅, 当然需要剔除最后一条持久化消息再次接收. 关于持久化的消息, 首先 redis 和 消息队列都需要作持久化, 那么服务挂掉最后持久化到 redis 的消息必然是最后一个投递成功到 redis 的消息. 当然这里涉及到投递顺序性的问题, 暂时悬置. 所以在每次投递到 redis 消息成功之后, 更新并持久化 resumeToken 中去, 这里的持久化成本是比较高的, 其实最终执行的是更新操作, 持久化的工作会有定时器设置时间执行, 当然这个时间必然会导致持久化的 resumeToken 在时间线上等于或者前于最后一条持久化消息的 resumeToken, 针对这个问题所带来的结果就是挂起和启动时, 末尾的几条消息容易重复, 所以需要考虑剔除, 剔除的方式就是维护一个列表保存上次持久化到下次持久化所有信息的 resumeToken. 这里不对已经投递到消息队列中的消息作任何讨论, 因为得益于 redis 作为消息堆积的处理, 只需要处理好消息到 redis, redis 到消息队列, 整体就没有消息丢失的问题. 而程序所面对的中间件最直接的就是 redis. 对消息队列的投递完全是由 redis 的消息通知驱动的. 程序的持久化和高可用主要是围绕 resumeToken 来展开的.

redis 的持久化需要开启相关的配置以及对应的卷挂载, 不然当容器被删除再次被创建时因为没有相关存储数据的卷进行挂载依旧会出现所有的 key 消失. 这里不再多说相关配置以及卷的操作.

消息队列使用的是 kafka, 持久化的主要操作就是数据挂载到对应卷, 设置相关的持久化策略, 以及设置相关的配置. 这里不再多说相关配置以及卷的操作. 对于 kafka 的持久化生产者也需要做一些处理, 处理要求进行相关的事务消息投递, 保证消息一定进入到了 kafka 的持久化过程中并成功持久化, 而不是进入到了 kafka 中未进入持久化, 这时 kafka 挂掉时, 意味着消息的丢失. 不再多述, 后再分享 kafka 的消息完整性.

通过持久化可以保证整体系统的高可用, 首先是程序的高可用, 随便挂掉, 主要能启动, 就能接着之前的状态进行订阅. 在进行订阅的过程中因为保存的 resumeToken 可能并不是最新的, 这个时候从 redis 的维护的列表中进行判断, 存在则不继续投递 redis, 不存在就进行投递. 消息通过 redis 持久化之后可以进行缓慢投递, 并保证投递顺序性, 使用原子消息投递, 保证消息不会丢失.

有序? 无序?

关于序列需要注意一些问题, 程序最初接收到的信息必然是有序的, 是否需要之后都保证有序, 如果需要如何保证. 对于是否需要保证之后都是有序的答案是否定的, 对于一些单一事件可能不需要保证有序的, 比如增加, 删除. 但是更改就不一定, 取决于更改内容本身. 这意味着程序可以从有序和无序两种状态切换, 且能进行相关配置. 下面来分析有序的相关设计, 首先消息投递到 redis 的过程中由于涉及到本地网络通信, 虽然可能并没有网络阻塞一些常见的问题, 但仍不能保证是顺序的, 这个问题悬置, 后续再进行思考. 如果将每个消息的投递维护到一个队列中去, 从队列第一个启动投递, 投递成功之后继续下一个, 这样确实能保证有序且投递成功. 但可能性能并不是很好, 有没有可能在队列列表存量很多的时候合并处理, 将一些序列中可以合并或者没有硬性序列要求的进行并发发送, 这是其中一个改善思路, 可能还有其他更改的方式. 该问题先悬置. redis 到 kafka 的投递如果保证有序, 只需要在接收 redis 的消息通知之后一条一条进行处理就好了. 此时涉及的问题还存在合并问题, 基本可以沿用 redis 处理消息的策略, 同时消息合并可以有效和事务结合起来使用, 因为事务在保证持久化的同时待见是比较大的, 不仅是网络连接方面.

配置

以上是关于一个简化模型的处理, 实际上可能有很多不同的业务都需要订阅, 这就要求程序能够支持多个订阅, 且能够进行相关的配置, 还有程序日志, 监控和警告的问题. 监控由于少基础设施暂时不考虑, 简单进行相关资源限制基本能兜底. 程序日志则需要在比较关键的程序运行中记录相关状态, 警告暂时考虑接入 webhook 进行相关的通知, 毕竟通知方式比较简单快捷. 配置则考虑相关的 pipeline 信息, resumeToken 信息以及投递的 topic 信息, 从单一的对象扩展为对象数组, 程序则扩展为面向数组的相同处理, 这理相同的处理就是单一的逻辑, 其实如果根据不同的配置以不同的程序示例运行效果可能更好, 毕竟不同配置之间并没有关系, 使用的 redis 和 kafka 队列都不同, 全部放到一个程序示例中确实牵强很多, 对这个示例压力也比较大.

部署相关

完成相关程序实现之后, 部署编排程序镜像, 相关服务的 yml 文件, 其中需要额外考虑的是对外开放的消息队列端口需要配置 ssl, 可能借助 nginx 配置完成, 还需要注意安全问题, kafka 最好配上 scram 设置比较复杂的账号和密码, 以及相关队列的权限. 保证对外开放的安全性. redis 由于不需要对宿主以及外任何端口, 单纯作为一个程序的附属来运行, 不需额外的安全配置.

总结

其中悬置的问题需要再加考虑, 对于整体的取舍和设计可能还有很多不足, 也可能已经有现成的解决方案, 这里只是简单的进行了一些思考.

上述涉及的高可用其实做的并不够彻底, 整体思路还是以单机模式进行的设计, 对于单机宕机的情况, 仍然需要进行相关的处理, 但是这个问题可以通过多机部署来解决, 但是多机部署的问题就是如何保证多机之间的数据一致性, 这个问题需要进一步思考.