分布式事务的21种武器 - 6

发布时间 2023-05-27 14:11:09作者: 俞凡

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (6)

Duncan Meyer @Unsplash

在不同业务场景下,可以有不同的解决方案,常见方法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ事务(MQ Transaction)
  7. Saga模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍拜占庭容错、分布式锁以及分片三种模式。

16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  • 一种容错机制,允许分布式系统在存在故障节点的情况下正常运行。
  • 节点之间相互通信从而就决策达成共识,即使存在某些恶意或错误节点,也可以工作。
  • 涉及如下步骤:
    1. 系统中每个节点向所有其他节点发送消息,其中包含建议的决策或值的信息。
    2. 每个节点检查接收到的消息的有效性。如果接收到来自故障节点或攻击者的消息,则忽略该消息。
    3. 每个节点收集接收到的所有有效消息,并创建系统视图(view)。视图是一组特定节点的有效消息,每个节点与系统中的所有其他节点共享视图。
    4. 每个节点检查从其他节点接收到的视图的有效性。如果接收到无效视图,则忽略该视图。
    5. 每个节点创建包含其系统视图的证书(certificate),并与所有其他节点共享。证书是经过签名的声明,用于证明视图的有效性。
    6. 每个节点检查从其他节点收到的证书的有效性。如果收到无效证书,应该忽略该证书。
    7. 一旦节点验证了收到的所有证书,就可以对提议的决策或值达成共识。每个节点根据收到的证书对相同的值达成共识。
    8. 一旦达成共识,每个节点执行已达成一致的决策。
from typing import List, Dict, Tuple

class Message:
    def __init__(self, sender: int, content: str):
        self.sender = sender
        self.content = content

class ByzantineNode:
    def __init__(self, id: int, network: Dict[int, List[Message]], threshold: int):
        self.id = id
        self.network = network
        self.threshold = threshold
        self.decisions = {}

    def send_message(self, receiver: int, content: str):
        message = Message(self.id, content)
        self.network[receiver].append(message)

    def receive_messages(self) -> List[Message]:
        messages = self.network[self.id]
        self.network[self.id] = []
        return messages

    def generate_vote(self, messages: List[Message]) -> bool:
        count = 0
        for message in messages:
            if message.content == 'True':
                count += 1
            elif message.content == 'False':
                count -= 1
        return count >= self.threshold

    def run_bft(self, decision_content: str):
        # Phase 1: Broadcast proposal to all nodes
        proposal = Message(self.id, decision_content)
        for node_id in self.network:
            self.send_message(node_id, str(proposal))

        # Phase 2: Receive messages and generate votes
        messages = self.receive_messages()
        vote = self.generate_vote(messages)

        # Phase 3: Broadcast decision to all nodes
        decision = Message(self.id, str(vote))
        for node_id in self.network:
            self.send_message(node_id, str(decision))

        # Phase 4: Receive decisions and count votes
        decisions = [m.content for m in self.receive_messages()]
        count_true = decisions.count('True')
        count_false = decisions.count('False')

        # Record decision if it meets threshold, else record failure
        if count_true >= self.threshold:
            self.decisions[decision_content] = True
        elif count_false >= self.threshold:
            self.decisions[decision_content] = False
        else:
            self.decisions[decision_content] = None

示例代码

  • 只要故障节点数量小于阈值,该算法就可以容忍故障。
  • 由两个类组成:
    1. Message —— 表示网络中节点之间发送的消息,包含发送者ID和消息内容。
    2. ByzantineNode —— 表示网络节点,包含ID、网络拓扑、容忍的故障数量阈值,以及存储节点决策的字典。
    • ByzantineNode类提供了几个方法:
      • send_message()方法 —— 发送消息到网络中的另一个节点
      • receive_messages()方法 —— 检索自上次调用receive_messages()以来收到的所有消息
      • generate_vote()方法 —— 将消息列表作为输入,并根据消息内容生成投票。如果"True"消息的数量大于或等于阈值,则该方法返回True,否则返回False。
      • run_bft()方法 —— 实现BFT算法的4个阶段。
        • 阶段1 —— 用send_message()方法向网络中所有节点广播提案。提案是个Message对象,其内容是作为参数传递给run_bft()方法的decision_content。
        • 阶段2 —— 用receive_messages()方法接收来自网络中其他节点的消息。用generate_vote()方法根据收到的消息生成投票,根据收到的"True"和"False"数量,投票"True"或"False"。
        • 阶段3 —— 用send_message()方法将决策广播到网络中所有节点。决策是个Message对象,其内容为上一阶段生成的投票。
        • 阶段4 —— 计算收到的票数,如果票数达到阈值,则记录决策。使用receive_messages()方法检索自上次调用receive_messages()以来收到的所有消息。检查每条消息内容,并计算"True"和"False"消息的数量。如果"True"的数量大于或等于阈值,则将该决策记录为True。如果"False"的数量大于或等于阈值,则该决策被记录为False。如果两个条件都不满足,则该决策记录为None。

优点

  • 在分布式系统中容忍一定数量的错误或失败
  • 即使存在故障节点或恶意攻击,也能确保分布式系统中所有节点达成一致的决策
  • 为用于加密货币和其他应用的区块链网络提供高水平的安全性和弹性

缺点

  • 可能需要昂贵的计算,并且需要节点之间有高质量网络通信,否则可能会增加延迟并降低系统性能
  • 因为可能需要节点之间的高级协调和通信,因此可能不适合所有类型的分布式系统
  • 不能为分布式系统中所有类型的故障或攻击提供完整解决方案

适用场景

  • 金融系统 —— 股票交易
  • 基础设施系统 —— 电网或运输系统
  • 区块链网络 —— 加密货币和其他应用

挑战

  • 设计和实现BFT系统可能很复杂,并且需要在分布式系统、密码学和安全性方面具有高水平专业知识。
  • 确保所有节点都是可信、没有恶意的。
  • 在BFT系统中实现高性能和低延迟具有挑战性。

17. 分布式锁(Distributed Locking)
  • 管理分布式系统中共享资源的访问。
  • 保证系统中多个节点不能同时访问或修改相同的资源,避免可能的不一致和数据损坏。
  • 涉及以下步骤:
    1. 节点请求对共享资源加锁。请求包含资源的唯一标识符以及所请求的锁类型(例如,读或写)。
    2. 锁管理器管理锁,接收请求,并检查资源是否已经锁定。如果资源未被锁定,锁管理器将锁授予请求节点并发送确认。
    3. 如果资源已经被锁定,锁管理器检查请求节点是否被授权访问该资源。如果该节点已获得授权,锁管理器将该请求添加到资源的挂起请求队列中,并向请求节点发送确认信息。如果该节点未被授权,则锁管理器拒绝该请求并发送拒绝消息。
    4. 在等待授予锁时,请求节点定期轮询锁管理器以获取锁状态。
    5. 当节点访问完资源后,通过向锁管理器发送释放请求来释放锁。锁管理器从资源中删除锁,并将锁授予队列中的下一个节点(如果有的话)。
    6. 如果持有锁的节点发生故障或崩溃,锁管理器将检测到该故障,并代表发生故障的节点释放锁,然后将锁授予队列中的下一个节点(如果有的话)。
    7. 如果节点请求锁,但没有收到锁管理器的响应,那么假定锁管理器已经失败,并通过选举新的领导节点来接管锁管理器的角色。
from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
import time

class DistributedLock:
    def __init__(self, zk_address, lock_path):
        self.zk = KazooClient(hosts=zk_address)
        self.lock_path = lock_path
        self.lock = None

    def __enter__(self):
        self.zk.start()
        self.lock = self.zk.Lock(self.lock_path)
        try:
            self.lock.acquire(timeout=10)
        except LockTimeout:
            self.zk.stop()
            raise Exception("Timeout while waiting for lock")

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release()
        self.zk.stop()

if __name__ == '__main__':
    zk_address = 'localhost:2181'
    lock_path = '/my_lock'
    with DistributedLock(zk_address, lock_path):
        print("Acquired lock!")
        time.sleep(10)
    print("Released lock!")

示例代码

  • 基于Apache ZooKeeper分布式协调服务
  • 导入所需的库 —— KazooClient和LockTimeout
    • KazooClient —— ZooKeeper的Python客户端
    • LockTimeout —— 当不能在指定超时内获得锁时引发异常。
  • 定义DistributedLock类 —— 接受2个参数: zk_addresslock_path
    • zk_address —— ZooKeeper服务器地址
    • lock_path —— 锁节点在ZooKeeper中的路径
    • init方法中初始化ZooKeeper客户端,并存储lock_path,并且将锁变量初始化为None。
    • enter方法中启动ZooKeeper客户端,创建锁对象,并尝试获取锁。如果锁不能在10秒内获得,则引发LockTimeout异常。
    • exit方法中释放锁并停止ZooKeeper客户端。
  • 在代码主体部分,用指定的ZooKeeper地址和锁路径创建DistributedLock类实例。
  • with语句获取锁。
  • 当获得锁时,打印一条消息,表示已获得锁,然后sleep 10秒,以模拟持有锁时正在完成的一些工作。
  • sleep后,锁会被with语句自动释放,打印一条消息,表明锁已被释放。

优点

  • 在分布式系统中,通过确保一次只有一个进程可以修改共享资源来维护数据一致性
  • 防止多个进程同时访问共享资源,以确保该资源在需要时始终可用
  • 允许多个进程跨多个节点访问共享资源

缺点

  • 需要在分布式系统中的多个节点之间进行协调
  • 在分布式系统中引入延迟并降低性能
  • 如果分布式锁定机制失败,可能会导致整个分布式系统失败

适用场景

  • 分布式数据库
  • 电子商务系统 —— 管理对购物车的访问或防止多个用户同时购买相同的商品

挑战

  • 需要公平分配资源,以确保所有进程都能平等访问共享资源
  • 不正确实现的分布式锁定可能导致死锁,多个进程等待彼此释放锁

18. 分片(Sharding)
  • 用于在多个服务器之间对数据进行水平分区,称为分片。
  • 每个分片包含数据的一个子集,所有分片组合起来就构成了完整的数据集。
  • 用于提高分布式数据库的可伸缩性、性能和可用性。
  • 涉及如下步骤:
    1. 根据分片键将数据划分为更小的子集。分片键的选择使得数据可以均匀分布在各个分片上,并且可以将查询路由到正确的分片。
    2. 数据分区后,分片分布在多个服务器上。每个分片一个特定服务器,多个分片可以分配给同一个服务器。
    3. 当客户端向数据库发送查询时,该查询首先路由到协调器节点。协调器节点负责确定哪个分片包含执行查询所需的数据。
    4. 一旦协调节点确定了正确的分片,查询将被发送到包含该分片的服务器。服务器执行查询并将结果返回给协调器节点。
    5. 如果需要来自多个分片的数据完成查询,协调节点将每个分片的结果聚合并将最终结果返回给客户端。
import mysql.connector

# Connect to MySQL database
mydb = mysql.connector.connect(
  host="localhost",
  user="yourusername",
  password="yourpassword",
  database="mydatabase"
)

# Define sharding rules
shard_key = "user_id"
num_shards = 4

# Create sharded tables
for i in range(num_shards):
    cursor = mydb.cursor()
    cursor.execute(f"CREATE TABLE users_{i} (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), email VARCHAR(255))")

# Insert data into sharded tables
users = [
    {"id": 1, "name": "John", "email": "john@example.com"},
    {"id": 2, "name": "Jane", "email": "jane@example.com"},
    {"id": 3, "name": "Bob", "email": "bob@example.com"},
    # ...
]

for user in users:
    shard_id = user[shard_key] % num_shards
    cursor = mydb.cursor()
    cursor.execute(f"INSERT INTO users_{shard_id} (id, name, email) VALUES (%s, %s, %s)", (user["id"], user["name"], user["email"]))

# Query data from sharded tables
cursor = mydb.cursor()
cursor.execute("SELECT * FROM users_0 UNION SELECT * FROM users_1 UNION SELECT * FROM users_2 UNION SELECT * FROM users_3")
users = cursor.fetchall()

print(users)

示例代码

  • 使用MySQL
  • 可能因用例和所使用的数据库系统而异。
  • 连接MySQL数据库,定义分片规则。在本例中,我们用user_id作为分片键,并创建4个分片表来存储数据。
  • 通过计算基于用户ID的分片ID,并用该ID将数据插入到相应的分片表中,从而将数据插入到分片表。
  • 使用UNION语句查询所有分片表中的数据并打印结果。

优点

  • 允许数据库水平扩容,因此随着数据增长,可以向系统添加额外的服务器来处理增加的负载。
  • 因为每个服务器只需要搜索较小的数据集,因此可以更快执行查询。
  • 如果一台服务器出现故障,只会影响部分数据,系统的其余部分可以继续正常运行。

缺点

  • 需要确保对数据进行了正确的分区,并且分片均匀分布在各个服务器上。
  • 维护所有分片之间的数据一致性可能具有挑战
  • 实现分片需要额外的硬件、软件和维护成本

适用场景

  • 适用于高读写负载的大型数据库,可以横向扩展以处理增加的流量。当存在地理或法规限制,需要将数据存储在不同位置时,会非常有用。
  • 生成大量数据并需要快速存储和检索的社交媒体平台
  • 处理大量交易,需要快速存储和检索数据的电商平台
  • 需要快速安全的存储和检索大量患者数据的医疗保健应用

参考文献

Byzantine Fault Tolerance (BFT) | River Glossary

What Is Byzantine Fault Tolerance?

Byzantine Fault Tolerance (BFT) Explained

Distributed Locks with Redis

How to do distributed locking

Distributed Locking

Sharding

What is Database Sharding?

What is Sharding?

Understanding Database Sharding


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。微信公众号:DeepNoMind

本文由mdnice多平台发布