分布式事务的21种武器 - 2

发布时间 2023-05-23 16:25:06作者: 俞凡

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (2)

Nathan Dumlao @Unsplash

在不同业务场景下,可以有不同的解决方案,常见方法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ事务(MQ Transaction)
  7. Saga模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍TCC补偿、本地消息表/发件箱以及MQ事务三种模式。

4. TCC补偿(Try-Confirm-Cancel Compensation Matters)

图片来源: https://blogs.oracle.com/database/post/making-try-confirmcancel-easy-with-microtx

  • 用于处理跨多个服务的长时间事务。
  • 将一个复杂事务分解为多个步骤,每个步骤都是一个单独的服务调用或数据库操作。事务中的每个步骤都有3个阶段: 尝试、确认和取消。
  • 尝试(Try)阶段 —— 服务尝试执行操作,执行一系列检查,以确保执行操作是安全的。如果所有检查都通过,则执行该操作,并将状态保存在临时存储中。
  • 确认(Confirm)阶段 —— 服务确认操作成功,验证事务状态,并确保满足所有依赖关系。如果所有内容都有效,则将临时状态提交给主数据库。
  • 取消(Cancel)阶段 —— 如果在尝试阶段或确认阶段出现错误,将进入取消阶段。服务将撤销在尝试阶段所做的更改,将状态恢复到事务开始之前。
import requests

class OrderService:
    def __init__(self):
        self.session = requests.Session()

    def create_order(self, order_data):
        try:
            # Step 1: Reserve stock
            res = self.session.post('http://inventory-service/reserve-stock', json=order_data)
            res.raise_for_status()

            # Step 2: Charge payment
            res = self.session.post('http://payment-service/charge-payment', json=order_data)
            res.raise_for_status()

            # Step 3: Confirm order
            res = self.session.post('http://order-service/confirm-order', json=order_data)
            res.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Step 4: Cancel order
            self.session.post('http://order-service/cancel-order', json=order_data)
            self.session.post('http://payment-service/refund-payment', json=order_data)
            self.session.post('http://inventory-service/release-stock', json=order_data)
            raise e

示例代码

OrderService类遵循尝试-确认-取消(TCC)模式创建订单。

订单创建过程包括3个步骤:

  1. 保留库存
  2. 付款
  3. 确认订单

任何一个步骤失败,都需要取消订单并撤消所做的更改。这些都在异常处理程序中完成,该处理程序回滚try代码块执行期间所做的任何数据库更改。

reserve_stock()方法中,检查是否有足够库存,如果有,则通过减少数据库中的可用库存计数来保留库存,如果没有,则引发异常并回滚到目前为止所做的任何更改。

charge_payment()方法中,检查客户是否有足够资金支付订单。如果有,将从余额中扣除订单金额,如果没有,则引发异常并回滚到目前为止所做的任何更改。

confirm_order()方法中,将数据库中的订单状态更新为"confirmed"。如果在此步骤中出现异常,则取消订单并回滚到目前为止所做的任何更改。

优点

  • 避免资源被长时间锁定
  • 处理失败或不完整的交易时提供补偿机制
  • 在2PC不适合的系统中处理分布式事务

缺点

  • 需要额外工作为每个事务实现补偿机制
  • 没有原子性保证

适用场景

  • 处理系统中的长时间事务

挑战

  • 实现补偿事务可能很复杂
  • 在分布式环境中协调多个服务可能具有挑战性

5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  • 微服务架构中的消息传递模式。
  • 允许微服务使用本地消息表作为中间缓冲区异步交换消息。
  • 每个需要与其他微服务交换消息的微服务都有自己的消息表。
  • 当一个微服务需要向另一个微服务发送消息时,会将消息添加到自己的消息表中,消息中包含接收微服务处理请求所需的所有必要信息。
  • 一旦消息被添加到表中,发送微服务就可以继续处理其他任务。同时,一个后台工作线程或一个单独的进程持续监控消息表中的新消息。
  • 当检测到新消息时,工作线程/进程检索该消息并将其发送到接收端微服务。
  • 一旦接收端微服务处理了消息,就会向发送端微服务发送确认,表明消息已成功处理。确认还可以包含发送端微服务需要的任何结果或数据。
  • 如果在一定时间内没有收到确认,发送端微服务可以假设消息没有被成功处理,并可以采取适当措施,例如重新发送消息或取消请求。
import psycopg2
from psycopg2 import sql
from psycopg2.extras import DictCursor
import json
import threading
import time

# Define the message schema
message_schema = {
    "id": "",
    "data": {},
    "status": ""
}

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname="my_database",
    user="my_username",
    password="my_password",
    host="localhost",
    port="5432"
)

# Create the messages table if it doesn't exist
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id UUID PRIMARY KEY,
            data JSONB NOT NULL,
            status TEXT NOT NULL
        )
    """)
    conn.commit()

# Define the message producer
def send_message(data):
    with conn.cursor() as cur:
        # Create the message using the schema
        message = message_schema.copy()
        message["id"] = str(uuid.uuid4())
        message["data"] = data
        message["status"] = "new"
        
        # Insert the message into the messages table
        cur.execute("""
            INSERT INTO messages (id, data, status)
            VALUES (%s, %s, %s)
        """, (message["id"], json.dumps(message["data"]), message["status"]))
        conn.commit()

# Define the message consumer
def message_consumer():
    while True:
        with conn.cursor(cursor_factory=DictCursor) as cur:
            # Select the oldest message from the messages table that has a status of "new"
            cur.execute("""
                SELECT *
                FROM messages
                WHERE status = 'new'
                ORDER BY id
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """)
            row = cur.fetchone()
            if row:
                message = dict(row)
                # Update the message status to "processing"
                cur.execute("""
                    UPDATE messages
                    SET status = 'processing'
                    WHERE id = %s
                """, (message["id"],))
                conn.commit()
                # Process the message
                process_message(message)
                # Update the message status to "processed"
                cur.execute("""
                    UPDATE messages
                    SET status = 'processed'
                    WHERE id = %s
                """, (message["id"],))
                conn.commit()
        time.sleep(1)

# Start the message consumer in a separate thread
consumer_thread = threading.Thread(target=message_consumer)
consumer_thread.start()

示例代码

  • 使用PostgreSQL作为数据库,使用psycopg2库与数据库交互。
  • send_message函数创建一条新消息并将其插入PostgreSQL数据库。
  • message_consumer函数轮询数据库,查找状态为"new"的新消息,处理后将状态更新为"已处理(processed)"。
  • 本地消息表(Local Message Table)模式的实际实现可能会根据特定需求和所选择的数据库解决方案而有所不同。

优点

  • 消息不会丢失,也不会被多次处理。
  • 当在后台交换消息时,发送端微服务继续处理其他任务。
  • 微服务彼此解耦,可以独立开发和部署。
  • 允许在不影响现有系统的情况下添加新的微服务。

缺点

  • 考虑到消息格式、消息大小和错误处理等因素,可能会比较复杂。
  • 可能不适合需要实时处理的场景,因为发送端和接收端微服务之间可能会有延迟。
  • 需要额外的基础设施,如消息队列或数据库来存储和管理消息。

适用场景

  • 需要处理不同领域业务的拥有多个微服务的电子商务网站,如库存管理、订单处理和支付处理。
  • 可用于在不同服务之间共享电子医疗记录和医疗账单的医疗保健系统。
  • 需要处理不同领域业务的拥有多个微服务的银行系统,如账户管理、交易处理和欺诈检测。

挑战

  • 消息格式 —— 消息格式应该与将要交换消息的所有服务兼容,可能需要跨服务的标准化。
  • 消息大小 —— 消息应该尽量小,以减少处理时间和在传输过程中发生错误的可能性。
  • 错误处理 —— 处理消息传输或业务过程中发生的错误。例如,重新尝试失败的消息或采取其他纠正措施。
  • 实时处理 —— 这种模式可能不适合需要实时处理的场景,因为发送端和接收端消息之间可能会有一些延迟。

发件箱模式

图片来源: https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern

图片来源: https://microservices.io/patterns/data/transactional-outbox.html

  • 一种面向消息的模式,有助于实现服务之间的事务一致性
  • 确保消息只交付一次,并且即使发生失败或错误,也保持一致的状态
  • 以下是一些步骤:
    1. 服务接收来自客户端或其他服务的请求
    2. 当收到请求时,服务将需要发送给其他服务的数据写入自己数据库中的发件箱表,发件箱表包含接收服务处理请求所需的所有必要信息
    3. 写到发件箱表后,服务提交数据库事务,以确保数据安全存储在数据库中
    4. 后台运行的发件箱处理进程定期运行,读取数据库中的发件箱表,以检查需要发送的新消息
    5. 如果发件箱处理进程在发件箱表中检测到新消息,将从数据库中检索该消息并将其发送给适当的消息代理
    6. 一旦消息被消息代理接收,就被发送给接收端,接收端可以是其他服务或下游系统
    7. 消息成功交付给接收端后,消息代理向发件箱处理进程发送确认,以标记消息已成功发送
    8. 一旦发件箱处理进程接收到来自消息代理的确认,将从发件箱表中删除该消息以避免重复发送
import json
import pika
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, DateTime

# Create SQLAlchemy engine and metadata
engine = create_engine('postgresql://user:password@localhost:5432/db')
metadata = MetaData()

# Define the Outbox table schema
outbox_table = Table('outbox', metadata,
                     Column('id', Integer, primary_key=True),
                     Column('event_type', String),
                     Column('payload', String),
                     Column('created_at', DateTime)
                    )

# Define RabbitMQ connection parameters
rabbitmq_params = pika.ConnectionParameters(host='localhost', port=5672)

# Define RabbitMQ queue name
queue_name = 'outbox'

# Connect to RabbitMQ
connection = pika.BlockingConnection(rabbitmq_params)
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue=queue_name, durable=True)

# Define function to publish messages to RabbitMQ
def publish_message(message):
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # Make messages persistent
                          ))

# Define function to read messages from the Outbox table and publish them to RabbitMQ
def process_outbox():
    with engine.connect() as conn:
        result = conn.execute(outbox_table.select().order_by(outbox_table.c.id))
        for row in result:
            # Create message payload as a dictionary
            payload = {
                'event_type': row['event_type'],
                'data': json.loads(row['payload'])
            }
            # Convert payload to JSON string
            message = json.dumps(payload)
            # Publish message to RabbitMQ
            publish_message(message)
            # Delete message from Outbox table
            conn.execute(outbox_table.delete().where(outbox_table.c.id == row['id']))

# Define function to write data to the Outbox table
def write_to_outbox(event_type, data):
    with engine.connect() as conn:
        # Convert data to JSON string
        payload = json.dumps(data)
        # Insert data into Outbox table
        conn.execute(outbox_table.insert().values(
            event_type=event_type,
            payload=payload,
            created_at=datetime.now()
        ))

# Example usage:
write_to_outbox('user_created', {'user_id': 123, 'name': 'John Doe'})
process_outbox()

示例代码

  • 通过SQLAlchemy和RabbitMQ实现
  • 通过SQLAlchemy连接到PostgreSQL数据库,并定义发件箱表结构
  • 通过Pika连接到RabbitMQ消息代理并定义队列名
  • write_to_outbox函数将数据写入发件箱表
  • process_outbox函数从发件箱表中读取消息并发布到RabbitMQ
  • publish_message函数向RabbitMQ发布消息

优点

  • 即使数据库事务失败,所有事件也会发布到消息代理
  • 将事件发布与主应用程序代码解耦
  • 允许高效的批处理事件

缺点

  • 需要额外的开发工作来实现发件箱表并将事件发布到消息代理的后台工作进程/线程
  • 为系统架构引入了额外的复杂性
  • 需要仔细管理数据库事务,以确保只有在事务成功时才发布消息

适用场景

  • 当微服务需要向多个订阅者发布事件时,例如在发布/订阅消息传递场景中,并确保在消息流中捕获所有数据库更新。
  • 需要向多个订阅者发布关于新订单、更新订单状态和客户反馈事件(包括订单履行系统、客户服务和营销)的电子商务系统
  • 需要捕获消息流中的所有金融交易,以用于审计和遵从性目的的金融系统

挑战

  • 确保负责发布事件的后台工作进程/线程具有高可用性和容错能力
  • 管理数据库事务,确保仅在关联的数据库事务成功时才发布消息
  • 在向消息代理发布事件时处理错误和重试

注意事项

  • 消息格式 —— 使用JSON或Protobuf消息格式来定义要发布的事件。格式将取决于特定应用程序的需求,但必须与正在使用的消息代理兼容。
  • 消息大小 —— 取决于所发布事件的大小和复杂性。确保消息大小不超过所使用的消息代理的限制。大型消息可能会导致性能问题,为了有效处理,可能需要将其拆分为较小的消息。
  • 错误处理 —— 如果在事件发布过程中发生错误,需要处理错误并重试。应使用后退策略执行重试,以避免消息代理过载或引起拒绝服务攻击。如果在将事件插入发件箱表期间发生错误,则不应将事件发布到消息代理,以避免数据不一致。

相似点

  • 两种模式都使用消息表来存储需要在服务之间交换的消息
  • 两种模式都允许服务在后台交换消息时继续处理其他任务
  • 两种模式都提供了在微服务之间交换消息的机制

不同点

  • 数据库 —— 发件箱模式使用数据库来存储所有需要交换的消息,而本地消息表模式为每个需要交换消息的服务使用一个消息表。
  • 实现 —— 发件箱模式通过后台工作线程/进程从发件箱表中读取消息,并发送到消息代理(如RabbitMQ或Kafka)。消息表模式通过后台工作线程/进程监视消息表中的新消息,并使用任何消息传递机制(如直接HTTP请求或消息代理)发送到接收端服务。
  • 用例 —— 发件箱模式在微服务需要向多个订阅者发布事件的场景中特别有用。当多个服务需要彼此交换消息,但并非所有服务都需要接收每条消息时,本地消息表模式非常有用。
  • 挑战 —— 发件箱模式需要仔细处理数据库事务,以确保只有在关联的数据库事务成功时才发布消息。本地消息表模式可能需要更复杂的错误处理,因为如果消息未能到达目的地,可能需要重新尝试发送。

6. MQ事务(MQ Transaction)

图片来源: https://learn.microsoft.com/en-us/previous-versions/windows/desktop/msmq/ms699870%28v=vs.85%29

  • 一系列消息操作被分组在一起作为单个工作单元。
  • 事务确保所有操作都成功完成,或者所有操作都失败,从而确保即使在发生故障或错误时,系统也保持一致的状态。
  • 以确保原子性、一致性、隔离性和持久性(ACID)的方式处理消息。
  • 步骤:
    1. 通过指定连接设置(如代理URL、用户名和密码)来建立到消息代理的连接。
    2. 一旦建立连接,通过调用begin方法启动事务。此方法通知消息代理事务已经启动,并且所有后续消息操作应该作为事务组合在一起。
    3. 通过将消息发送到队列或主题,执行需要作为事务组合在一起的消息操作。
    4. 一旦成功执行了所有消息操作,通过调用commit方法提交事务。此方法通知消息代理,作为事务执行的所有消息操作都已成功完成。
    5. 如果事务期间发生任何错误,则通过调用rollback方法回滚事务。此方法通知消息代理事务已经失败,作为事务执行的所有消息操作都应该撤消。
import stomp

class MyListener(stomp.ConnectionListener):

    def __init__(self, conn):
        self.conn = conn

    def on_message(self, headers, message):
        # Process the message
        print("Received message:", message)
        # Acknowledge the message
        self.conn.ack(id=headers['message-id'], subscription=headers['subscription'])

    def on_error(self, headers, message):
        print('Received an error "{}": {}'.format(headers, message))
    
    def on_disconnected(self):
        print('Disconnected from the message broker.')

# Define the message broker connection settings
broker_url = "tcp://localhost:61613"
username = "myusername"
password = "mypassword"
queue_name = "/queue/myqueue"

# Establish a connection to the message broker
conn = stomp.Connection([(broker_url, 61613)])
conn.set_listener('', MyListener(conn))
conn.start()
conn.connect(username, password)

try:
    # Begin the transaction
    conn.begin()
    # Send the first message
    conn.send(body="Message 1", destination=queue_name)
    # Send the second message
    conn.send(body="Message 2", destination=queue_name)
    # Commit the transaction
    conn.commit()
except Exception as ex:
    # Rollback the transaction in case of errors
    conn.rollback()
    print("Error sending messages:", str(ex))

# Disconnect from the message broker
conn.disconnect()

示例代码

  • 基于Apache ActiveMQ库实现
  • beginsendcommit方法用于将两个消息的发送组合为单个事务。
  • 如果在事务期间发生错误,则调用rollback方法以确保事务中的消息都没有被处理。
  • 并不是所有消息代理都支持事务,实际实现可能因特定消息代理和库而有所不同。

优点

  • 即使在网络故障或发生其他中断的情况下,也要确保将消息传递到预期的收件人
  • 一组消息操作被视为单独的事务,要么处理事务中的所有消息,要么不处理任何消息
  • 可以处理大量消息,并且可以根据需要横向扩展
  • 可以跨不同平台和编程语言使用,使其成为消息交换的通用解决方案

缺点

  • 实现MQ事务可能很复杂
  • 增加了消息处理的开销,会影响系统性能
  • 需要授权费用和硬件成本,实现可能比较昂贵

适用场景

  • 需要处理大量消息并且需要保证原子性时
  • 处理与金融交易相关的消息,如股票交易或在线支付
  • 处理与供应链管理相关的消息,例如跟踪库存水平或协调交付
  • 处理与在线订购系统相关的消息,例如处理订单和跟踪发货

注意事项

  • 消息格式 —— 取决于正在使用的具体实现。通常,消息与其他元数据(如消息头和属性)一起被格式化为数据载荷,有效负载可以是各种格式,比如XML、JSON或二进制数据。
  • 消息大小 —— 最大消息大小取决于正在使用的具体实现。较大的消息可能需要分成较小的块进行处理。
  • 错误处理 —— 发生错误时,确保消息处理不中断,错误得到适当处理。因此,需要实现适当的重试机制和错误报告。例如,MQ事务系统向发送方返回错误消息,发送方可以重新发送该消息或通知管理员。需要实现一些错误处理策略,比如设置专用的错误处理流程、实现重试机制,或者使用日志记录和监控工具跟踪错误和性能指标。

参考文献

Message Queue Transactions

Handling Transactions in IBM MQ

Message Queuing and the database: Solving the dual write problem

Message Queues: An Introduction

MSMQ Basics: Queues, Messages, Transactions

Reliable Microservices Data Exchange With the Outbox Pattern

Microservices Pattern: Transactional outbox

The Outbox Pattern

Outbox Pattern for Microservices Architecture

Microservices 101: Transactional Outbox and Inbox

Design Pattern for Distributed Transactions

Making Try-Confirm/Cancel Easy with MicroTx

Build software better, together

An In-Depth Analysis of Distributed Transaction Solutions


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind

本文由mdnice多平台发布