Dapper NetCore 分表实战

模拟业务场景:公司中有很多员工,分为不同的角色:老板(Boss)、虾兵(ShrimpSoldier)、蟹将(CrabGeneral)。员工除了角色以外,还有唯一的标识Id、姓名DisplayName和入职日期HireDate。因为员工众多,出于某些业务考虑,我们需要根据员工角色将员工信息存储在不同的表中。


首先创建NetCore Library 通过Nuget引入所需类库:Dapper.Contrib(2.0.78)及System.Data.SqlClient(4.8.5)


/// <summary>
/// Base class for entities identified by a key of type <typeparamref name="TKey"/>.
/// </summary>
/// <typeparam name="TKey">Type of the identifier.</typeparam>
public abstract class Entity<TKey>
{
    /// <summary>Identifier of the entity; virtual so that derived entities may override it.</summary>
    public virtual TKey Id { get; set; }
}

/// <summary>
/// An employee record, keyed by a <see cref="Guid"/>.
/// </summary>
/// <remarks>
/// NOTE(review): SubmeterRepository only shards types decorated with SubmeterAttribute, and no
/// attribute is visible on this class in the listing. Presumably a marker such as
/// [Submeter("Role")] was lost from the listing - confirm against the original project.
/// </remarks>
public class User : Entity<Guid>
{
    /// <summary>Name shown for the employee.</summary>
    public string DisplayName { get; set; }

    /// <summary>Role of the employee.</summary>
    public Role Role { get; set; }

    /// <summary>Hire date stored as a long (the sample data uses DateTime.UtcNow.Ticks).</summary>
    public long HireDate { get; set; }
}

/// <summary>
/// Roles an employee can have.
/// </summary>
/// <remarks>
/// NOTE(review): only <c>Boss = 0</c> is present in the listing. ShrimpSoldier and CrabGeneral are
/// referenced by the sample program and named in the article text, so they are declared here;
/// their numeric values (1 and 2) are an assumption - confirm against the original project.
/// </remarks>
public enum Role
{
    Boss = 0,
    ShrimpSoldier = 1,
    CrabGeneral = 2,
}
/// <summary>
/// Marker attribute with no members of its own.
/// </summary>
/// <remarks>
/// NOTE(review): the name suggests it marks an explicitly assigned key property, but nothing in
/// the visible code reads it - confirm the intended use.
/// </remarks>
public class ExplicitKey : Attribute
{
}

/// <summary>
/// Marks an entity type as sharded ("submetered") and names the property whose value selects
/// the target table.
/// </summary>
public class SubmeterAttribute : Attribute
{
    /// <summary>Name of the entity property used as the sharding key.</summary>
    public string Property { get; private set; }

    /// <summary>Creates the attribute.</summary>
    /// <param name="property">Name of the entity property used as the sharding key.</param>
    public SubmeterAttribute(string property)
    {
        this.Property = property;
    }
}

有了表结构之后,接下来设计Repository。首先是BaseRepository:除了基本的CRUD,还定义了一个虚属性 protected virtual Func<string, string>? CreateScriptFunc { get; set; },由子类根据特定的需求提供建表脚本;另外还定义了一个Schema,设计的初衷是做数据隔离或者安全隔离。

using Dapper;
using Dapper.Contrib.Extensions;
using System.Data;
using System.Reflection;

namespace API.Dapper.Repository.V2
{
    /// <summary>
    /// Generic CRUD repository built on Dapper. Every statement is composed from the templates in
    /// <c>Sql</c>, targets the schema exposed by the connection wrapper, and runs inside the
    /// transaction supplied at construction.
    /// </summary>
    /// <typeparam name="T">Entity type; its public properties are used as column names.</typeparam>
    public abstract class BaseRepositoryV2<T> where T : class, new()
    {
        // Reflection results never change for a given T, so they are resolved once per closed type
        // instead of on every property access.
        private static readonly string s_tableName =
            typeof(T).GetCustomAttribute<TableAttribute>()?.Name ?? typeof(T).Name;
        private static readonly string[] s_properties =
            typeof(T).GetProperties().Select(p => p.Name).ToArray();

        protected IDbConnectionSchema DbConnectionSchema { get; set; }
        protected string Schema { get => DbConnectionSchema.Schema; }
        protected IDbTransaction Transaction { get; private set; }
        protected IDbConnection DbConnection => DbConnectionSchema.DbConnection;

        /// <summary>Timeout passed to every Dapper call (default 30).</summary>
        protected Int32 CommandTimeout { get; set; } = 30;

        /// <summary>Name from <c>TableAttribute</c> when present, otherwise the type name.</summary>
        protected static string _tableName => s_tableName;

        /// <summary>Names of the public properties of <typeparamref name="T"/>.</summary>
        protected static IEnumerable<string> _properties => s_properties;

        /// <summary>Stores the connection wrapper and the transaction used by every command.</summary>
        protected BaseRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
        {
            DbConnectionSchema = dbConnectionSchema;
            Transaction = dbTransaction;
        }

        /// <summary>
        /// Maps a table name to the script that creates that table. Left unset here; derived
        /// repositories provide it.
        /// </summary>
        protected virtual Func<string, string>? CreateScriptFunc { get; set; }

        /// <summary>Inserts the entities, writing every property of <typeparamref name="T"/>.</summary>
        /// <param name="entities">Rows to insert.</param>
        /// <param name="tableName">Target table; defaults to <see cref="_tableName"/>.</param>
        /// <returns>Number of affected rows.</returns>
        public async Task<Int32> AddAsync(IEnumerable<T> entities, string tableName = default)
        {
            var columns = string.Join(",", _properties);
            var values = string.Join(",", _properties.Select(p => $"@{p}"));
            var sql = string.Format(Sql.Insert.BuildSchema(Schema), tableName ?? _tableName, columns, values);
            return await DbConnection.ExecuteAsync(sql, entities, Transaction, CommandTimeout, CommandType.Text);
        }

        /// <summary>Reads every row of the table.</summary>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        public async Task<IEnumerable<T>> GetAllAsync(string tableName = default)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, string.Empty);
            return await DbConnection.QueryAsync<T>(sql, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Reads the single row whose <c>Id</c> column equals <paramref name="id"/>.</summary>
        /// <returns>The entity, or the default value when no row matches.</returns>
        public async Task<T> GetAsync(Guid id, string tableName = default)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, "WHERE Id = @Id");
            var param = new DynamicParameters();
            param.Add("Id", id);
            return await DbConnection.QuerySingleOrDefaultAsync<T>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Reads rows filtered by equality conditions of the form <c>Column=@Column</c>.
        /// </summary>
        /// <param name="selectedProperties">Columns to select.</param>
        /// <param name="whereProperties">Columns compared against parameters of the same name.</param>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        /// <param name="param">
        /// Object supplying the values of the <c>@Column</c> parameters. It was added as an optional
        /// trailing argument because the conditions reference parameters that the method previously
        /// had no way to receive.
        /// </param>
        public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, IEnumerable<string> whereProperties, string tableName = default, object param = null)
        {
            var conditions = string.Join(" AND ", whereProperties.Select(p => $"{p}=@{p}"));
            // The WHERE keyword was missing, so any non-empty condition list produced invalid SQL.
            var whereClause = conditions.Length == 0 ? string.Empty : $"WHERE {conditions}";
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause);
            return await DbConnection.QueryAsync<T>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Reads rows using a caller-supplied WHERE clause and its parameters.</summary>
        /// <param name="selectedProperties">Columns to select.</param>
        /// <param name="whereClause">Complete clause text, including the <c>WHERE</c> keyword.</param>
        /// <param name="sqlParams">Values for the parameters referenced by the clause.</param>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, string whereClause, DynamicParameters sqlParams, string tableName = default)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause);
            return await DbConnection.QueryAsync<T>(sql, sqlParams, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Updates the given entities. Only names that are real properties of
        /// <typeparamref name="T"/> end up in the SET and WHERE lists.
        /// </summary>
        /// <returns>Number of affected rows.</returns>
        public async Task<Int32> UpdateAsync(IEnumerable<T> entities, IEnumerable<string> updateProperties, IEnumerable<string> whereProperties, string tableName = default)
        {
            var updateClause = string.Join(",", _properties.Intersect(updateProperties).Select(p => $"{p}=@{p}"));
            var whereClause = $"WHERE {string.Join(" AND ", _properties.Intersect(whereProperties).Select(p => $"{p}=@{p}"))}";
            var sql = string.Format(Sql.Update.BuildSchema(Schema), tableName ?? _tableName, updateClause, whereClause);
            return await DbConnection.ExecuteAsync(sql, entities, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Deletes the rows whose <c>Id</c> is contained in <paramref name="ids"/>.</summary>
        /// <returns>Number of affected rows.</returns>
        public async Task<Int32> DeleteAsync(IEnumerable<Guid> ids, string tableName = default)
        {
            var sql = string.Format(Sql.Delete.BuildSchema(Schema), tableName ?? _tableName, "WHERE Id IN @Ids");
            var param = new DynamicParameters();
            param.Add("Ids", ids);
            return await DbConnection.ExecuteAsync(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }
    }
}




3)、调用BaseRepository AddAsync写入数据

using API.Dapper.Repository.V2;
using Dapper;
using System.Data;
using System.Reflection;

namespace API.Dapper.Repository;

/// <summary>
/// Repository that spreads the rows of <typeparamref name="T"/> over several tables when the type
/// carries a <see cref="SubmeterAttribute"/>. Shard tables are named
/// <c>{table}_{value of the sharding property}</c>. Types without the attribute fall through to
/// the single-table behavior of the base repository.
/// </summary>
public abstract class SubmeterRepository<T> : BaseRepositoryV2<T> where T : class, new()
{
    /// <summary>True when <typeparamref name="T"/> is decorated with <see cref="SubmeterAttribute"/>.</summary>
    protected static bool _isSubmeterTable => typeof(T).IsDefined(typeof(SubmeterAttribute));

    /// <summary>Name of the sharding property, or an empty string when the attribute is absent.</summary>
    protected static string _submeterProperty = typeof(T).GetCustomAttribute<SubmeterAttribute>()?.Property ?? string.Empty;

    public SubmeterRepository(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
        : base(dbConnectionSchema, dbTransaction)
    {
    }

    /// <summary>
    /// Inserts the entities. For sharded types the schema and each shard table are created on
    /// demand before the rows of that shard are written.
    /// </summary>
    /// <returns>Total number of inserted rows.</returns>
    /// <exception cref="NotSupportedException">The sharding property does not exist on <typeparamref name="T"/>.</exception>
    /// <exception cref="InvalidOperationException">A shard table is missing and no creation script is available.</exception>
    public async Task<Int32> AddAsync(IEnumerable<T> entities)
    {
        if (!_isSubmeterTable)
        {
            return await base.AddAsync(entities);
        }

        var shards = GroupBySubmeterKey(entities);
        if (!await DbConnection.IsSchemaExist(Schema, transaction: Transaction, commandTimeout: CommandTimeout))
        {
            await DbConnection.CreateSchema(Schema, transaction: Transaction, commandTimeout: CommandTimeout);
        }

        var affectedRows = 0;
        foreach (var shard in shards)
        {
            var tableName = $"{_tableName}_{shard.Key}";
            if (!await DbConnection.IsTableExist(Schema, tableName, Transaction, CommandTimeout))
            {
                var createScript = this.CreateScriptFunc?.Invoke(tableName);
                if (string.IsNullOrWhiteSpace(createScript))
                {
                    // Fail with a clear message instead of sending an empty command to the server.
                    throw new InvalidOperationException($"No create script is available for table '{tableName}'.");
                }
                await DbConnection.ExecuteAsync(createScript, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
            }
            affectedRows += await base.AddAsync(shard.Value, tableName);
        }
        return affectedRows;
    }

    /// <summary>Reads all rows; for sharded types the rows of every shard table are concatenated.</summary>
    public async Task<IEnumerable<T>> GetAllAsync()
    {
        var result = new List<T>();
        if (_isSubmeterTable)
        {
            var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout);
            foreach (var table in tables)
            {
                // The visible listing read the rows into a local and never used them; they are
                // collected here so the method returns what it queried.
                result.AddRange(await base.GetAllAsync(table));
            }
        }
        else
        {
            result.AddRange(await base.GetAllAsync());
        }
        return result;
    }

    /// <summary>Finds one entity by id, probing shard tables one by one until a row is found.</summary>
    /// <returns>The entity, or the default value when no table contains the id.</returns>
    public async Task<T> GetAsync(Guid id)
    {
        if (!_isSubmeterTable)
        {
            return await base.GetAsync(id);
        }

        var result = default(T);
        var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout);
        foreach (var table in tables)
        {
            result = await base.GetAsync(id, table);
            if (result != default)
            {
                break;
            }
        }
        return result;
    }

    /// <summary>
    /// Updates the entities, routing each one to the shard table selected by its sharding property.
    /// </summary>
    /// <returns>Total number of updated rows.</returns>
    /// <exception cref="NotSupportedException">The sharding property does not exist on <typeparamref name="T"/>.</exception>
    public async Task<Int32> UpdateAsync(IEnumerable<T> entities, IEnumerable<string> updateProperties, IEnumerable<string> whereProperties)
    {
        if (!_isSubmeterTable)
        {
            return await base.UpdateAsync(entities, updateProperties, whereProperties);
        }

        var affectedRows = 0;
        // Materialized once so the count below does not re-enumerate the caller's sequence.
        var entityList = entities as ICollection<T> ?? entities.ToList();
        foreach (var shard in GroupBySubmeterKey(entityList))
        {
            var tableName = $"{_tableName}_{shard.Key}";
            affectedRows += await base.UpdateAsync(shard.Value, updateProperties, whereProperties, tableName);
            if (affectedRows == entityList.Count)
            {
                break;
            }
        }
        return affectedRows;
    }

    /// <summary>
    /// Deletes rows by id. For sharded types every shard table is tried until all ids are accounted for.
    /// </summary>
    /// <returns>Total number of deleted rows across all tables.</returns>
    public async Task<Int32> DeleteAsync(IEnumerable<Guid> ids)
    {
        if (!_isSubmeterTable)
        {
            return await base.DeleteAsync(ids);
        }

        var affectedRows = 0;
        var idList = ids as ICollection<Guid> ?? ids.ToList();
        var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout);
        foreach (var table in tables)
        {
            // Accumulate: the ids may live in different shard tables. Overwriting the counter
            // reported only the last table and defeated the early exit below.
            affectedRows += await base.DeleteAsync(idList, table);
            if (affectedRows == idList.Count)
            {
                break;
            }
        }
        return affectedRows;
    }

    /// <summary>Groups entities by the string form of their sharding property value.</summary>
    private static Dictionary<string, List<T>> GroupBySubmeterKey(IEnumerable<T> entities)
    {
        var propertyInfo = typeof(T).GetProperty(_submeterProperty)
            ?? throw new NotSupportedException("The current property doen't support for submeter");
        return entities.GroupBy(e => propertyInfo.GetValue(e)?.ToString()!).ToDictionary(g => g.Key, g => g.ToList());
    }
}


using API.Dapper.Entity;
using Dapper;
using System.Data;

namespace API.Dapper.Repository.V2
{
    /// <summary>
    /// Repository for <see cref="User"/> rows; adds user-specific lookups on top of the sharding
    /// behavior of <see cref="SubmeterRepository{T}"/> and supplies the table creation script.
    /// </summary>
    public class UserRepositoryV2 : SubmeterRepository<User>
    {
        public UserRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
            : base(dbConnectionSchema, dbTransaction)
        {
        }

        /// <summary>Reads users by id, scanning the tables returned by <c>GetTables</c>.</summary>
        /// <param name="ids">Ids to look up.</param>
        /// <param name="selectedProperties">
        /// Columns to read; names that are not properties of <see cref="User"/> are dropped.
        /// All properties are read when null.
        /// </param>
        public async Task<IEnumerable<User>> GetAsync(IEnumerable<Guid> ids, IEnumerable<string> selectedProperties = default)
        {
            var result = new List<User>();
            var idList = ids as ICollection<Guid> ?? ids.ToList();
            var tables = await DbConnection.GetTables(Schema, _tableName, Transaction, CommandTimeout);
            selectedProperties = selectedProperties != null ? _properties.Intersect(selectedProperties) : _properties;
            var columns = string.Join(",", selectedProperties);
            foreach (var table in tables)
            {
                var sql = string.Format(Sql.Select.BuildSchema(Schema), columns, table, "WHERE Id IN @Ids");
                var param = new DynamicParameters();
                param.Add("Ids", idList);
                var tableValues = await DbConnection.QueryAsync<User>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
                // NOTE(review): the statement adding the rows to the result is missing from the
                // listing; without it the method could only ever return an empty list.
                result.AddRange(tableValues);
                if (result.Count == idList.Count)
                {
                    break;
                }
            }
            return result;
        }

        /// <summary>Reads all users that have the given role.</summary>
        /// <returns>The matching users; empty when a sharded role table does not exist yet.</returns>
        public async Task<IEnumerable<User>> GetAsync(Role role)
        {
            if (_isSubmeterTable)
            {
                var tableName = $"{_tableName}_{role}";
                // A role nobody has been stored under has no table; report "no users" instead of
                // failing on a missing object.
                if (!await DbConnection.IsTableExist(Schema, tableName, Transaction, CommandTimeout))
                {
                    return Enumerable.Empty<User>();
                }
                return await base.GetAllAsync(tableName);
            }

            // Use the overload that takes a full WHERE clause plus parameters. The overload taking
            // only property names emits neither the WHERE keyword nor a value for @Role.
            var sqlParams = new DynamicParameters();
            sqlParams.Add("Role", (int)role);
            return await base.GetAsync(_properties, "WHERE Role = @Role", sqlParams);
        }

        /// <summary>
        /// Re-reads the given users by id, looking each one up in the shard table selected by its
        /// sharding property.
        /// </summary>
        /// <exception cref="NotSupportedException">The sharding property does not exist on <see cref="User"/>.</exception>
        public async Task<IEnumerable<User>> GetAsync(IEnumerable<User> entities)
        {
            var result = new List<User>();
            if (_isSubmeterTable)
            {
                var propertyInfo = typeof(User).GetProperty(_submeterProperty)
                   ?? throw new NotSupportedException("The current property doen't support for submeter");
                var tables = entities.GroupBy(e => propertyInfo.GetValue(e)?.ToString()!).ToDictionary(g => g.Key, g => g.ToList());
                foreach (var table in tables)
                {
                    var tableName = $"{_tableName}_{table.Key}";
                    var sqlParams = new DynamicParameters();
                    // Id is a typed property of User; no reflection is needed to read it.
                    sqlParams.Add("Ids", table.Value.Select(v => v.Id));
                    var tableValues = await base.GetAsync(_properties, "WHERE Id IN @Ids", sqlParams, tableName);
                    result.AddRange(tableValues);
                }
            }
            else
            {
                var sqlParams = new DynamicParameters();
                sqlParams.Add("Ids", entities.Select(v => v.Id));
                result.AddRange(await base.GetAsync(_properties, "WHERE Id IN @Ids", sqlParams));
            }
            return result;
        }

        /// <summary>
        /// Builds the script that creates a user table plus its HireDate and Role indexes.
        /// An empty script is returned for any table name other than <c>APITable.User</c>.
        /// </summary>
        protected override Func<string, string> CreateScriptFunc => tableName =>
            _tableName switch
            {
                // Statements are separated by ';' so the batch stays readable; previously they
                // were concatenated back to back.
                APITable.User =>
                    $"CREATE TABLE [{Schema}].[{tableName}](" +
                    "[Id] [uniqueidentifier] NOT NULL, " +
                    "[DisplayName] [nvarchar](256) NOT NULL, " +
                    "[Role] [int] NULL, " +
                    "[HireDate] [bigint] NULL, " +
                    $"CONSTRAINT [PK_{tableName}] PRIMARY KEY CLUSTERED ([Id] ASC)); " +
                    $"CREATE NONCLUSTERED INDEX [NonClustered_HireDate] ON [{Schema}].[{tableName}]([HireDate] ASC); " +
                    $"CREATE NONCLUSTERED INDEX [NonClustered_Role] ON [{Schema}].[{tableName}]([Role] ASC);",
                _ => string.Empty
            };
    }
}
 #region Database Submeter
    /// <summary>Checks whether a schema with the given name exists in the current database.</summary>
    /// <param name="connection">Connection to query.</param>
    /// <param name="schema">Schema name to look for.</param>
    /// <param name="transaction">Transaction to enlist in, if any.</param>
    /// <param name="commandTimeout">Command timeout handed to Dapper.</param>
    /// <returns>True when at least one matching row exists in <c>sys.schemas</c>.</returns>
    public static async Task<bool> IsSchemaExist(this IDbConnection connection, string schema, IDbTransaction transaction = default, Int32 commandTimeout = default)
    {
        // The name is sent as a parameter rather than spliced into the SQL text.
        const string sql = "SELECT COUNT(1) FROM sys.schemas WHERE SCHEMA_NAME(schema_id) = @schema";
        var result = await connection.ExecuteScalarAsync<int>(sql, new { schema }, transaction: transaction, commandTimeout: commandTimeout);
        return result > 0;
    }

    /// <summary>Creates a schema with the given name.</summary>
    /// <param name="connection">Connection to execute on.</param>
    /// <param name="schema">Name of the schema to create.</param>
    /// <param name="transaction">Transaction to enlist in, if any.</param>
    /// <param name="commandTimeout">Command timeout handed to Dapper.</param>
    /// <returns>The value returned by Dapper's <c>ExecuteAsync</c>.</returns>
    public static async Task<Int32> CreateSchema(this IDbConnection connection, string schema, IDbTransaction transaction = default, Int32 commandTimeout = default)
    {
        // Identifiers cannot be passed as parameters in DDL, so the name is bracket-quoted and any
        // closing bracket inside it is doubled.
        var quotedSchema = $"[{schema?.Replace("]", "]]")}]";
        var sql = $"CREATE SCHEMA {quotedSchema}";
        return await connection.ExecuteAsync(sql, transaction: transaction, commandTimeout: commandTimeout);
    }

    /// <summary>Checks whether a user table with the given name exists in the given schema.</summary>
    /// <param name="connection">Connection to query.</param>
    /// <param name="schema">Schema that should contain the table.</param>
    /// <param name="tableName">Exact table name.</param>
    /// <param name="transaction">Transaction to enlist in, if any.</param>
    /// <param name="commandTimeout">Command timeout handed to Dapper.</param>
    /// <returns>True when at least one matching row exists in <c>sys.tables</c>.</returns>
    public static async Task<bool> IsTableExist(this IDbConnection connection, string schema, string tableName, IDbTransaction transaction = default, Int32 commandTimeout = default)
    {
        // Both names are sent as parameters rather than spliced into the SQL text.
        const string sql = "SELECT count(1) FROM sys.tables WHERE name = @tableName AND type = 'u' AND SCHEMA_NAME(schema_id) = @schema";
        var result = await connection.ExecuteScalarAsync<int>(sql, new { tableName, schema }, transaction: transaction, commandTimeout: commandTimeout);
        return result > 0;
    }

    /// <summary>Lists the user tables of a schema whose names start with the given prefix.</summary>
    /// <param name="connection">Connection to query.</param>
    /// <param name="schema">Schema to search.</param>
    /// <param name="tablePrefix">
    /// Prefix of the table names. It is used as a LIKE pattern followed by <c>%</c>, so LIKE
    /// wildcards inside the prefix keep their special meaning.
    /// </param>
    /// <param name="transaction">Transaction to enlist in, if any.</param>
    /// <param name="commandTimeout">Command timeout handed to Dapper.</param>
    /// <returns>The matching table names.</returns>
    public static async Task<IEnumerable<string>> GetTables(this IDbConnection connection, string schema, string tablePrefix, IDbTransaction transaction = default, Int32 commandTimeout = default)
    {
        // Pattern and schema are sent as parameters rather than spliced into the SQL text.
        const string sql = "SELECT name FROM sys.tables WHERE name LIKE @pattern AND type ='u' AND SCHEMA_NAME(schema_id) = @schema";
        return await connection.QueryAsync<string>(sql, new { pattern = $"{tablePrefix}%", schema }, transaction: transaction, commandTimeout: commandTimeout);
    }
DbConnection Extension
namespace API.Dapper;

/// <summary>Iteration helpers for <see cref="IEnumerable{T}"/>.</summary>
public static class IEnumerableExtension
{
    /// <summary>Invokes <paramref name="action"/> for every element, in order.</summary>
    /// <param name="values">Sequence to iterate.</param>
    /// <param name="action">Callback invoked once per element.</param>
    public static void ForEach<T>(this IEnumerable<T> values, Action<T> action)
    {
        foreach (var value in values)
        {
            // NOTE(review): the loop body is missing from the listing; invoking the callback is
            // the only behavior consistent with the method's signature.
            action(value);
        }
    }

    /// <summary>
    /// Invokes <paramref name="func"/> for every element and awaits each call before starting
    /// the next one, so the callbacks never overlap.
    /// </summary>
    /// <param name="values">Sequence to iterate.</param>
    /// <param name="func">Asynchronous callback invoked once per element.</param>
    public static async Task ForEachAsync<T>(this IEnumerable<T> values, Func<T, Task> func)
    {
        foreach (T value in values)
        {
            await func(value);
        }
    }
}
Common Extension
/// <summary>
/// SQL statement templates. <c>{schema}</c> is substituted by <see cref="BuildSchema"/>; the
/// numbered placeholders are filled with <c>string.Format</c> by the repositories.
/// </summary>
public static partial class Sql
{
    /// <summary>{0} = table, {1} = column list, {2} = parameter list.</summary>
    public const string Insert = @"INSERT INTO [{schema}].[{0}]({1})VALUES({2})";

    /// <summary>
    /// {0} = column list, {1} = table, {2} = optional WHERE clause. The table is bracket-quoted
    /// like in the other templates, so a name that is also a SQL keyword remains usable.
    /// </summary>
    public const string Select = @"SELECT {0} FROM [{schema}].[{1}] {2}";

    /// <summary>{0} = table, {1} = SET list, {2} = WHERE clause.</summary>
    public const string Update = @"UPDATE [{schema}].[{0}] SET {1} {2}";

    /// <summary>{0} = table, {1} = WHERE clause.</summary>
    public const string Delete = @"DELETE FROM [{schema}].[{0}] {1}";

    /// <summary>Replaces every <c>{schema}</c> token in the template with the schema name.</summary>
    /// <param name="sql">Template text.</param>
    /// <param name="schema">Schema name to insert.</param>
    /// <returns>The template with the schema filled in; numbered placeholders are untouched.</returns>
    public static string BuildSchema(this string sql, string schema)
        => sql.Replace("{schema}", schema);
}







using System.Data;

namespace API.Dapper.UnitOfWork.V2
{
    /// <summary>
    /// Unit of work that owns one connection and one transaction started on it.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the bodies of <see cref="Commit"/>, <see cref="Rollback"/> and
    /// <see cref="Dispose"/> are missing from the listing. They are reconstructed here from the
    /// member names and the two surviving null checks - confirm against the original project.
    /// </remarks>
    public class UnitOfWorkV2 : IUnitOfWork
    {
        public IDbConnection DbConnection { get; set; }
        public IDbTransaction DbTransaction { get; set; }

        /// <summary>Starts a transaction with the provider's default isolation level.</summary>
        public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema)
        {
            DbConnection = OpenConnection(dbConnectionSchema);
            DbTransaction = DbConnection.BeginTransaction();
        }

        /// <summary>Starts a transaction with the requested isolation level.</summary>
        public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema, IsolationLevel isolationLevel)
        {
            DbConnection = OpenConnection(dbConnectionSchema);
            DbTransaction = DbConnection.BeginTransaction(isolationLevel);
        }

        /// <summary>Commits the transaction.</summary>
        public void Commit()
        {
            DbTransaction.Commit();
        }

        /// <summary>Releases the transaction and then the connection.</summary>
        public void Dispose()
        {
            if (DbTransaction != null)
            {
                DbTransaction.Dispose();
            }
            if (DbConnection != null)
            {
                DbConnection.Dispose();
            }
        }

        /// <summary>Rolls the transaction back.</summary>
        public void Rollback()
        {
            DbTransaction.Rollback();
        }

        /// <summary>
        /// Returns the wrapper's connection, opening it first when it is still closed. A
        /// transaction cannot be started on a closed connection, and nothing else in the visible
        /// code opens it.
        /// </summary>
        private static IDbConnection OpenConnection(IDbConnectionSchema dbConnectionSchema)
        {
            var connection = dbConnectionSchema.DbConnection;
            if (connection.State == ConnectionState.Closed)
            {
                connection.Open();
            }
            return connection;
        }
    }
}
using API.Dapper.Repository.V2;

namespace API.Dapper.UnitOfWork.V2
{
    /// <summary>
    /// Unit of work that exposes a <see cref="UserRepositoryV2"/> bound to this unit's transaction.
    /// </summary>
    public class UserUnitOfWorkV2 : UnitOfWorkV2
    {
        /// <summary>User repository sharing the connection wrapper and transaction of this unit of work.</summary>
        public UserRepositoryV2 UserRepositoryV2 { get; }

        /// <summary>Starts the transaction through the base constructor, then creates the repository on it.</summary>
        public UserUnitOfWorkV2(IDbConnectionSchema dbConnectionSchema)
            : base(dbConnectionSchema)
        {
            UserRepositoryV2 = new UserRepositoryV2(dbConnectionSchema, DbTransaction);
        }
    }
}
/// <summary>Pairs a database connection with the schema that statements should target.</summary>
public interface IDbConnectionSchema : IDisposable
{
    /// <summary>The connection used to execute commands.</summary>
    public IDbConnection DbConnection { get; }

    /// <summary>Name of the database schema.</summary>
    public String Schema { get; }
}

/// <summary>
/// Base implementation of <see cref="IDbConnectionSchema"/> that creates its connection on first
/// access through <see cref="CreateDbConnection"/>.
/// </summary>
public abstract class DbConnectionSchema : IDbConnectionSchema
{
    /// <summary>Schema name; assigned by derived classes.</summary>
    public String Schema { get; protected set; }

    private IDbConnection? dbConnection;

    /// <summary>The connection; created by <see cref="CreateDbConnection"/> the first time it is read.</summary>
    public IDbConnection DbConnection
    {
        get
        {
            if (dbConnection == null)
            {
                dbConnection = this.CreateDbConnection();
            }
            return dbConnection;
        }
        protected set
        {
            dbConnection = value;
        }
    }

    /// <summary>Creates the connection instance returned by <see cref="DbConnection"/>.</summary>
    public abstract IDbConnection CreateDbConnection();

    /// <summary>Disposes the connection if one has been created.</summary>
    public void Dispose()
    {
        if (dbConnection != null)
        {
            // NOTE(review): this statement is missing from the listing; disposing the connection
            // is the evident purpose of the surviving null check.
            dbConnection.Dispose();
        }
    }
}
/// <summary>
/// Connection wrapper for the demo; it takes the schema name and the connection string from
/// <c>UserContext.Current</c>.
/// </summary>
public class DemoDbConnectionSchema : DbConnectionSchema
{
    /// <summary>
    /// Creates a new <c>SqlConnection</c>. The schema name is captured here as well, so
    /// <c>Schema</c> is only populated once the connection has been requested.
    /// </summary>
    public override IDbConnection CreateDbConnection()
    {
        Schema = UserContext.Current.Schema;
        return new SqlConnection(UserContext.Current.ConnectionString);
    }
}


创建一个NetCore Console Application,引入上面Library工程,定义UserService

using API.Dapper.Entity;
using API.Dapper.UnitOfWork.V2;

namespace API.Dapper.Test;

/// <summary>Application service that reads and writes users through a short-lived unit of work.</summary>
public class UserService
{
    /// <summary>Reads every stored user.</summary>
    public async Task<IEnumerable<User>> GetAllAsync()
    {
        using var unitOfWork = new UserUnitOfWorkV2(new DemoDbConnectionSchema());
        return await unitOfWork.UserRepositoryV2.GetAllAsync();
    }

    /// <summary>
    /// Stores the given users. The transaction is committed when the insert succeeds and rolled
    /// back when it fails.
    /// </summary>
    /// <param name="users">Users to insert.</param>
    public async Task AddAsync(IEnumerable<User> users)
    {
        using var unitOfWork = new UserUnitOfWorkV2(new DemoDbConnectionSchema());
        try
        {
            await unitOfWork.UserRepositoryV2.AddAsync(users);
            // No commit is visible in the listing. Without one, the inserts would be discarded
            // when the unit of work is disposed.
            unitOfWork.Commit();
        }
        catch
        {
            unitOfWork.Rollback();
            throw;
        }
    }
}



// Console entry point: registers the service, sets up the current user context, inserts two
// users and prints everything that is stored.
var services = new ServiceCollection();
// UserService must be registered, otherwise GetRequiredService<UserService>() below throws.
services.AddTransient<UserService>();
var serviceProvider = services.BuildServiceProvider();

// The principal's UserId determines the database schema used for this run.
UserContext.Current.Init(new UserPrincipal
{
    UserId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
    DisplayName = "Boss",
    Role = Role.Boss,
    HireDate = DateTime.UtcNow.Ticks
});

var userService = serviceProvider.GetRequiredService<UserService>();
Console.WriteLine("-----------Start to add user to sumbmeter tables--------------");
await userService.AddAsync(new List<User>
{
    new User { Id = Guid.NewGuid(), Role = Role.ShrimpSoldier, DisplayName ="tom", HireDate=DateTime.UtcNow.Ticks },
    new User { Id =Guid.NewGuid(), Role = Role.CrabGeneral, DisplayName ="jack", HireDate = DateTime.UtcNow.Ticks}
});
Console.WriteLine("-----------Add two users to tables----------------------------");
var users = await userService.GetAllAsync();
Console.WriteLine($"Get all users from tables \r\n {string.Join("\r\n", users.Select(u => $"Id: {u.Id}, DisplayName: {u.DisplayName}, Role: {u.Role}, Hire date: {u.HireDate}"))}");




 补充:可能有的小伙伴对下面这段代码感到好奇,解释一下哈:设计的初衷是先做数据隔离,以UserId作为db schema名称的一部分,为不同用户创建不同的Schema,从而做到数据的隔离。显然这样的想法是不符合实际业务需求的,这里只是尝试Schema这种方式,勿喷!!!勿喷!!!

/// <summary>
/// Ambient context for the current user, stored in an <see cref="AsyncLocal{T}"/>. It exposes the
/// schema name derived from the user and the connection string.
/// </summary>
public class UserContext
{
    private static readonly AsyncLocal<UserContext> _instance;

    static UserContext()
    {
        _instance = new AsyncLocal<UserContext>();
    }

    /// <summary>The context of the current async flow; created on first access.</summary>
    public static UserContext Current
    {
        get => _instance.Value == default ? _instance.Value = new UserContext() : _instance.Value;
        private set => _instance.Value = value;
    }

    /// <summary>Schema name; set by <see cref="Init"/> when a <see cref="UserPrincipal"/> is supplied.</summary>
    public string Schema { get; set; }

    /// <summary>Connection string (redacted in the article).</summary>
    public string ConnectionString
    {
        get => $"***************";
    }

    /// <summary>The principal passed to <see cref="Init"/>.</summary>
    public IPrincipal? Principal { get; set; }

    /// <summary>
    /// Stores the principal. For a <see cref="UserPrincipal"/> the schema name becomes
    /// <c>Dapper</c> followed by the user id with its hyphens removed.
    /// </summary>
    /// <param name="princial">Principal of the current user; may be null.</param>
    public void Init(IPrincipal princial)
    {
        // One pattern match replaces the former type check followed by an 'as' cast.
        if (princial is UserPrincipal userPrincial)
        {
            Schema = $"Dapper{userPrincial.UserId}".Replace("-", "");
        }
        Principal = princial;
    }
}
/// <summary>Marker interface for the principal stored in <c>UserContext</c>; declares no members.</summary>
public interface IPrincipal
{
}

/// <summary>Principal describing the current user.</summary>
public class UserPrincipal : IPrincipal
{
    /// <summary>Identifier of the user; <c>UserContext.Init</c> derives the schema name from it.</summary>
    public Guid UserId { get; set; }

    /// <summary>Name shown for the user.</summary>
    public string DisplayName { get; set; }

    /// <summary>Role of the user.</summary>
    public Role Role { get; set; }

    /// <summary>Hire date stored as a long (the sample uses DateTime.UtcNow.Ticks).</summary>
    public long HireDate { get; set; }
}
// Initializes the ambient user context; the UserId below determines the schema name.
UserContext.Current.Init(new UserPrincipal
{
    UserId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
    DisplayName = "Jack",
    Role = Role.Boss,
    HireDate = DateTime.UtcNow.Ticks
});