最近在升级monogdb数据库,从3.4升级到7.0,由于版本跨度过大,不能跨库复制,C#的mongodb驱动也不能同时操作2个不同版本的库,而它自己的mongodump不但慢,而且依然不能支持从3.4到7.0,没办法,我只能自己想办法转移数据,于是就想到了命名管道,于是我写了个小项目封装了这个需求
本项目采用ProtoBuf进行数据序列化,命名管道进行通信
程序入口
using Test; Console.Write("请输入1(接收端)或0(发送端):"); var cmd = Console.ReadLine(); try { if (cmd == "1") { Console.WriteLine("启动接收端"); new Server().Start(); } else if (cmd == "0") { Console.WriteLine("启动发送端"); new Client().Start(); } else { Console.WriteLine("错误的指令"); } } catch (Exception ex) { Console.WriteLine(ex.ToString()); } Console.WriteLine("任务结束,按空格键退出"); Console.ReadKey();
发送端测试
using LocalTransfer; namespace Test { internal class Client { static IEnumerable<User> GetData() { var user = User.GetTestInstance(); int count = 1024 * 1024; for (int i = 0; i < count; i++) { yield return user; } } public void Start() { TransferClient transferClient = new(); int capacity = 1024 * 1024; transferClient.Start(capacity); foreach (var list in GetData().ToBatch(GetSize, capacity)) { Console.WriteLine($"{DateTime.Now:HH:mm:ss:fff}发送:{list.Count}条"); transferClient.Send(list); } } int GetSize(User user) { int strLen = user.Name.Length + user.Remark.Length; foreach (var address in user.Address) { strLen += address.Length; } return 4 + 1 + 8 + user.Photo.Length + strLen * 3; } } }
接收端测试
using LocalTransfer; namespace Test { internal class Server { public void Start() { TransferServer transferServer = new(); transferServer.Start<List<User>>(1024, Receive); } public void Receive(List<User> list) { Console.WriteLine($"{DateTime.Now:HH:mm:ss:fff} 接收:{list.Count}条"); User user = User.GetTestInstance(); foreach (var p in list) { Compare(user.ID == p.ID); Compare(user.Name == p.Name); Compare(user.Gender == p.Gender); Compare(user.Birthdate == p.Birthdate); Compare(user.Remark == p.Remark); Compare(user.Address.SequenceEqual(p.Address)); Compare(user.Photo.SequenceEqual(user.Photo)); } } void Compare(bool b) { if (b == false) { throw new Exception("不相等"); } } } }
测试数据模型
using ProtoBuf; namespace Test { [ProtoContract] public class User { [ProtoMember(1)] public int ID { get; set; } [ProtoMember(2)] public string Name { get; set; } [ProtoMember(3)] public byte Gender { get; set; } [ProtoMember(4)] public DateTime Birthdate { get; set; } [ProtoMember(5)] public string[] Address { get; set; } [ProtoMember(6)] public string Remark { get; set; } [ProtoMember(7)] public byte[] Photo { get; set; } /// <summary> /// 提供一个固定的对象方便测试转移后数据的一致性 /// </summary> /// <returns></returns> public static User GetTestInstance() { User user = new User() { ID = 1, Name = "张三", Birthdate = new DateTime(2000, 1, 1), Gender = 1, Address = new string[] { "老家河南信阳", "目前山西太原" }, Remark = "测试啊123_abc", Photo = new byte[1024], }; Array.Fill<byte>(user.Photo, 255); return user; } } }
命名管道的客户端
using ProtoBuf; using System; using System.IO.Pipes; namespace LocalTransfer { public class TransferClient : IDisposable { NamedPipeClientStream client; BufferWriter bw; public void Start(int capacity) { client = new NamedPipeClientStream(".", "LocalTransfer", PipeDirection.Out); bw = new BufferWriter(capacity); client.Connect(); } public void Send<T>(T obj) { bw.Reset(); Serializer.Serialize(bw, obj); (byte[] buffer, int length) = bw.GetBuffer(); client.Write(buffer, 0, length); client.Flush(); } public void Dispose() { client.Close(); client.Dispose(); } } }
命名管道的接收端
using ProtoBuf; using System; using System.IO.Pipes; namespace LocalTransfer { public class TransferServer : IDisposable { NamedPipeServerStream server; public void Start<T>(int bufferBlockSize, Action<T> receiveObject) { server = new NamedPipeServerStream("LocalTransfer"); server.WaitForConnection(); LoopReader serverReader = new LoopReader(server.Read, bufferBlockSize); while (true) { (byte[] buffer, int len) = serverReader.Read(); if (len > 0) { receiveObject(Serializer.Deserialize<T>(buffer.AsMemory(0, len))); } else { break; } } } public void Dispose() { server.Close(); server.Dispose(); } } }
为protobuf-net序列化提供的IBufferWriter<T>实现
using System; using System.Buffers; namespace LocalTransfer { /// <summary> /// 实现IBufferWriter<T>接口 /// </summary> public class BufferWriter : IBufferWriter<byte> { /// <summary> /// 缓冲区 /// </summary> byte[] buffer; /// <summary> /// 缓冲区中数据的个数 /// </summary> int length; /// <summary> /// 初始容量 /// </summary> readonly int capacity; /// <summary> /// 创建一个缓存写入器 /// </summary> /// <param name="capacity"></param> public BufferWriter(int capacity) { this.capacity = capacity; buffer = new byte[capacity]; } /// <summary> /// 重置长度,复用该对象 /// </summary> public void Reset() { length = 0; } /// <summary> /// 得到缓冲区中的数据 /// </summary> /// <returns></returns> public (byte[], int) GetBuffer() { return (buffer, length); } /// <summary> /// 实现接口, /// </summary> /// <param name="count"></param> public void Advance(int count) { length += count; } /// <summary> /// 当缓冲区长度不足时,扩容缓冲区 /// </summary> /// <param name="sizeHint"></param> void TryExpand(int sizeHint) { if (sizeHint + length > buffer.Length) { Array.Resize(ref buffer, buffer.Length + capacity); } } /// <summary> /// 返回缓冲区指定长度的可操作块 /// </summary> /// <param name="sizeHint"></param> /// <returns></returns> public Memory<byte> GetMemory(int sizeHint = 0) { TryExpand(sizeHint); return buffer.AsMemory(length, sizeHint); } /// <summary> /// 返回缓冲区指定长度的可操作块 /// </summary> /// <param name="sizeHint"></param> /// <returns></returns> public Span<byte> GetSpan(int sizeHint = 0) { TryExpand(sizeHint); return buffer.AsSpan(length, sizeHint); } } }
为方便接收数据封装的一个数据接收器
using System; using System.Collections.Generic; using System.IO; namespace LocalTransfer { /// <summary> /// 数据接收器 /// </summary> internal class LoopReader { /// <summary> /// 缓冲区的容量 /// </summary> public readonly int Capacity; /// <summary> /// 接收数据的缓冲区 /// </summary> byte[] buffer; /// <summary> /// 缓冲区容器,当申请新的缓冲区时,用来放置已经填充过数据的缓冲区 /// </summary> readonly List<byte[]> list; /// <summary> /// 外部读取数据的函数 /// </summary> readonly Func<byte[], int, int, int> read; /// <summary> /// 创建一个数据接收器,用来接收未知长度的数据流 /// </summary> /// <param name="read"></param> /// <param name="capacity"></param> public LoopReader(Func<byte[], int, int, int> read, int capacity) { this.read = read; Capacity = capacity; buffer = new byte[capacity]; list = new List<byte[]>(); } /// <summary> /// 读取流中的所有数据 /// </summary> /// <returns></returns> public (byte[], int) Read() { int length = 0; while (true) { int len = read(buffer, 0, buffer.Length); //调用外部的读取函数,填充缓冲区 length += len; if (len == buffer.Length) { //有时候一次读取不玩流中的数据 list.Add(buffer); //就将已经读取的缓冲区放到集合中 buffer = new byte[Capacity]; //然后申请新的缓冲区继续读取 } else { break; } } if (list.Count > 0) { //如果本次读取到的数据由多个缓冲区组成 using (MemoryStream ms = new MemoryStream()) { foreach (var bs in list) { ms.Write(bs, 0, bs.Length); //就按照顺序将所有的缓冲区写到一个流中 } ms.Write(buffer, 0, buffer.Length); buffer = ms.GetBuffer(); //并指向第一个缓冲区,避免后续不够用 list.Clear(); } } return (buffer, length); } } }
对枚举器进行分批次的扩展方法
using System; using System.Collections.Generic; namespace LocalTransfer { public static class ExtendedMethod { /// <summary> /// 将枚举的数据按照每批次size条枚举返回 /// </summary> /// <typeparam name="T">要枚举的数据泛型</typeparam> /// <param name="values">要枚举的数据</param> /// <param name="size">每批的数量</param> /// <returns>最多size条数据的集合</returns> public static IEnumerable<List<T>> ToBatch<T>(this IEnumerable<T> values, int size) { List<T> list = new List<T>(); foreach (var val in values) { list.Add(val); if (list.Count == size) { yield return list; list.Clear(); } } yield return list; } /// <summary> /// 将枚举的数据按照每批次不超过maxByteLength字节枚举返回 /// </summary> /// <typeparam name="T">要枚举的数据泛型</typeparam> /// <param name="values">要枚举的数据</param> /// <param name="getSizeFunc">每个数据的字节数</param> /// <param name="maxByteLength">每批次不超过的字节数</param> /// <returns>不超过maxByteLength字节数据的集合</returns> public static IEnumerable<List<T>> ToBatch<T>(this IEnumerable<T> values, Func<T, int> getSizeFunc, int maxByteLength = 1024 * 1024 * 8) { List<T> list = new List<T>(); int byteLength = 0; foreach (var val in values) { int size = getSizeFunc(val); if (byteLength + size > maxByteLength) { yield return list; list.Clear(); byteLength = 0; } list.Add(val); byteLength += size; } yield return list; } } }