TCP连接(Netty)

发布时间: 2023-06-09 17:33:07  作者: 小侯学编程

在启动类中增加以下代码

/**
 * Application entry point: boots the Spring context, then kicks off the TCP listener.
 *
 * @param args command-line arguments handed through to Spring Boot
 */
public static void main(String[] args) {
    // Static shortcut for "new SpringApplication(primarySource).run(args)".
    SpringApplication.run(CampusSecurityApplication.class, args);
    protocolThreadRun();
}
/**
 * Server side: opens the listening port by running the TCP server on a pooled thread.
 *
 * <p>NOTE(review): the executor is obtained by calling the factory method directly from a
 * static context, which looks like it builds a separate pool rather than reusing the
 * container-managed bean - confirm this is intended.
 */
private static void protocolThreadRun() {
    ThreadPoolTaskExecutor listenerPool = CampusSecurityApplication.threadPoolExecutor();
    // tcpServer(...) blocks until the server channel closes, so it must not run on the main thread.
    Runnable startListener = () -> TcpProtocolServer.tcpServer(8399, new ElectrictyTcpServerHandler());
    listenerPool.submit(startListener);
}
	
    /**
     * Builds and initializes the shared task executor.
     *
     * <p>Sizing: 100 core threads, up to 300 threads, a queue of 100 pending tasks, and idle
     * threads kept for 120 seconds. When the pool is saturated the submitting thread runs the
     * task itself ({@code CallerRunsPolicy}).
     *
     * @return an initialized executor whose threads are named {@code executorService-N}
     */
    @Bean
    public static ThreadPoolTaskExecutor threadPoolExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Capacity settings.
        pool.setCorePoolSize(100);
        pool.setMaxPoolSize(300);
        pool.setQueueCapacity(100);
        pool.setKeepAliveSeconds(120);

        // Naming and saturation behaviour.
        pool.setThreadNamePrefix("executorService-");
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        pool.initialize();
        return pool;
    }

监听

package ***;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author
 * @title ElectrictyTcpServerHandler
 * @date 2023/6/1 18:48
 * @description TODO
 */
@ChannelHandler.Sharable
@Slf4j
public class ElectrictyTcpServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("【电表】客户端连接通道建立完成:" + ctx.channel().remoteAddress());
        ChannelMap.addChannel("1001", ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("【电表】========断开连接========" + ctx.channel());
        log.error("【电表】========断开连接========" + ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String receiveData = MessageUtil.readTcpMessage((ByteBuf) msg);
        log.info("【电表】读取到电表客户端数据:" + receiveData);
        //获取唯一标识
        receiveData = receiveData.replace(" ", "");
        String snno = receiveData.substring(receiveData.indexOf("68") + 2, receiveData.indexOf("68") + 14);
        //put返回的值
        ChannelMap.dataMap.put(snno, receiveData);
        //通道put
        ChannelMap.addChannel(snno, ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 处理客户端异常
        ctx.close();
        log.info("【电表】连接异常了");
    }
}

ChannelMap类

package ***;

import ***;
import ***;
import io.netty.channel.Channel;
import io.netty.channel.socket.DatagramPacket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide registry of client channels, UDP packets and the latest frame per device.
 *
 * <p>All maps are concurrent and created eagerly because they are written by Netty I/O threads
 * and read by the threads that send commands.
 *
 * @author
 * @date 2021/5/31 15:11
 */
public class ChannelMap {

    /** Key used when the requested key has no registered channel. */
    private static final String DEFAULT_CHANNEL_KEY = "1001";

    /** Latest frame received per device key. */
    protected static Map<String, String> dataMap = new ConcurrentHashMap<>();

    private static final ConcurrentHashMap<String, Channel> channelHashMap = new ConcurrentHashMap<>(10);

    private static final ConcurrentHashMap<String, DatagramPacket> packetHashMap = new ConcurrentHashMap<>();

    /**
     * @return the live channel registry (never {@code null})
     */
    public static ConcurrentHashMap<String, Channel> getChannelHashMap() {
        return channelHashMap;
    }

    /**
     * @return the live packet registry (never {@code null})
     */
    public static ConcurrentHashMap<String, DatagramPacket> getPacketHashMap() {
        return packetHashMap;
    }

    /**
     * Registers (or replaces) the channel stored under {@code key}.
     *
     * @param key     registry key
     * @param channel channel to store
     */
    public static void addChannel(String key, Channel channel) {
        channelHashMap.put(key, channel);
    }

    /**
     * Looks up an active channel, falling back to the default key when {@code channelKey} is
     * not registered.
     *
     * @param channelKey key of the wanted channel
     * @return the active channel
     * @throws BusinessException if no channel is registered under the resolved key or the
     *                           registered channel is no longer active
     */
    public static Channel getChannel(String channelKey) {
        String key = channelHashMap.containsKey(channelKey) ? channelKey : DEFAULT_CHANNEL_KEY;
        Channel channel = channelHashMap.get(key);
        if (channel == null || !channel.isActive()) {
            if (channel != null) {
                // Conditional remove: do not evict a fresh channel that a reconnect may have
                // registered under the same key in the meantime.
                channelHashMap.remove(key, channel);
            }
            throw new BusinessException(ResultCode.DATA_ERROR, "连接中断");
        }
        return channel;
    }

    /**
     * @param key registry key
     * @return the packet stored under {@code key}, or {@code null} if there is none
     */
    public static DatagramPacket getPacket(String key) {
        return packetHashMap.get(key);
    }

}

MessageUtil

package ***;

import ***;
import ***;
import ***;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * Helpers for sending messages to connected clients and reading their replies.
 *
 * <p>Replies are not delivered through the write future: the inbound handler stores each
 * received frame in {@code ChannelMap.dataMap}, and the "AddListener" methods poll that map
 * until a frame for the key appears or the timeout elapses.
 *
 * @author
 * @title MessageUtil
 * @date 2023/6/1 18:53
 */
@Slf4j
public class MessageUtil {

    /** Pause between two looks at the reply map, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /**
     * Sends a TCP message and waits for the client's reply.
     *
     * @param channelKey  channel identifier
     * @param message     message to send
     * @param timeout     maximum time to wait for the reply, in seconds
     * @param messageType how {@code message} is encoded onto the wire
     * @return the reply frame stored for {@code channelKey}
     * @throws BusinessException if the channel is unavailable or no reply arrives in time
     */
    public static String sendTcpMessageAddListener(String channelKey, String message, Integer timeout, MessageType messageType) {
        Channel channel = ChannelMap.getChannel(channelKey);
        ByteBuf byteBuf = writeMessageToByteBuff(message, messageType);
        ChannelFuture channelFuture = channel.writeAndFlush(byteBuf);
        return messageListener(channelFuture, channelKey, timeout);
    }

    /**
     * Sends a TCP message without waiting for a reply.
     *
     * @param channelKey  channel identifier
     * @param message     message to send
     * @param messageType how {@code message} is encoded onto the wire
     * @date 2023/6/2 16:34
     * @author hhs
     */
    public static void sendTcpMessage(String channelKey, String message, MessageType messageType) {
        Channel channel = ChannelMap.getChannel(channelKey);
        ByteBuf byteBuf = writeMessageToByteBuff(message, messageType);
        ChannelFuture channelFuture = channel.writeAndFlush(byteBuf);
        messageListenerStatus(channelFuture, channelKey);
    }

    /**
     * Logs the outcome of the write and discards any frame currently stored for the key.
     *
     * @param channelFuture future of the pending write
     * @param channelKey    key whose stored frame is cleared
     */
    private static void messageListenerStatus(ChannelFuture channelFuture, String channelKey) {
        logSendResult(channelFuture);
        String data = ChannelMap.dataMap.get(channelKey);
        if (StringUtil.isNotEmpty(data)) {
            ChannelMap.dataMap.remove(channelKey);
        }
    }

    /**
     * Sends a UDP message to the sender of the packet stored under {@code key} and waits for
     * the reply.
     *
     * @param key         channel / packet identifier
     * @param message     message to send
     * @param timeout     maximum time to wait for the reply, in seconds
     * @param messageType how {@code message} is encoded onto the wire
     * @return the reply frame stored for {@code key}
     * @throws BusinessException if no packet is known for {@code key}, the channel is
     *                           unavailable, or no reply arrives in time
     */
    public static String sendUdpMessageAddListener(String key, String message, Integer timeout, MessageType messageType) {
        Channel channel = ChannelMap.getChannel(key);
        DatagramPacket packet = ChannelMap.getPacket(key);
        if (packet == null) {
            // Without a previously received packet there is no address to reply to.
            throw new BusinessException(ResultCode.FAILURE, "未找到客户端UDP地址!");
        }
        ByteBuf byteBuf = writeMessageToByteBuff(message, messageType);
        DatagramPacket datagramPacket = new DatagramPacket(byteBuf, packet.sender());
        ChannelFuture channelFuture = channel.writeAndFlush(datagramPacket);
        return messageListener(channelFuture, key, timeout);
    }

    /**
     * Encodes {@code message} into a buffer: hex text is turned into raw bytes, everything
     * else is written as UTF-8.
     */
    private static ByteBuf writeMessageToByteBuff(String message, MessageType messageType) {
        ByteBuf buff = Unpooled.buffer();
        switch (messageType) {
            case HEX:
                return buff.writeBytes(BinaryUtil.hexString2Bytes(message));
            case STRING:
                return buff.writeBytes(message.getBytes(CharsetUtil.UTF_8));
            default:
                return Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
        }
    }

    /**
     * Waits for the client's reply to show up in {@code ChannelMap.dataMap}.
     *
     * @param channelFuture future of the pending write
     * @param channelKey    key under which the reply is expected
     * @param timeout       maximum time to wait, in seconds; {@code null} is treated as 0
     * @return the reply frame, which is removed from the map
     * @throws BusinessException if the wait times out or the waiting thread is interrupted
     */
    private static String messageListener(ChannelFuture channelFuture, String channelKey, Integer timeout) {
        long startTime = System.currentTimeMillis();
        // Long arithmetic so large second values cannot overflow int.
        long timeoutMillis = timeout == null ? 0L : timeout * 1000L;
        logSendResult(channelFuture);

        while (true) {
            String data = ChannelMap.dataMap.get(channelKey);
            if (StringUtil.isNotEmpty(data)) {
                ChannelMap.dataMap.remove(channelKey);
                return data;
            }
            if (System.currentTimeMillis() - startTime > timeoutMillis) {
                throw new BusinessException(ResultCode.FAILURE, "等待超时!");
            }
            try {
                // Yield the CPU between polls instead of spinning a core for the whole timeout.
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BusinessException(ResultCode.FAILURE, "等待被中断!");
            }
        }
    }

    /**
     * Attaches a listener that logs whether the write succeeded.
     *
     * <p>The listener runs on the channel's event loop, so an exception thrown from it would
     * never reach the code that initiated the send; the failure is logged with its cause
     * instead.
     */
    private static void logSendResult(ChannelFuture channelFuture) {
        channelFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.info("=========成功发送消息到客户端==========");
            } else {
                log.error("发送消息到客户端失败!", future.cause());
            }
        });
    }

    /**
     * Drains the readable bytes of {@code msg} into an upper-case hex string and releases the
     * buffer.
     *
     * @param msg inbound buffer; released by this method even if reading fails
     * @return hex representation of the buffer's readable bytes
     */
    public static String readTcpMessage(ByteBuf msg) {
        try {
            byte[] bytes = new byte[msg.readableBytes()];
            msg.readBytes(bytes);
            return BinaryUtil.bytesToHex(bytes);
        } finally {
            msg.release();
        }
    }
}

TcpProtocolServer类

package ***;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;


/**
 * @author
 * @date 2021/5/31 13:34
 */
@Slf4j
public class TcpProtocolServer {

    private TcpProtocolServer() {
    }

    public static void tcpServer(Integer port, ChannelHandler channelHandler) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(channelHandler);
                        }
                    });
            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("tcp服务端启动失败,端口号:" + port + "错误消息:" + e.getMessage());
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

BinaryUtil类,转换16进制等

package ***;

/**
 * Byte / hexadecimal conversion helpers.
 *
 * @author admin
 */
public class BinaryUtil {

    /** Upper-case hex digits indexed by nibble value. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /**
     * Converts a byte array to an upper-case hex string, two characters per byte.
     *
     * @param bytes bytes to convert
     * @return hex string without separators; empty for an empty array
     */
    public static String bytesToHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(HEX_DIGITS[(b >> 4) & 0x0F]).append(HEX_DIGITS[b & 0x0F]);
        }
        return sb.toString();
    }

    /**
     * Converts a hex string to a byte array.
     *
     * <p>Whitespace anywhere in the input is ignored, so both {@code "68AA0F"} and
     * {@code "68 AA 0F"} are accepted. If an odd number of hex characters remains, the trailing
     * one is ignored.
     *
     * @param src hex string, upper or lower case
     * @return decoded bytes
     * @throws NumberFormatException if a character pair is not valid hexadecimal
     */
    public static byte[] hexString2Bytes(String src) {
        String hex = stripWhitespace(src);
        int byteCount = hex.length() / 2;
        byte[] ret = new byte[byteCount];
        for (int i = 0; i < byteCount; i++) {
            ret[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
        }
        return ret;
    }

    /** Returns {@code text} with every whitespace character removed. */
    private static String stripWhitespace(String text) {
        StringBuilder compact = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (!Character.isWhitespace(c)) {
                compact.append(c);
            }
        }
        return compact.toString();
    }
}