Dapper NetCore 分区实战

发布时间 2023-11-17 15:27:19作者: 云霄宇霁

在上一篇中介绍了基于Dapper的NetCore分表,本篇旨在介绍基于Dapper的NetCore分区,废话不多说开搞吧!

模拟业务场景:基于公司所在地区对表建立分区

设计公司表结构,其中TableAttribute标识表名,PartitionAttribute标识当前表是分区表,其Property参数指定按照实体的哪个属性进行分区。

/// <summary>
/// Base type for persisted entities, exposing a primary key of type <typeparamref name="TKey"/>.
/// </summary>
/// <typeparam name="TKey">Type of the primary key (e.g. <see cref="System.Guid"/>).</typeparam>
public abstract class Entity<TKey>
{
    /// <summary>Primary key, marked with <c>[ExplicitKey]</c>.</summary>
    [ExplicitKey] public virtual TKey Id { get; set; }
}

/// <summary>
/// Company entity stored in the <c>Company</c> table and partitioned by its <see cref="Region"/> value.
/// </summary>
[Table("Company"), Partition(nameof(Company.Region))]
public class Company : Entity<Guid>
{
    /// <summary>Display name of the company.</summary>
    public string Name { get; set; }

    /// <summary>Region code (e.g. "AU", "US"); this is the partitioning property.</summary>
    public string Region { get; set; }
}
Company
/// <summary>
/// Marker attribute placed on an entity's key property; it carries no data of its own.
/// </summary>
[AttributeUsage(validOn: AttributeTargets.Property)]
public class ExplicitKey : Attribute { }

/// <summary>
/// Marks an entity class as stored in a partitioned table.
/// </summary>
/// <remarks>
/// Restricted to classes: the repositories read it via <c>typeof(T).GetCustomAttribute&lt;PartitionAttribute&gt;()</c>
/// on class entity types, so placing it on other targets has no effect.
/// </remarks>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = true)]
public class PartitionAttribute : Attribute
{
    /// <summary>Name of the entity property whose value selects the partition.</summary>
    public string Property { get; set; }

    /// <summary>Creates the attribute for the given partitioning property.</summary>
    /// <param name="property">Name of the partitioning property, e.g. <c>nameof(Company.Region)</c>.</param>
    public PartitionAttribute(string property)
    {
        this.Property = property;
    }
}
Attribute

有了表结构之后设计Repository。首先是BaseRepository:除了基本的CRUD,还定义了一个虚属性 protected virtual Func<string, string>? CreateScriptFunc { get; set; },要求子类根据特定的需求提供建表脚本;此外还定义了一个Schema,设计Schema的初衷是做数据隔离或者安全隔离。

using Dapper.Contrib.Extensions;
using System.Data;
using System.Reflection;

namespace API.Dapper.Repository.V2
{
    /// <summary>
    /// Schema-aware CRUD base repository for <typeparamref name="T"/>, built on Dapper.
    /// </summary>
    /// <remarks>
    /// SQL text comes from the templates in <c>Sql</c>; their <c>{schema}</c> placeholder is replaced with
    /// <see cref="Schema"/>. Every public method takes an optional table name that overrides the default one
    /// derived from <c>[Table]</c> (or the CLR type name). Column lists are built from all public properties
    /// of <typeparamref name="T"/>, so each public property is expected to map to a column.
    /// </remarks>
    /// <typeparam name="T">Entity type handled by this repository.</typeparam>
    public abstract class BaseRepositoryV2<T> where T : class, new()
    {
        // The table name and the property list depend only on T, so the reflection runs once per closed
        // generic type instead of on every access (AddAsync alone used to enumerate _properties three times).
        private static readonly string s_tableName =
            typeof(T).GetCustomAttribute<TableAttribute>()?.Name ?? typeof(T).Name;

        private static readonly IReadOnlyList<string> s_properties =
            Array.AsReadOnly(typeof(T).GetProperties().Select(p => p.Name).ToArray());

        protected IDbConnectionSchema DbConnectionSchema { get; set; }
        protected string Schema { get => DbConnectionSchema.Schema; }
        protected IDbTransaction Transaction { get; private set; }
        protected IDbConnection DbConnection => DbConnectionSchema.DbConnection;
        protected Int32 CommandTimeout { get; set; } = 30;

        /// <summary>Default table name: the <c>[Table]</c> attribute name, or the type name when it is absent.</summary>
        protected static string _tableName => s_tableName;

        /// <summary>Names of all public properties of <typeparamref name="T"/>, used as column names.</summary>
        protected static IEnumerable<string> _properties => s_properties;

        protected BaseRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
        {
            DbConnectionSchema = dbConnectionSchema;
            Transaction = dbTransaction;
        }

        /// <summary>
        /// Optional factory for the table creation script; receives the table name and returns T-SQL.
        /// Derived repositories override it to describe their own table layout.
        /// </summary>
        protected virtual Func<string, string>? CreateScriptFunc { get; set; }

        /// <summary>
        /// Inserts <paramref name="entities"/>, writing every public property. Dapper executes the INSERT once
        /// per element of the sequence.
        /// </summary>
        /// <returns>The total number of affected rows reported by Dapper.</returns>
        public async Task<Int32> AddAsync(IEnumerable<T> entities, string? tableName = default)
        {
            var columns = string.Join(",", _properties);
            var values = string.Join(",", _properties.Select(p => $"@{p}"));
            var sql = string.Format(Sql.Insert.BuildSchema(Schema), tableName ?? _tableName, columns, values);
            return await DbConnection.ExecuteAsync(sql, entities, Transaction, CommandTimeout, CommandType.Text);
        }

        /// <summary>Returns every row of the table, selecting all mapped properties.</summary>
        public async Task<IEnumerable<T>> GetAllAsync(string? tableName = default)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, string.Empty);
            return await DbConnection.QueryAsync<T>(sql, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Returns the row whose <c>Id</c> equals <paramref name="id"/>, or <c>null</c> when none exists.</summary>
        public async Task<T> GetAsync(Guid id, string? tableName = default)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, $"WHERE Id = @Id");
            var param = new DynamicParameters();
            param.Add("Id", id);
            return await DbConnection.QuerySingleOrDefaultAsync<T>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Selects <paramref name="selectedProperties"/> from rows whose <paramref name="whereProperties"/> columns
        /// equal the same-named members of <paramref name="parameters"/> (predicates combined with AND).
        /// An empty <paramref name="whereProperties"/> selects all rows.
        /// </summary>
        /// <param name="parameters">
        /// Values for the generated <c>@Property</c> placeholders (an anonymous object or <c>DynamicParameters</c>).
        /// Required whenever <paramref name="whereProperties"/> is non-empty; the original overload had no way to
        /// supply them, so it could not execute.
        /// </param>
        public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, IEnumerable<string> whereProperties, string? tableName = default, object? parameters = null)
        {
            var predicates = whereProperties.Select(p => $"{p}=@{p}").ToList();
            // The Select template has no WHERE keyword of its own, so it is added here (and omitted for an empty filter).
            var whereClause = predicates.Count == 0 ? string.Empty : $"WHERE {string.Join(" AND ", predicates)}";
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause);
            return await DbConnection.QueryAsync<T>(sql, parameters, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Selects <paramref name="selectedProperties"/> using a caller-supplied clause (including its own
        /// <c>WHERE</c> keyword) and parameters.
        /// </summary>
        public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, string whereClause, DynamicParameters sqlParams, string? tableName = default)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause);
            return await DbConnection.QueryAsync<T>(sql, sqlParams, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Updates <paramref name="updateProperties"/> of each entity, matching rows on <paramref name="whereProperties"/>.
        /// Only names that are public properties of <typeparamref name="T"/> are used.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// No update property or no where property is a property of <typeparamref name="T"/>. Previously this produced
        /// invalid SQL (an empty SET list or a bare <c>WHERE</c>).
        /// </exception>
        public async Task<Int32> UpdateAsync(IEnumerable<T> entities, IEnumerable<string> updateProperties, IEnumerable<string> whereProperties, string? tableName = default)
        {
            var setColumns = _properties.Intersect(updateProperties).ToList();
            var whereColumns = _properties.Intersect(whereProperties).ToList();
            if (setColumns.Count == 0)
                throw new ArgumentException("None of the update properties is a property of the entity.", nameof(updateProperties));
            if (whereColumns.Count == 0)
                throw new ArgumentException("None of the where properties is a property of the entity.", nameof(whereProperties));

            var updateClause = string.Join(",", setColumns.Select(p => $"{p}=@{p}"));
            var whereClause = $"WHERE {string.Join(" AND ", whereColumns.Select(p => $"{p}=@{p}"))}";
            var sql = string.Format(Sql.Update.BuildSchema(Schema), tableName ?? _tableName, updateClause, whereClause);
            return await DbConnection.ExecuteAsync(sql, entities, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Deletes the rows whose <c>Id</c> is in <paramref name="ids"/> (Dapper expands <c>IN @Ids</c>).</summary>
        public async Task<Int32> DeleteAsync(IEnumerable<Guid> ids, string? tableName = default)
        {
            var sql = string.Format(Sql.Delete.BuildSchema(Schema), tableName ?? _tableName, $"WHERE Id IN @Ids");
            var param = new DynamicParameters();
            param.Add("Ids", ids);
            return await DbConnection.ExecuteAsync(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }
    }
}
BaseRepository

BaseRepository中基础的CRUD不能满足分区的业务需求,还需要一些特定的逻辑,比如写入数据的时候需要判断是否已有分区Function、分区Scheme,以及是否已经创建了对应的分区Range values等,所以定义了PartitionRepository继承自BaseRepository来完成以上操作。

using API.Dapper.Attributes;
using API.Dapper.Repository.V2;
using Dapper;
using System.Data;
using System.Reflection;

namespace API.Dapper.Repository;

/// <summary>
/// Repository for entities marked with <see cref="PartitionAttribute"/>. Before inserting, it creates the
/// partitioned table when it does not exist yet (via <c>CreateScriptFunc</c>) and adds a partition boundary for
/// every partition value in the incoming entities that the table's partition function does not have yet.
/// </summary>
public abstract class PartitionRepository<T> : BaseRepositoryV2<T> where T : class, new()
{
    public PartitionRepository(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
        : base(dbConnectionSchema, dbTransaction)
    {
    }

    /// <summary>
    /// Inserts <paramref name="entities"/> into the default table, preparing the table and its partition
    /// boundaries first when <typeparamref name="T"/> is marked with <see cref="PartitionAttribute"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The table must be created but <c>CreateScriptFunc</c> yields no script, or the partition property named by
    /// the attribute does not exist on <typeparamref name="T"/>.
    /// </exception>
    public async Task<Int32> AddAsync(IEnumerable<T> entities)
    {
        // Materialize once: the entities are read for their partition values and again by the insert.
        var items = entities as IReadOnlyCollection<T> ?? entities.ToList();

        if (typeof(T).GetCustomAttribute<PartitionAttribute>() != null)
        {
            if (!await DbConnection.IsTableExist(Schema, _tableName, Transaction, CommandTimeout))
            {
                var createScript = this.CreateScriptFunc?.Invoke(_tableName);
                if (string.IsNullOrWhiteSpace(createScript))
                    throw new InvalidOperationException($"No create script is defined for partitioned table '{_tableName}' in schema '{Schema}'.");
                await DbConnection.ExecuteAsync(createScript, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
            }

            // Run on every insert, not only right after the table was created, so that partition values
            // appearing in later batches also get their own boundary.
            var partitionValues = GetPartitionValues(items);
            var existPartitionValues = await DbConnection.GetPartitionValues(_tableName, Schema, transaction: Transaction, commandTimeout: CommandTimeout);
            // Case-insensitive, assuming the usual case-insensitive SQL Server collation (TODO confirm): under such a
            // collation, SPLIT RANGE with a value differing only in case from an existing boundary fails as a duplicate.
            var missingValues = partitionValues.Except(existPartitionValues, StringComparer.OrdinalIgnoreCase).ToList();
            if (missingValues.Count > 0)
                await DbConnection.CreatePartitionValues(missingValues, _tableName, Schema, transaction: Transaction, commandTimeout: CommandTimeout);
        }
        return await base.AddAsync(items);
    }

    /// <summary>
    /// Returns the distinct string values of the partition property (named by <see cref="PartitionAttribute"/>)
    /// across <paramref name="entities"/>. Duplicates are removed because splitting the same boundary twice fails.
    /// </summary>
    /// <exception cref="InvalidOperationException">The attribute is missing or names a property <typeparamref name="T"/> lacks.</exception>
    private IEnumerable<string> GetPartitionValues(IEnumerable<T> entities)
    {
        var partitionAttribute = typeof(T).GetCustomAttribute<PartitionAttribute>();
        var propertyInfo = partitionAttribute is null
            ? null
            : typeof(T).GetProperties().FirstOrDefault(p => string.Equals(p.Name, partitionAttribute.Property, StringComparison.Ordinal));
        if (propertyInfo is null)
            throw new InvalidOperationException($"Partition property '{partitionAttribute?.Property}' was not found on type '{typeof(T).Name}'.");

        return entities
            .Select(e => $"{propertyInfo.GetValue(e, null)}")
            .Distinct(StringComparer.OrdinalIgnoreCase)
            .ToList();
    }
}
PartitionRepository

接下来定义CompanyRepository继承自PartitionRepository完成业务Repository

using API.Dapper.Entity;
using System.Data;


namespace API.Dapper.Repository.V2
{
    /// <summary>
    /// Company repository. Its only customization is <c>CreateScriptFunc</c>, the T-SQL batch used to create
    /// the partitioned Company table when the partition repository finds the table missing.
    /// </summary>
    public class CompanyRepositoryV2 : PartitionRepository<Company>
    {
        // Initial boundary value of the partition function created by CreateScriptFunc.
        private static string _defaultPartitionValue => "CHN";
        public CompanyRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction) 
            : base(dbConnectionSchema, dbTransaction)
        {
        }

        // Returns the create script for the given table name. The switch is on the repository's own _tableName,
        // not on the lambda's tableName argument. When _tableName matches APITable.Company (presumably the constant
        // "Company" - confirm), the batch creates, in order:
        //   1. partition function [{Schema}_{tableName}PartitionFun] over nvarchar(256), RANGE RIGHT, initial boundary 'CHN';
        //   2. partition scheme [{Schema}_{tableName}PartitonSchema], mapping ALL partitions to [PRIMARY];
        //   3. table [{Schema}].[{tableName}] (Id, Name, Region) on that scheme, partitioned by [Region];
        //   4. a clustered index on [Region] ASC and a nonclustered index on [Id] DESC.
        // Any other table name yields an empty script.
        // NOTE(review): "PartitonSchema" (sic) and "index_ActivityType" (an index on Id) look like typos/copy-paste
        // leftovers, but they are the actual object names created in the database.
        protected override Func<string, string> CreateScriptFunc => tableName =>
           _tableName switch
           {
               APITable.Company =>
                $"CREATE PARTITION FUNCTION [{Schema}_{tableName}PartitionFun](nvarchar(256)) AS RANGE RIGHT FOR VALUES ('{_defaultPartitionValue}');" +
                $"CREATE PARTITION SCHEME [{Schema}_{tableName}PartitonSchema] AS PARTITION [{Schema}_{tableName}PartitionFun] ALL TO ([PRIMARY]);" +
                @"CREATE TABLE [{schema}].".BuildSchema(Schema) + $@"[{tableName}](
                [Id] [uniqueidentifier] NOT NULL,
                [Name] [nvarchar](256) NOT NULL,
                [Region] [nvarchar](256) NOT NULL)
                ON [{Schema}_{tableName}PartitonSchema]([Region]);"+
                "CREATE CLUSTERED INDEX [Clustered_Region] ON [{schema}].".BuildSchema(Schema) + $"[{tableName}]([Region] ASC);"+
                "CREATE NONCLUSTERED INDEX [index_ActivityType] ON [{schema}].".BuildSchema(Schema) + $"[{tableName}]([Id] DESC);",
               _ => string.Empty
           };
    }
}
CompanyRepository

其次创建事务管理UnitOfWork

using System.Data;

namespace API.Dapper.UnitOfWork.V2
{
    /// <summary>
    /// Unit of work that opens the connection supplied by an <c>IDbConnectionSchema</c> and starts one
    /// transaction on it. Derived units of work hand <see cref="DbTransaction"/> to their repositories.
    /// Disposing the unit of work disposes the transaction and the connection; call <see cref="Commit"/>
    /// beforehand to keep the changes.
    /// </summary>
    public class UnitOfWorkV2 : IUnitOfWork
    {
        public IDbConnection DbConnection { get; set; }
        public IDbTransaction DbTransaction { get; set; }

        /// <summary>Opens the connection and begins a transaction with the provider's default isolation level.</summary>
        public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema)
        {
            DbConnection = dbConnectionSchema.DbConnection;
            DbTransaction = OpenAndBeginTransaction(DbConnection, isolationLevel: null);
        }

        /// <summary>Opens the connection and begins a transaction with <paramref name="isolationLevel"/>.</summary>
        public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema, IsolationLevel isolationLevel)
        {
            DbConnection = dbConnectionSchema.DbConnection;
            DbTransaction = OpenAndBeginTransaction(DbConnection, isolationLevel);
        }

        /// <summary>Commits the transaction.</summary>
        public void Commit()
        {
            DbTransaction.Commit();
        }

        /// <summary>Disposes the transaction and then the connection.</summary>
        public void Dispose()
        {
            DbTransaction?.Dispose();
            DbConnection?.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>Rolls the transaction back.</summary>
        public void Rollback()
        {
            DbTransaction.Rollback();
        }

        // Opens the connection only when it is not already open (SqlConnection.Open, for example, throws on an
        // open connection). If starting the transaction fails, a connection opened here is closed again, because
        // a throwing constructor means Dispose is never called.
        private static IDbTransaction OpenAndBeginTransaction(IDbConnection connection, IsolationLevel? isolationLevel)
        {
            var openedHere = false;
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
                openedHere = true;
            }

            try
            {
                return isolationLevel is { } level
                    ? connection.BeginTransaction(level)
                    : connection.BeginTransaction();
            }
            catch
            {
                if (openedHere)
                {
                    connection.Close();
                }
                throw;
            }
        }
    }
}
UnitOfWork
using API.Dapper.Repository.V2;

namespace API.Dapper.UnitOfWork.V2;

/// <summary>
/// Unit of work exposing the company repository, bound to this unit's connection schema and transaction.
/// </summary>
public class CompanyUnitOfWorkV2 : UnitOfWorkV2
{
    /// <summary>Company repository that executes inside this unit of work's transaction.</summary>
    public CompanyRepositoryV2 CompanyRepositoryV2 { get; }

    // The base constructor has already opened the connection and begun the transaction,
    // so DbTransaction is available when the repository is created.
    public CompanyUnitOfWorkV2(IDbConnectionSchema dbConnectionSchema)
        : base(dbConnectionSchema) =>
        CompanyRepositoryV2 = new CompanyRepositoryV2(dbConnectionSchema, DbTransaction);
}
CompanyUnitOfWork

IDbConnection扩展方法

 #region Partition
    /// <summary>
    /// Returns <c>true</c> when the heap or clustered index (index type 0 or 1) of <paramref name="schema"/>.<paramref name="tableName"/>
    /// is stored on a partition scheme.
    /// </summary>
    /// <remarks>Table and schema names are passed as SQL parameters rather than spliced into the query text.</remarks>
    public static async Task<bool> IsPartitionTable(this IDbConnection connection, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
    {
        const string sql = @"SELECT COUNT(t.object_id)
FROM sys.tables t
INNER JOIN sys.indexes i ON t.object_id = i.object_id AND i.type IN (0, 1)
INNER JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id
WHERE t.name = @TableName AND SCHEMA_NAME(t.schema_id) = @Schema;";
        var count = await connection.ExecuteScalarAsync<int>(sql, new { TableName = tableName, Schema = schema }, transaction: transaction, commandTimeout: commandTimeout);
        return count > 0;
    }

    /// <summary>
    /// Adds one boundary per entry of <paramref name="rangeValues"/> to the partition function used by
    /// <paramref name="schema"/>.<paramref name="tableName"/>. Before each split, 'PRIMARY' is set as the partition
    /// scheme's NEXT USED filegroup.
    /// </summary>
    /// <returns>
    /// The sum of the values returned by <c>ExecuteAsync</c>. NOTE(review): DDL statements typically report -1,
    /// so this is not a count of created boundaries.
    /// </returns>
    /// <exception cref="InvalidOperationException">No partition scheme/function is associated with the table.</exception>
    public static async Task<Int32> CreatePartitionValues(this IDbConnection connection, IEnumerable<string> rangeValues, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
    {
        var partitionSchemaFunctions = await connection.GetPartitionSchemaFunctions(tableName, schema, transaction, commandTimeout);
        // Checked as "null OR empty": the previous `?.Count() == 0` test let a null result through to First().
        if (partitionSchemaFunctions is null || !partitionSchemaFunctions.Any())
            throw new InvalidOperationException($"The partition scheme and function is null or empty, target table: {tableName}, schema: {schema}.");
        var partition = partitionSchemaFunctions.First();

        // Escape the closing bracket so a name read from the catalog is always a single quoted identifier.
        static string QuoteName(string name) => "[" + name.Replace("]", "]]") + "]";

        var scheme = QuoteName(partition.Scheme);
        var function = QuoteName(partition.Function);
        var affectedRows = 0;
        foreach (var rangeValue in rangeValues)
        {
            // Boundary values come from entity data: double any single quote so the value stays one literal.
            // The N prefix keeps non-ASCII characters when the partition function is nvarchar.
            var literal = (rangeValue ?? string.Empty).Replace("'", "''");
            var sql = $"ALTER PARTITION SCHEME {scheme} NEXT USED 'PRIMARY' ALTER PARTITION FUNCTION {function}() SPLIT RANGE(N'{literal}');";
            affectedRows += await connection.ExecuteAsync(sql, transaction: transaction, commandTimeout: commandTimeout);
        }
        return affectedRows;
    }

    /// <summary>
    /// Returns the partition scheme and partition function names used by the heap or clustered index
    /// (index type 0 or 1) of <paramref name="schema"/>.<paramref name="tableName"/>; empty when the table is not partitioned.
    /// </summary>
    /// <remarks>Table and schema names are passed as SQL parameters rather than spliced into the query text.</remarks>
    private static async Task<IEnumerable<PartitionEntity>> GetPartitionSchemaFunctions(this IDbConnection connection, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
    {
        const string sql = @"SELECT ps.name [Scheme], pf.name [Function]
FROM sys.partition_functions AS pf
JOIN sys.partition_schemes AS ps ON pf.function_id = ps.function_id
JOIN sys.indexes AS si ON ps.data_space_id = si.data_space_id AND si.type IN (0, 1)
JOIN sys.tables AS st ON si.object_id = st.object_id
WHERE st.name = @TableName AND SCHEMA_NAME(st.schema_id) = @Schema";
        return await connection.QueryAsync<PartitionEntity>(sql, new { TableName = tableName, Schema = schema }, transaction: transaction, commandTimeout: commandTimeout);
    }

    /// <summary>
    /// Returns the boundary values of the partition function used by the heap or clustered index (index type 0 or 1)
    /// of <paramref name="schema"/>.<paramref name="tableName"/>, one row per boundary (boundary_id matched to partition_number).
    /// </summary>
    /// <remarks>Table and schema names are passed as SQL parameters rather than spliced into the query text.</remarks>
    public static async Task<IEnumerable<string>> GetPartitionValues(this IDbConnection connection, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
    {
        const string sql = @"SELECT srv.value
FROM sys.tables AS st
JOIN sys.indexes AS si ON st.object_id = si.object_id
JOIN sys.partitions AS sp ON si.object_id = sp.object_id AND si.index_id = sp.index_id
JOIN sys.partition_schemes AS ps ON si.data_space_id = ps.data_space_id
JOIN sys.partition_functions AS sf ON ps.function_id = sf.function_id
JOIN sys.partition_range_values AS srv ON sf.function_id = srv.function_id AND srv.boundary_id = sp.partition_number
WHERE si.type <= 1 AND st.name = @TableName AND SCHEMA_NAME(st.schema_id) = @Schema";
        return await connection.QueryAsync<string>(sql, new { TableName = tableName, Schema = schema }, transaction: transaction, commandTimeout: commandTimeout);
    }
    #endregion
DbConnectionExtension

sql

namespace API.Dapper;

/// <summary>
/// SQL statement templates shared by the repositories. Each template has a <c>{schema}</c> placeholder,
/// filled by <see cref="BuildSchema"/>, plus positional <c>string.Format</c> slots filled by the caller.
/// </summary>
public static partial class Sql
{
    /// <summary>Slots: {0} table name, {1} column list, {2} parameter list.</summary>
    public const string Insert = "INSERT INTO [{schema}].[{0}]({1})VALUES({2})";

    /// <summary>Slots: {0} column list, {1} table name (not bracketed by the template), {2} trailing clause such as WHERE.</summary>
    public const string Select = "SELECT {0} FROM [{schema}].{1} {2}";

    /// <summary>Slots: {0} table name, {1} SET assignments, {2} WHERE clause.</summary>
    public const string Update = "UPDATE [{schema}].[{0}] SET {1} {2}";

    /// <summary>Slots: {0} table name, {1} WHERE clause.</summary>
    public const string Delete = "DELETE FROM [{schema}].[{0}] {1}";

    /// <summary>
    /// Substitutes <paramref name="schema"/> for every <c>{schema}</c> placeholder in <paramref name="sql"/>.
    /// The schema text is inserted as-is, without escaping.
    /// </summary>
    public static string BuildSchema(this string sql, string schema)
    {
        const string placeholder = "{schema}";
        return sql.Replace(placeholder, schema);
    }
}
Sql

创建一个NetCore Console Application,引入上面的Library工程,定义CompanyService

using API.Dapper.Entity;
using API.Dapper.UnitOfWork.V2;

namespace API.Dapper.Test.Services;

/// <summary>
/// Application service for companies. Every call works inside its own <see cref="CompanyUnitOfWorkV2"/>
/// created over a new <c>DemoDbConnectionSchema</c>, which is disposed when the call completes.
/// </summary>
public class CompanyService
{
    /// <summary>Reads all companies.</summary>
    public async Task<IEnumerable<Company>> GetAllAsync()
    {
        using (var uow = CreateUnitOfWork())
        {
            return await uow.CompanyRepositoryV2.GetAllAsync();
        }
    }

    /// <summary>Reads one company by id, or <c>null</c> when it does not exist.</summary>
    public async Task<Company> GetAsync(Guid id)
    {
        using (var uow = CreateUnitOfWork())
        {
            return await uow.CompanyRepositoryV2.GetAsync(id);
        }
    }

    /// <summary>Inserts the given companies and commits the transaction.</summary>
    public async Task AddAsync(IEnumerable<Company> companies)
    {
        using (var uow = CreateUnitOfWork())
        {
            await uow.CompanyRepositoryV2.AddAsync(companies);
            uow.Commit();
        }
    }

    // One fresh unit of work per operation.
    private static CompanyUnitOfWorkV2 CreateUnitOfWork() => new CompanyUnitOfWorkV2(new DemoDbConnectionSchema());
}
CompanyService

在Program中通过DI获取CompanyService,测试AddAsync及GetAllAsync

// DI: register CompanyService and resolve it from the container.
var services = new ServiceCollection();
services.AddTransient<CompanyService>();
// ServiceProvider is disposable (it owns the services it creates), so dispose it when the program ends.
using var serviceProvider = services.BuildServiceProvider();

UserContext.Current.Init(new UserPrincipal
{
    UserId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
    DisplayName = "Jack",
    Role = Role.Boss,
    HireDate = DateTime.UtcNow.Ticks
});

var companyService = serviceProvider.GetRequiredService<CompanyService>();
Console.WriteLine("-----------Start to add company to partition table--------------");
// Two regions: the first insert creates the partitioned table and adds boundaries for both "AU" and "US".
await companyService.AddAsync(new List<Company>
{
    new Company { Id = Guid.NewGuid(), Name ="Compnay01", Region ="AU" },
    new Company { Id= Guid.NewGuid(), Name ="Company02",Region="US"}
});
Console.WriteLine("-----------Add two companies to table----------------------------");
var companies = await companyService.GetAllAsync();
// The listing contains companies, not users; the heading now says so.
Console.WriteLine($"Get all companies from table \r\n {string.Join("\r\n", companies.Select(c => $"Id: {c.Id}, Name: {c.Name}, Region: {c.Region}"))}");
Console.ReadKey();
Program

运行结果

 查看DB table及分区函数

 分区Scheme及Function

 在database storage中查看数据分布情况

 OK,简单的分区即完成。但在真正的生产环境中,通常不会把所有分区都放在同一个文件组[PRIMARY]上,而是为分区指定多个文件组(filegroup),并把这些文件组的数据文件放在不同磁盘上,从而提高IO。当前资源和精力有限,之后有需要会逐渐完善。