.NET微服务系列之Saga分布式事务案例实践

发布时间 2023-10-11 08:34:12作者: linguicheng

        自从Wing正式发布以后,很多童鞋反馈对Saga分布式事务比较感兴趣,今天就跟大家分享一下“跨行转账”的分布式事务实践案例,入门使用教程请自行前往Wing官方文档

假设自己名下有“中国农业银行(ABC)”和“中国工商银行(ICBC)”的账户余额各1万元,现在从“ABC”跨行转账1000元到“ICBC”。对于“ABC”我们创建一个项目名称为

“Saga.Bank.ABC”,“跨行转账”这个动作我们分为两个事务单元来处理:

1、当前账户扣减1000元,定义一个事务单元的数据传输模型(MyAccountUnitModel),一个事务单元的实现类(MyAccountSagaUnit),如果我们定义的事务策略是“向前恢复”,那就只需要实现“Commit”

方法,否则还需要实现 “Cancel”方法,代码如下:

事务单元的数据传输模型(MyAccountUnitModel)

 1 using System;
 2 using Wing.Saga.Client;
 3 
 4 namespace Saga.Bank.ABC.TransferSagaUnits
 5 {
 6     [Serializable]
 7     public class MyAccountUnitModel : UnitModel
 8     {
 9         /// <summary>
10         /// 账号
11         /// </summary>
12         public string BankNo { get; set; }
13 
14         /// <summary>
15         /// 转出金额
16         /// </summary>
17         public double Amount { get; set; }
18     }
19 }

事务单元的实现类(MyAccountSagaUnit)

 1 using Microsoft.AspNetCore.Mvc;
 2 using System.Threading.Tasks;
 3 using Wing.Saga.Client;
 4 
 5 namespace Saga.Bank.ABC.TransferSagaUnits
 6 {
 7     /// <summary>
 8     /// 当前账户操作
 9     /// </summary>
10     public class MyAccountSagaUnit : SagaUnit<MyAccountUnitModel>
11     {
12         public override Task<SagaResult> Cancel(MyAccountUnitModel model, SagaResult previousResult)
13         {
14             MyAccount.Balance += model.Amount;
15             return Task.FromResult(new SagaResult());
16         }
17 
18         public override Task<SagaResult> Commit(MyAccountUnitModel model, SagaResult previousResult)
19         {
20             var result = new SagaResult();
21             if (MyAccount.Balance < model.Amount)
22             {
23                 result.Success = false;
24                 result.Msg = "转账失败,当前账户余额不足!";
25             }
26             MyAccount.Balance -= model.Amount;
27             return Task.FromResult(result);
28         }
29     }
30 }

2、调用收款行“ICBC”的接口,同样,也是定义一个事务单元的数据传输模型(TransferOutUnitModel),一个事务单元的实现类(TransferOutSagaUnit),代码如下:

事务单元的数据传输模型(TransferOutUnitModel)

 1 using System;
 2 using Wing.Saga.Client;
 3 
 4 namespace Saga.Bank.ABC.TransferSagaUnits
 5 {
 6     [Serializable]
 7     public class TransferOutUnitModel : UnitModel
 8     {
 9         /// <summary>
10         /// 收款账号
11         /// </summary>
12         public string BankNo { get; set; }
13 
14         /// <summary>
15         /// 收款行
16         /// </summary>
17         public string BankName { get; set; }
18 
19         /// <summary>
20         /// 接收金额
21         /// </summary>
22         public double Amount { get; set; }
23     }
24 }

事务单元的实现类(TransferOutSagaUnit)

 1 using System.Net.Http;
 2 using System;
 3 using System.Threading.Tasks;
 4 using Wing;
 5 using Wing.Saga.Client;
 6 using Wing.ServiceProvider;
 7 using Newtonsoft.Json;
 8 using Wing.Result;
 9 using System.Text;
10 
11 namespace Saga.Bank.ABC.TransferSagaUnits
12 {
13     /// <summary>
14     /// 账户转出操作
15     /// </summary>
16     public class TransferOutSagaUnit : SagaUnit<TransferOutUnitModel>
17     {
18         private readonly IServiceFactory _serviceFactory = App.GetService<IServiceFactory>();
19         private readonly IHttpClientFactory _httpClientFactory = App.GetService<IHttpClientFactory>();
20 
21         public override Task<SagaResult> Cancel(TransferOutUnitModel model, SagaResult previousResult)
22         {
23             throw new NotImplementedException();
24         }
25 
26         public override Task<SagaResult> Commit(TransferOutUnitModel model, SagaResult previousResult)
27         {
28             return _serviceFactory.InvokeAsync("Saga.Bank.ICBC", async serviceAddr =>
29             {
30                 var client = _httpClientFactory.CreateClient();
31                 client.BaseAddress = new Uri(serviceAddr.ToString());
32                 var response = await client.PostAsync("/TransferReceive", new StringContent(JsonConvert.SerializeObject(model), Encoding.UTF8, "application/json"));
33                 var sagaResult = new SagaResult();
34                 if (response.IsSuccessStatusCode)
35                 {
36                     var apiStrResult = await response.Content.ReadAsStringAsync();
37                     var apiResult = JsonConvert.DeserializeObject<ApiResult<bool>>(apiStrResult);
38                     if (apiResult.Code == ResultType.Success)
39                     {
40                         sagaResult.Success = apiResult.Data;
41                     }
42                     else
43                     {
44                         sagaResult.Success = false;
45                     }
46                     sagaResult.Msg = apiResult.Msg;
47                 }
48                 else
49                 {
50                     sagaResult.Success= false;
51                     sagaResult.Msg = $"调用工商银行接口失败,http状态码:{(int)response.StatusCode}";
52                 }
53                 return sagaResult;
54             });
55         }
56     }
57 }

以上两个事务单元将组成一个完整的“跨行转账”事务,代码如下:

 1 using Microsoft.AspNetCore.Mvc;
 2 using Microsoft.AspNetCore.Routing;
 3 using Saga.Bank.ABC.TransferSagaUnits;
 4 using System;
 5 using Wing.Persistence.Saga;
 6 using Wing.Saga.Client;
 7 
 8 namespace Saga.Bank.ABC.Controllers
 9 {
10     /// <summary>
11     /// 转账
12     /// </summary>
13     [ApiController]
14     [Route("[controller]")]
15     public class TransferAccountsController : ControllerBase
16     {
17         public TransferAccountsController()
18         {
19         }
20 
21         /// <summary>
22         /// 当前账户余额
23         /// </summary>
24         /// <returns></returns>
25         public string Get()
26         {
27             return $"我是中国农业银行账户,当前账户余额为:{MyAccount.Balance}¥";
28         }
29 
30 
31         [HttpGet("{amount}")]
32         public bool Get(double amount)
33         {
34             if (amount <= 0)
35             {
36                 throw new Exception("转账金额必须大于0");
37             }
38             var result = Wing.Saga.Client.Saga.Start("跨行转账", new SagaOptions { TranPolicy = TranPolicy.Forward })
39                    .Then(new MyAccountSagaUnit(), new MyAccountUnitModel
40                    {
41                        Name = "当前账户扣减",
42                        BankNo = MyAccount.BankNo,
43                        Amount = 1000
44                    })
45                   .Then(new TransferOutSagaUnit(), new TransferOutUnitModel
46                   {
47                       Name = "调用收款行接口",
48                       BankNo = "987654321",
49                       Amount = 1000,
50                       BankName = "中国工商银行"
51                   })
52                   .End();
53             if (!result.Success)
54             {
55                 throw new Exception(result.Msg);
56             }
57             return result.Success;
58         }
59     }
60 }

对于“ICBC”,我们创建一个项目名称为“Saga.Bank.ICBC”,它的职责很简单,就是增加收款账号的转账金额,代码如下:

 1 using Microsoft.AspNetCore.Mvc;
 2 using Saga.Bank.ICBC.Models;
 3 using System;
 4 
 5 namespace Saga.Bank.ICBC.Controllers
 6 {
 7     /// <summary>
 8     /// 转账
 9     /// </summary>
10     [ApiController]
11     [Route("[controller]")]
12     public class TransferReceiveController : ControllerBase
13     {
14         private static bool _result = false;
15         public TransferReceiveController()
16         {
17         }
18 
19         /// <summary>
20         /// 当前账户余额
21         /// </summary>
22         /// <returns></returns>
23         public string Get()
24         {
25             return $"我是中国工商银行账户,当前账户余额为:{MyAccount.Balance}¥";
26         }
27 
28         /// <summary>
29         /// 手动控制跨行转账收款是否成功,测试需要
30         /// </summary>
31         /// <param name="result"></param>
32         /// <returns></returns>\
33         [HttpGet("{result}")]
34         public bool Get(int result)
35         {
36             _result = result == 1; 
37             return _result;
38         }
39 
40         /// <summary>
41         /// 跨行转账收款动作
42         /// </summary>
43         /// <param name="model"></param>
44         /// <returns></returns>
45         /// <exception cref="Exception"></exception>
46         [HttpPost]
47         public bool Post(ReceivedModel model)
48         {
49             if (model.BankNo != MyAccount.BankNo)
50             {
51                 throw new Exception("账号不存在!");
52             }
53             if (!_result)
54             {
55                 throw new Exception("跨行转账业务失败!");
56             }
57             MyAccount.Balance += model.Amount;
58             return true;
59         }
60     }
61 }

 启动“Saga.Bank.ICBC”项目,可以看到当前账户余额为10000元,如下图:

启动“Saga.Bank.ABC”项目,可以看到当前账户余额也是为10000元,如下图:

 启动Saga协调服务“Saga.Bank.Server”,启动“Wing.UI”示例1.3,  我们调用农业银行跨行转账接口 http://localhost:9110/TransferAccounts/1000,这时我们可以看到“ABC”的余额为

9000元,“ICBC”的余额还是10000元,因为“ICBC”自身业务操作处理失败,如下图所示:

 

 

我们把“Saga.Bank.ICBC”的收款接口处理结果改为“成功”(调用接口 http://localhost:9111/TransferReceive/1),1分钟左右,我们重新查看“ICBC”的账户余额为11000元,“跨行转账”事务也处理完成了,如下图:

 

 

 代码完整示例下载地址:https://gitee.com/linguicheng/wing-demo