C# 使用命名管道本地转移数据

发布时间 2024-01-11 11:02:46作者: WmW

最近在升级monogdb数据库,从3.4升级到7.0,由于版本跨度过大,不能跨库复制,C#的mongodb驱动也不能同时操作2个不同版本的库,而它自己的mongodump不但慢,而且依然不能支持从3.4到7.0,没办法,我只能自己想办法转移数据,于是就想到了命名管道,于是我写了个小项目封装了这个需求

本项目采用ProtoBuf进行数据序列化,命名管道进行通信

程序入口

using Test;

Console.Write("请输入1(接收端)或0(发送端):");
var cmd = Console.ReadLine();
try {
    if (cmd == "1") {
        Console.WriteLine("启动接收端");
        new Server().Start();
    } else if (cmd == "0") {
        Console.WriteLine("启动发送端");
        new Client().Start();
    } else {
        Console.WriteLine("错误的指令");
    }
} catch (Exception ex) {
    Console.WriteLine(ex.ToString());
}
Console.WriteLine("任务结束,按空格键退出");
Console.ReadKey();
View Code

发送端测试

using LocalTransfer;

namespace Test {
    internal class Client {
        static IEnumerable<User> GetData() {
            var user = User.GetTestInstance();
            int count = 1024 * 1024;
            for (int i = 0; i < count; i++) {
                yield return user;
            }
        }
        public void Start() {
            TransferClient transferClient = new();
            int capacity = 1024 * 1024;
            transferClient.Start(capacity);
            foreach (var list in GetData().ToBatch(GetSize, capacity)) {
                Console.WriteLine($"{DateTime.Now:HH:mm:ss:fff}发送:{list.Count}条");
                transferClient.Send(list);
            }
        }
        int GetSize(User user) {
            int strLen = user.Name.Length + user.Remark.Length;
            foreach (var address in user.Address) {
                strLen += address.Length;
            }
            return 4 + 1 + 8 + user.Photo.Length + strLen * 3;
        }
    }
}
View Code

接收端测试

using LocalTransfer;

namespace Test {
    internal class Server {
        public void Start() {
            TransferServer transferServer = new();
            transferServer.Start<List<User>>(1024, Receive);
        }
        public void Receive(List<User> list) {
            Console.WriteLine($"{DateTime.Now:HH:mm:ss:fff} 接收:{list.Count}条");
            User user = User.GetTestInstance();
            foreach (var p in list) {
                Compare(user.ID == p.ID);
                Compare(user.Name == p.Name);
                Compare(user.Gender == p.Gender);
                Compare(user.Birthdate == p.Birthdate);
                Compare(user.Remark == p.Remark);
                Compare(user.Address.SequenceEqual(p.Address));
                Compare(user.Photo.SequenceEqual(user.Photo));
            }
        }
        void Compare(bool b) {
            if (b == false) {
                throw new Exception("不相等");
            }
        }
    }
}
View Code

测试数据模型

using ProtoBuf;

namespace Test {
    [ProtoContract]
    public class User {
        [ProtoMember(1)]
        public int ID { get; set; }
        [ProtoMember(2)]
        public string Name { get; set; }
        [ProtoMember(3)]
        public byte Gender { get; set; }
        [ProtoMember(4)]
        public DateTime Birthdate { get; set; }
        [ProtoMember(5)]
        public string[] Address { get; set; }
        [ProtoMember(6)]
        public string Remark { get; set; }
        [ProtoMember(7)]
        public byte[] Photo { get; set; }

        /// <summary>
        /// 提供一个固定的对象方便测试转移后数据的一致性
        /// </summary>
        /// <returns></returns>
        public static User GetTestInstance() {
            User user = new User() {
                ID = 1,
                Name = "张三",
                Birthdate = new DateTime(2000, 1, 1),
                Gender = 1,
                Address = new string[] { "老家河南信阳", "目前山西太原" },
                Remark = "测试啊123_abc",
                Photo = new byte[1024],
            };
            Array.Fill<byte>(user.Photo, 255);
            return user;
        }
    }
}
View Code

命名管道的客户端

using ProtoBuf;
using System;
using System.IO.Pipes;

namespace LocalTransfer {
    public class TransferClient : IDisposable {
        NamedPipeClientStream client;
        BufferWriter bw;

        public void Start(int capacity) {
            client = new NamedPipeClientStream(".", "LocalTransfer", PipeDirection.Out);
            bw = new BufferWriter(capacity);
            client.Connect();
        }
        public void Send<T>(T obj) {
            bw.Reset();
            Serializer.Serialize(bw, obj);
            (byte[] buffer, int length) = bw.GetBuffer();
            client.Write(buffer, 0, length);
            client.Flush();
        }
        public void Dispose() {
            client.Close();
            client.Dispose();
        }
    }
}
View Code

命名管道的接收端

using ProtoBuf;
using System;
using System.IO.Pipes;

namespace LocalTransfer {
    public class TransferServer : IDisposable {
        NamedPipeServerStream server;
        public void Start<T>(int bufferBlockSize, Action<T> receiveObject) {
            server = new NamedPipeServerStream("LocalTransfer");
            server.WaitForConnection();
            LoopReader serverReader = new LoopReader(server.Read, bufferBlockSize);
            while (true) {
                (byte[] buffer, int len) = serverReader.Read();
                if (len > 0) {
                    receiveObject(Serializer.Deserialize<T>(buffer.AsMemory(0, len)));
                } else {
                    break;
                }
            }
        }
        public void Dispose() {
            server.Close();
            server.Dispose();
        }
    }
}
View Code

为protobuf-net序列化提供的IBufferWriter<T>实现

using System;
using System.Buffers;

namespace LocalTransfer {
    /// <summary>
    /// 实现IBufferWriter<T>接口
    /// </summary>
    public class BufferWriter : IBufferWriter<byte> {
        /// <summary>
        /// 缓冲区
        /// </summary>
        byte[] buffer;
        /// <summary>
        /// 缓冲区中数据的个数
        /// </summary>
        int length;
        /// <summary>
        /// 初始容量
        /// </summary>
        readonly int capacity;
        /// <summary>
        /// 创建一个缓存写入器
        /// </summary>
        /// <param name="capacity"></param>
        public BufferWriter(int capacity) {
            this.capacity = capacity;
            buffer = new byte[capacity];
        }
        /// <summary>
        /// 重置长度,复用该对象
        /// </summary>
        public void Reset() {
            length = 0;
        }
        /// <summary>
        /// 得到缓冲区中的数据
        /// </summary>
        /// <returns></returns>
        public (byte[], int) GetBuffer() {
            return (buffer, length);
        }
        /// <summary>
        /// 实现接口,
        /// </summary>
        /// <param name="count"></param>
        public void Advance(int count) {
            length += count;
        }
        /// <summary>
        /// 当缓冲区长度不足时,扩容缓冲区
        /// </summary>
        /// <param name="sizeHint"></param>
        void TryExpand(int sizeHint) {
            if (sizeHint + length > buffer.Length) {
                Array.Resize(ref buffer, buffer.Length + capacity);
            }
        }
        /// <summary>
        /// 返回缓冲区指定长度的可操作块
        /// </summary>
        /// <param name="sizeHint"></param>
        /// <returns></returns>
        public Memory<byte> GetMemory(int sizeHint = 0) {
            TryExpand(sizeHint);
            return buffer.AsMemory(length, sizeHint);
        }
        /// <summary>
        /// 返回缓冲区指定长度的可操作块
        /// </summary>
        /// <param name="sizeHint"></param>
        /// <returns></returns>
        public Span<byte> GetSpan(int sizeHint = 0) {
            TryExpand(sizeHint);
            return buffer.AsSpan(length, sizeHint);
        }
    }
}
View Code

为方便接收数据封装的一个数据接收器

using System;
using System.Collections.Generic;
using System.IO;

namespace LocalTransfer {
    /// <summary>
    /// 数据接收器
    /// </summary>
    internal class LoopReader {
        /// <summary>
        /// 缓冲区的容量
        /// </summary>
        public readonly int Capacity;
        /// <summary>
        /// 接收数据的缓冲区
        /// </summary>
        byte[] buffer;
        /// <summary>
        /// 缓冲区容器,当申请新的缓冲区时,用来放置已经填充过数据的缓冲区
        /// </summary>
        readonly List<byte[]> list;
        /// <summary>
        /// 外部读取数据的函数
        /// </summary>
        readonly Func<byte[], int, int, int> read;
        /// <summary>
        /// 创建一个数据接收器,用来接收未知长度的数据流
        /// </summary>
        /// <param name="read"></param>
        /// <param name="capacity"></param>
        public LoopReader(Func<byte[], int, int, int> read, int capacity) {
            this.read = read;
            Capacity = capacity;
            buffer = new byte[capacity];
            list = new List<byte[]>();
        }
        /// <summary>
        /// 读取流中的所有数据
        /// </summary>
        /// <returns></returns>
        public (byte[], int) Read() {
            int length = 0;
            while (true) {
                int len = read(buffer, 0, buffer.Length); //调用外部的读取函数,填充缓冲区
                length += len;
                if (len == buffer.Length) { //有时候一次读取不玩流中的数据
                    list.Add(buffer); //就将已经读取的缓冲区放到集合中
                    buffer = new byte[Capacity]; //然后申请新的缓冲区继续读取
                } else {
                    break;
                }
            }
            if (list.Count > 0) { //如果本次读取到的数据由多个缓冲区组成
                using (MemoryStream ms = new MemoryStream()) {
                    foreach (var bs in list) {
                        ms.Write(bs, 0, bs.Length); //就按照顺序将所有的缓冲区写到一个流中
                    }
                    ms.Write(buffer, 0, buffer.Length);
                    buffer = ms.GetBuffer(); //并指向第一个缓冲区,避免后续不够用
                    list.Clear();
                }
            }
            return (buffer, length);
        }
    }
}
View Code

对枚举器进行分批次的扩展方法

using System;
using System.Collections.Generic;

namespace LocalTransfer {
    public static class ExtendedMethod {
        /// <summary>
        /// 将枚举的数据按照每批次size条枚举返回
        /// </summary>
        /// <typeparam name="T">要枚举的数据泛型</typeparam>
        /// <param name="values">要枚举的数据</param>
        /// <param name="size">每批的数量</param>
        /// <returns>最多size条数据的集合</returns>
        public static IEnumerable<List<T>> ToBatch<T>(this IEnumerable<T> values, int size) {
            List<T> list = new List<T>();
            foreach (var val in values) {
                list.Add(val);
                if (list.Count == size) {
                    yield return list;
                    list.Clear();
                }
            }
            yield return list;
        }
        /// <summary>
        /// 将枚举的数据按照每批次不超过maxByteLength字节枚举返回
        /// </summary>
        /// <typeparam name="T">要枚举的数据泛型</typeparam>
        /// <param name="values">要枚举的数据</param>
        /// <param name="getSizeFunc">每个数据的字节数</param>
        /// <param name="maxByteLength">每批次不超过的字节数</param>
        /// <returns>不超过maxByteLength字节数据的集合</returns>
        public static IEnumerable<List<T>> ToBatch<T>(this IEnumerable<T> values, Func<T, int> getSizeFunc, int maxByteLength = 1024 * 1024 * 8) {
            List<T> list = new List<T>();
            int byteLength = 0;
            foreach (var val in values) {
                int size = getSizeFunc(val);
                if (byteLength + size > maxByteLength) {
                    yield return list;
                    list.Clear();
                    byteLength = 0;
                }
                list.Add(val);
                byteLength += size;
            }
            yield return list;
        }
    }
}
View Code