在项目开发过程中很多时候需要持久化大数据,其中的一项选择就是数据库分库分表,本篇从实战角度介绍在NetCore中如何通过Dapper实现分表,废话不多说,开搞!
模拟业务场景:公司中有很多员工,分为不同的角色:老板(Boss), 虾兵(Shrimp Soldier),蟹将(CrabGeneral),员工除了角色以外,还有唯一的标识Id,还有姓名displayname,入职日期hiredate。因为员工众多,处于某些业务考虑我们需要根据员工角色将员工信息存储在不同的表中。
基于以上虚拟的需求,我们根据员工角色信息创建不同的表。
首先创建NetCore Library 通过Nuget引入所需类库:Dapper.Contrib(2.0.78)及System.Data.SqlClient(4.8.5)
设计员工表结构,其中TableAttribute标识表名,SubmeterAttribute标识当前表是分表结构,Property代表按照某个属性分表
public abstract class Entity<TKey> { [ExplicitKey] public virtual TKey Id { get; set; } } [Table("User")] [Submeter(property:"Role")] public class User : Entity<Guid> { public string DisplayName { get; set; } public Role Role { get; set; } public long HireDate { get; set; } } public enum Role { Boss = 0, ShrimpSoldier, CrabGeneral }
[AttributeUsage(AttributeTargets.Property)] public class ExplicitKey : Attribute { } [AttributeUsage(AttributeTargets.Class)] public class SubmeterAttribute : Attribute { public string Property { get; private set; } public SubmeterAttribute(string property) { this.Property = property; } }
有了表结构之后,之后设计Repository,首先是BaseRepository除了基本的CRUD,还定义一个虚方法 protected virtual Func<string, string>? CreateScriptFunc { get; set; }要求子类根据特定的需求创建表及定义一个Schema设计的初衷是做数据隔离或者安全隔离。
using Dapper; using Dapper.Contrib.Extensions; using System.Data; using System.Reflection; namespace API.Dapper.Repository.V2 { public abstract class BaseRepositoryV2<T> where T : class, new() { protected IDbConnectionSchema DbConnectionSchema { get; set; } protected string Schema { get => DbConnectionSchema.Schema; } protected IDbTransaction Transaction { get; private set; } protected IDbConnection DbConnection => DbConnectionSchema.DbConnection; protected Int32 CommandTimeout { get; set; } = 30; protected static string _tableName => typeof(T).GetCustomAttribute<TableAttribute>()?.Name ?? typeof(T).Name; protected static IEnumerable<string> _properties => typeof(T).GetProperties().Select(p => p.Name); protected BaseRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction) { DbConnectionSchema = dbConnectionSchema; Transaction = dbTransaction; } protected virtual Func<string, string>? CreateScriptFunc { get; set; } public async Task<Int32> AddAsync(IEnumerable<T> entities, string tableName = default) { var sql = string.Format(Sql.Insert.BuildSchema(Schema), tableName ?? _tableName, string.Join(",", _properties), string.Join(",", _properties.Select(p => $"@{p}"))); return await DbConnection.ExecuteAsync(sql, entities, Transaction, CommandTimeout, CommandType.Text); } public async Task<IEnumerable<T>> GetAllAsync(string tableName = default) { var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, string.Empty); return await DbConnection.QueryAsync<T>(sql, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); } public async Task<T> GetAsync(Guid id, string tableName = default) { var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, $"WHERE Id = @Id"); var param = new DynamicParameters(); param.Add("Id", id); return await DbConnection.QuerySingleOrDefaultAsync<T>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); } public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, IEnumerable<string> whereProperties, string tableName = default) { var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, string.Join(" AND ", whereProperties.Select(p => $"{p}=@{p}"))); return await DbConnection.QueryAsync<T>(sql, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); } public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, string whereClause, DynamicParameters sqlParams, string tableName = default) { var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause); return await DbConnection.QueryAsync<T>(sql, sqlParams, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); } public async Task<Int32> UpdateAsync(IEnumerable<T> entities, IEnumerable<string> updateProperties, IEnumerable<string> whereProperties, string tableName = default) { var updateClause = string.Join(",", _properties.Intersect(updateProperties).Select(p => $"{p}=@{p}")); var whereClause = $"WHERE {string.Join(" AND ", _properties.Intersect(whereProperties).Select(p => $"{p}=@{p}"))}"; var sql = string.Format(Sql.Update.BuildSchema(Schema), tableName ?? _tableName, updateClause, whereClause); return await DbConnection.ExecuteAsync(sql, entities, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); } public async Task<Int32> DeleteAsync(IEnumerable<Guid> ids, string tableName = default) { var sql = string.Format(Sql.Delete.BuildSchema(Schema), tableName ?? _tableName, $"WHERE Id IN @Ids"); var param = new DynamicParameters(); param.Add("Ids", ids); return await DbConnection.ExecuteAsync(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); } } }
BaseRepository中基础的CURD不能满足分表的业务需求,还需要一些特定的逻辑,比如写入数据的时候该往那张表去写,怎么读数据等,往往需要指定特定的表信息,所以这里定义了一个SubmeterRepository继承自BaseRepository处理特定的需求。以AddAsync为例简单阐述需要哪些操作:
1)、判断Schema是否存在,不存在需要创建
2)、根据写入数据信息判断是否有响应的表,不存在需要创建
3)、调用BaseRepository AddAsync写入数据
using API.Dapper.Repository.V2; using Dapper; using System.Data; using System.Reflection; namespace API.Dapper.Repository; public abstract class SubmeterRepository<T> : BaseRepositoryV2<T> where T : class, new() { protected static bool _isSubmeterTable => typeof(T).IsDefined(typeof(SubmeterAttribute)); protected static string _submeterProperty = typeof(T).GetCustomAttribute<SubmeterAttribute>()?.Property ?? string.Empty; public SubmeterRepository(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction) : base(dbConnectionSchema, dbTransaction) { } public async Task<Int32> AddAsync(IEnumerable<T> entities) { var affectedRows = 0; if (_isSubmeterTable) { var propertyInfo = typeof(T).GetProperty(_submeterProperty) ?? throw new NotSupportedException("The current property doen't support for submeter"); var tables = entities.GroupBy(e => propertyInfo.GetValue(e)?.ToString()!).ToDictionary(g => g.Key, g => g.ToList()); if (!await DbConnection.IsSchemaExist(Schema, transaction: Transaction, commandTimeout: CommandTimeout)) await DbConnection.CreateSchema(Schema, transaction: Transaction, commandTimeout: CommandTimeout); await tables.ForEachAsync(async table => { var tableName = $"{_tableName}_{table.Key}"; if (!await DbConnection.IsTableExist(Schema, tableName, Transaction, CommandTimeout)) await DbConnection.ExecuteAsync(this.CreateScriptFunc?.Invoke(tableName), transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); affectedRows += await base.AddAsync(table.Value, tableName); }); } else affectedRows = await base.AddAsync(entities); return affectedRows; } public async Task<IEnumerable<T>> GetAllAsync() { var result = new List<T>(); if (_isSubmeterTable) { var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout); await tables.ForEachAsync(async table => { var tableValues = await base.GetAllAsync(table); result.AddRange(tableValues); }); } else result.AddRange(await base.GetAllAsync()); return result; } public async Task<T> GetAsync(Guid id) { var result = default(T); if (_isSubmeterTable) { var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout); var properties = typeof(T).GetProperties().Select(p => p.Name); foreach (var table in tables) { result = await base.GetAsync(id, table); if (result != default) break; } } else result = await base.GetAsync(id); return result; } public async Task<Int32> UpdateAsync(IEnumerable<T> entities, IEnumerable<string> updateProperties, IEnumerable<string> whereProperties) { var affectedRows = 0; if (_isSubmeterTable) { var propertyInfo = typeof(T).GetProperty(_submeterProperty) ?? throw new NotSupportedException("The current property doen't support for submeter"); var tables = entities.GroupBy(e => propertyInfo.GetValue(e)?.ToString()!).ToDictionary(g => g.Key, g => g.ToList()); foreach (var table in tables) { var tableName = $"{_tableName}_{table.Key}"; affectedRows += await base.UpdateAsync(table.Value, updateProperties, whereProperties, tableName); if (affectedRows == entities.Count()) break; } } else affectedRows = await base.UpdateAsync(entities, updateProperties, whereProperties); return affectedRows; } public async Task<Int32> DeleteAsync(IEnumerable<Guid> ids) { var affectedRows = 0; if (_isSubmeterTable) { var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout); foreach (var table in tables) { affectedRows = await base.DeleteAsync(ids, table); if (affectedRows == ids.Count()) break; } } else affectedRows = await base.DeleteAsync(ids); return affectedRows; } }
接下来定义UserRepository实现CreateScriptFunc及补充业务独有的需求及自定义一些DbConnection的扩展方法
using API.Dapper.Entity; using Dapper; using System.Data; namespace API.Dapper.Repository.V2 { public class UserRepositoryV2 : SubmeterRepository<User> { public UserRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction) : base(dbConnectionSchema, dbTransaction) { } public async Task<IEnumerable<User>> GetAsync(IEnumerable<Guid> ids, IEnumerable<string> selectedProperties = default) { var result = new List<User>(); var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout); selectedProperties = selectedProperties != null ? _properties.Intersect(selectedProperties) : _properties; foreach (var table in tables) { var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), table, $"WHERE Id IN @Ids"); var param = new DynamicParameters(); param.Add("Ids", ids); var tableValues = await DbConnection.QueryAsync<User>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text); result.AddRange(tableValues); if (result.Count() == ids.Count()) break; } return result; } public async Task<IEnumerable<User>> GetAsync(Role role) { if (_isSubmeterTable) return await base.GetAllAsync($"{_tableName}_{role}"); else return await base.GetAsync(_properties, new List<string> { "Role" }); } public async Task<IEnumerable<User>> GetAsync(IEnumerable<User> entities) { var result = new List<User>(); if (_isSubmeterTable) { var propertyInfo = typeof(User).GetProperty(_submeterProperty) ?? throw new NotSupportedException("The current property doen't support for submeter"); var tables = entities.GroupBy(e => propertyInfo.GetValue(e)?.ToString()!).ToDictionary(g => g.Key, g => g.ToList()); foreach (var table in tables) { var tableName = $"{_tableName}_{table.Key}"; var sqlParams = new DynamicParameters(); sqlParams.Add("Ids", table.Value.Select(v => typeof(User).GetProperty("Id")?.GetValue(v))); var tableValues = await base.GetAsync(_properties, $"WHERE Id IN @Ids",sqlParams, tableName); result.AddRange(tableValues); } } else { var sqlParams = new DynamicParameters(); sqlParams.Add("Ids",entities.Select(v => typeof(User).GetProperty("Id")?.GetValue(v))); result.AddRange(await base.GetAsync(_properties, $"WHERE Id IN @Ids", sqlParams)); } return result; } protected override Func<string, string> CreateScriptFunc => tableName => _tableName switch { APITable.User => "CREATE TABLE [{schema}].".BuildSchema(Schema) + $"[{tableName}](" + @" [Id] [uniqueidentifier] NOT NULL, [DisplayName] [nvarchar](256) NOT NULL, [Role] [int] NULL, [HireDate] [bigint] NULL," + $"CONSTRAINT [PK_{tableName}] PRIMARY KEY CLUSTERED ([Id] ASC))" + "CREATE NONCLUSTERED INDEX [NonClustered_HireDate] ON [{schema}].".BuildSchema(Schema) + $"[{tableName}]([HireDate] ASC)" + "CREATE NONCLUSTERED INDEX [NonClustered_Role] ON [{schema}].".BuildSchema(Schema) + $"[{tableName}]([Role] ASC)", _ => string.Empty }; } }
#region Database Submeter public static async Task<bool> IsSchemaExist(this IDbConnection connection, string schema, IDbTransaction transaction = default, Int32 commandTimeout = default) { var sql = $"SELECT COUNT(1) FROM sys.schemas WHERE SCHEMA_NAME(schema_id) = '{schema}'"; var result = await connection.ExecuteScalarAsync<int>(sql, transaction: transaction, commandTimeout: commandTimeout); return result > 0; } public static async Task<Int32> CreateSchema(this IDbConnection connection, string schema, IDbTransaction transaction = default, Int32 commandTimeout = default) { var sql = $"CREATE SCHEMA {schema}"; return await connection.ExecuteAsync(sql, transaction: transaction, commandTimeout: commandTimeout); } public static async Task<bool> IsTableExist(this IDbConnection connection, string schema, string tableName, IDbTransaction transaction = default, Int32 commandTimeout = default) { var sql = $"SELECT count(1) FROM sys.tables WHERE name='{tableName}' AND type = 'u' AND SCHEMA_NAME(schema_id)='{schema}'"; var result = await connection.ExecuteScalarAsync<int>(sql, transaction: transaction, commandTimeout: commandTimeout); return result > 0; } public static async Task<IEnumerable<string>> GetTables(this IDbConnection connection, string schema, string tablePrefix, IDbTransaction transaction = default, Int32 commandTimeout = default) { var sql = $"SELECT name FROM sys.tables WHERE name LIKE '{tablePrefix}%' AND type ='u' AND SCHEMA_NAME(schema_id)='{schema}'"; return await connection.QueryAsync<string>(sql, transaction: transaction, commandTimeout: commandTimeout); } #endregion
namespace API.Dapper; public static class IEnumerableExtension { public static void ForEach<T>(this IEnumerable<T> values, Action<T> action) { foreach (var value in values) { action(value); } } public static async Task ForEachAsync<T>(this IEnumerable<T> values, Func<T, Task> func) { foreach (T value in values) { await func(value); } } }
public static partial class Sql { public const string Insert = @"INSERT INTO [{schema}].[{0}]({1})VALUES({2})"; public const string Select = @"SELECT {0} FROM [{schema}].{1} {2}"; public const string Update = @"UPDATE [{schema}].[{0}] SET {1} {2}"; public const string Delete = @"DELETE FROM [{schema}].[{0}] {1}"; public static string BuildSchema(this string sql, string schema) => sql.Replace("{schema}", schema); }
其次就是对于Repository的事务管理UnitOfWork,这里也是定义了一个基类的UnitOfWork及UserUnitOfWork,同时为了初始化UnitOfWork,这里定义了IDbConnectionSchema包含Schema及IDbConenction属性,DemoDbConnectionSchema主要是用于构建IDbConnection对象。
简单回顾下事务的特性(ACID)
Automicity:原子性是事务不可分割的工作单元,事务中的操作要么全部成功,要么全部失败。
Consistency:事务必须使数据库从一个一致性状态变换到另外一个一致性状态。
Isolation:事务的隔离性是多个用户并发访问数据库时,数据库为每个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。
Durability:持久性是指一个事务一旦被提交,它对数据库中的数据的改变时永久性的。
using System.Data; namespace API.Dapper.UnitOfWork.V2 { public class UnitOfWorkV2 : IUnitOfWork { public IDbConnection DbConnection { get; set; } public IDbTransaction DbTransaction { get; set; } public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema) { DbConnection = dbConnectionSchema.DbConnection; DbConnection.Open(); DbTransaction = dbConnectionSchema.DbConnection.BeginTransaction(); } public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema, IsolationLevel isolationLevel) { DbConnection = dbConnectionSchema.DbConnection; DbConnection.Open(); DbTransaction = dbConnectionSchema.DbConnection.BeginTransaction(isolationLevel); } public void Commit() { DbTransaction.Commit(); } public void Dispose() { if (DbTransaction != null) DbTransaction.Dispose(); if (DbConnection != null) DbConnection.Dispose(); GC.SuppressFinalize(this); } public void Rollback() { DbTransaction.Rollback(); } } }
using API.Dapper.Repository.V2; namespace API.Dapper.UnitOfWork.V2 { public class UserUnitOfWorkV2 : UnitOfWorkV2 { public UserRepositoryV2 UserRepositoryV2 { get; } public UserUnitOfWorkV2(IDbConnectionSchema dbConnectionSchema) : base(dbConnectionSchema) { UserRepositoryV2 = new UserRepositoryV2(dbConnectionSchema, DbTransaction); } } }
public interface IDbConnectionSchema : IDisposable { public IDbConnection DbConnection { get; } public String Schema { get; } } public abstract class DbConnectionSchema : IDbConnectionSchema { public String Schema { get; protected set; } private IDbConnection? dbConnection; public IDbConnection DbConnection { get { if (dbConnection == null) dbConnection = this.CreateDbConnection(); return dbConnection; } protected set { dbConnection = value; } } public abstract IDbConnection CreateDbConnection(); public void Dispose() { if(dbConnection != null) dbConnection.Dispose(); GC.SuppressFinalize(this); } }
public class DemoDbConnectionSchema : DbConnectionSchema { public override IDbConnection CreateDbConnection() { Schema = UserContext.Current.Schema; return new SqlConnection(UserContext.Current.ConnectionString); } }
有了上面对数据库分表的基本操作之后,最后定义Service实现CRUD
创建一个NetCore Console Application,引入上面Library工程,定义UserService
using API.Dapper.Entity; using API.Dapper.UnitOfWork.V2; namespace API.Dapper.Test; public class UserService { public async Task<IEnumerable<User>> GetAllAsync() { using var unitOfWork = new UserUnitOfWorkV2(new DemoDbConnectionSchema()); return await unitOfWork.UserRepositoryV2.GetAllAsync(); } public async Task AddAsync(IEnumerable<User> users) { using var unitOfWork = new UserUnitOfWorkV2(new DemoDbConnectionSchema()); await unitOfWork.UserRepositoryV2.AddAsync(users); unitOfWork.Commit(); } }
在Program中通过DI调用UserService测试AddAsync及GetAsync
//DI var services = new ServiceCollection(); services.AddTransient<UserService>(); var serviceProvider = services.BuildServiceProvider(); UserContext.Current.Init(new UserPrincipal { UserId = Guid.Parse("00000000-0000-0000-0000-000000000001"), DisplayName = "Boss", Role = Role.Boss, HireDate = DateTime.UtcNow.Ticks }); var userService = serviceProvider.GetRequiredService<UserService>(); Console.WriteLine("-----------Start to add user to sumbmeter tables--------------"); await userService.AddAsync(new List<User> { new User { Id = Guid.NewGuid(), Role = Role.ShrimpSoldier, DisplayName ="tom", HireDate=DateTime.UtcNow.Ticks }, new User { Id =Guid.NewGuid(), Role = Role.CrabGeneral, DisplayName ="jack", HireDate = DateTime.UtcNow.Ticks} }); Console.WriteLine("-----------Add two users to tables----------------------------"); var users = await userService.GetAllAsync(); Console.WriteLine($"Get all users from tables \r\n {string.Join("\r\n", users.Select(u => $"Id: {u.Id}, DisplayName: {u.DisplayName}, Role: {u.Role}, Hire date: {u.HireDate}"))}");
结果如下:
查看DB发现根据Role创建了不同的数据库表,数据也成功写入。
补充:可能有的小伙伴对下面这段代码感到好奇,解释一下哈,设计的初衷是先做数据隔离,UserId作为db schema创建不同Scheam从而做到数据的隔离,显然这样的想法是不符合实际业务需求的,这里只是尝试Schema这种方式,勿喷!!!勿喷!!!
public class UserContext { private static readonly AsyncLocal<UserContext> _instance; static UserContext() { _instance = new AsyncLocal<UserContext>(); } public static UserContext Current { get => _instance.Value == default ? _instance.Value = new UserContext() : _instance.Value; private set => _instance.Value = value; } public string Schema { get; set; } public string ConnectionString { get => $"***************"; } public IPrincipal? Principal { get; set; } public void Init(IPrincipal princial) { if (princial != null && princial is UserPrincipal) { var userPrincial = princial as UserPrincipal; Schema = $"Dapper{userPrincial?.UserId}".Replace("-", ""); } Principal = princial; } }
public interface IPrincipal { } public class UserPrincipal : IPrincipal { public Guid UserId { get; set; } public string DisplayName { get; set; } public Role Role { get; set; } public long HireDate { get; set; } }
UserContext.Current.Init(new UserPrincipal { UserId = Guid.Parse("00000000-0000-0000-0000-000000000001"), DisplayName = "Jack", Role = Role.Boss, HireDate = DateTime.UtcNow.Ticks });