Netty 原理解析与实战开发(二)

发布时间 2023-08-03 17:30:11作者: zolmk

Netty 原理解析与开发实战

八、ChannelHandler

8.1 ChannelHandler介绍

我们对数据的处理都是在ChannelHandler中完成的,Netty提供了众多ChannelHandler的实现类来帮助我们实现一些网络编程中通用功能,比如最常用的心跳检测、数据编解码等。

Netty中的ChannelHandler分为两类,一类处理入站数据,都实现了ChannelInboundHandler接口,一类处理出站数据,都实现了ChannelOutboundHandler接口。前面章节所说的编解码器是一类比较特殊的Handler,解码器负责将入站byte数据解码为对象,编码器负责将出站对象数据编码为byte数据。

ChannelHandler在进行数据处理的时候有匹配机制,意思就说如果当前Handler所注册的类型不能够处理,则直接交给下一个处理器。具体的匹配方法大致是:在运行时获取到类的模版参数列表,获得运行时模版类的Class对象,然后当要处理数据时,调用class.isInstance(msg)来判断当前数据是否是该模版类中模版的对象,如果是,则处理,如果不是,则交给下一个处理器。

上面的匹配方式也给了我一些启发,即如何模版类如何在运行时获取到模版所代表的类型,模版类即public class ClassA<T,I>(其中T和I被称为模版,这在写框架的时候应该很有用。示例代码如下:

// 示例父类
public class Role <T>{
    T t;
}
// 示例子类,继承了 Role<T>,该子类必须指定T的类型,不然会报错
public class Admin extends Role<String>{

}

// 下面的代码会输出 true false
Role<?> role = new Admin();
TypeParameterMatcher matcher = TypeParameterMatcher.find(role, Role.class, "T");
System.out.println(matcher.match(""));
System.out.println(matcher.match(0));

ChannelHandler为什么会一直传下去呢?
下图展示了ChannelHandler从调用Pipeline.fireChannelRead()函数后ChannelHandler是如何传播OP_READ事件的。

Alt text
如上图所示,OP_READ事件的传播过程如下:

  1. 当调用事件循环器检测到有OP_READ事件后,调用unsafe.read()来读取从java.nio.channels.SocketChannel中读取数据;
  2. 然后调用pipeline.fireChannelRead方法将数据传入处理器链中,(pipeline中包含了一个双向链表,链表头为head,链表尾为tail)
  3. 然后pipeline调用head.invokeChannelRead方法,该方法会主动调用该ChannelHandlerContext下的handler的channelRead方法(channelRead方法是我们经常来处理数据的地方),在该方法内,用户可自己选择是否将事件传播到下一个处理器中
  4. 当用户可以通过主动调用ChannelHandlerContext对象的fireChannelRead方法时,该方法会自动找到下一个入站事件处理器,然后调用该事件处理器的invokeChannelRead方法,从而将事件传播下去。

OP_READ事件处理流程

  1. 通过Selector.selectNow()检测到读事件
  2. 检查Channel.config是否读取,如果可读取,则调用channel.unsafe.read()方法
  3. unsafe通过RecvByteBufAllocator分配接收缓存,然后将javaChannel中的数据写入道该缓存中
  4. unsafe将读取到的数据放入当前Channel的pipeline中。

OP_WRITE事件处理流程

  1. 用户调用ctx.write()方法,最终会调用当前channel的unsafe的write方法将数据写入到ChannelOutboundBuffer中,ChannelOutboundBuffer是Netty的出站数据缓存。
  2. 用户调用ctx.flush()方法,最终会调用当前channel的unsafe的flush方法,将ChannelOutboundBuffer中的数据写入到javaChannel中。

8.2 超时处理、心跳检测

首先来说心跳检测,Netty提供了IdleStateHandler类来帮助我们完成心跳检测功能,该类实现了ChannelDuplexHandler类,说明该类是一个双向处理器,可以处理入站和出站事件,主要的构造方法如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
  • observeOutput:如果设置该值为true,Netty将考虑是否是接受端接收数据缓慢的问题(通过检查出站缓存区)导致写超时。如果出站缓慢,则Netty不认为是这是空闲。

  • readerIdleTime:表示读空闲,多长时间未接收到数据后将触发READER_IDLE_STATE_EVENT事件

  • writerIdleTime:表示写空闲,多长时间未写入数据后将触发WRITER_IDLE_STATE_EVENT事件

  • allIdleTime:表示读写都空闲,多长时间未读入或者写入后将触发ALL_IDLE_STATE_EVENT事件

  • unit:表示以上参数的时间单位

一般需要将IdleStateHandler处理器加入到处理链的最前面,并且要注意的是该处理器仅仅按以上逻辑来触发一个用户事件,对事件的处理还需要用户自定义处理器来实现,并且该自定义处理器需要实现userEventTriggered方法。

示例代码如下:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        switch (event.state()) {
            case READER_IDLE: {
                // 处理读空闲
            }break;
            case WRITER_IDLE: {
                // 处理写空闲
            }break;
            case ALL_IDLE: {
                // 处理读写空闲
            }
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

读超时:读超时的实现有ReadTimeoutHandler,当发生读超时,会触发ReadTimeoutException异常并关闭通道。

写超时:写超时的实现有WriteTimeoutHandler,当写入数据时,如果任务过了设定的时间后还没开始做,则会触发WriteTimeoutExeception。底层通过定时任务+异步编程实现,提交一个定时任务,当时间到达后,如果给定的Promise的isDone()返回false,则表明发生写超时。

8.3 Netty日志框架

Netty内部有自己的日志框架,但是可配置性不高,因此我们在这里引入log4j日志框架。

首先,引入pom依赖包:

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>2.0.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.5</version>
</dependency>

然后配置Log4j,resources/log4j.properties,完毕,Netty会自动探测log4j。
log4j配置详见log4j配置详解

8.4 IP地址过滤

Netty提供了IP地址过滤的Handler,只需配置一些规则,即可使用。主要的Handler如下:

  • RuleBasedIpFilter:基于规则的IP过滤器
// 创建规则
IpFilterRule ipFilterRule = new IpSubnetFilterRule("127.0.0.1", 32, IpFilterRuleType.ACCEPT);
new RuleBasedIpFilter(ipFilterRule);
  • IpSubnetFilter:Ip子网过滤器,也需要配置规则
IpSubnetFilterRule ipFilterRule = new IpSubnetFilterRule("127.0.0.1", 32, IpFilterRuleType.ACCEPT);
new IpSubnetFilter(ipFilterRule);
  • UniqueIpFilter:用来确保一个IP只建立一个连接的过滤器

8.5 大数据流的处理

Netty中提供了对大数据流的处理,当我们想向通道中写大数据流的时候可以使用netty提供的方法。几个主要的类如下:

  • ChunkedInput:是对输入数据的抽象,提供了ChunkedFileChunkedNioFileChunkedStream等实现。

  • ChunkedWriteHandler:Netty提供的大数据写入的处理器,该处理器接收ChunkedInput对象,并对该对象进行处理,将数据以块的方式写入到Channel中。

示例代码如下:

pipeline.addLast(new ChunkedWriteHandler())
    .addLast(new MyChannelHandler());

//然后可以直接使用下面方式写入文件等数据流,ChunckedWriteHandler会自动对ChunkedInput的子类进行处理
ctx.writeAndFlush(new ChunkedNioFile(FileChannel.open(Paths.get("t.txt"))));

8.6 数据安全,使用SSL/TLS协议

SSL/TLS 中文全称为安全套接字层(Secure Sockets Layer),首先我们需要生成密钥仓库,生成方式详见:使用 keytool 生成密钥对 + keytool 命令详解

一般认证方式有两种,双向认证和单向认证,双向认证需要双方都持有对方的证书,单向认证中,一般是客户端需持有服务器的证书(1.先导出服务器的cer证书 2.将服务器的cer证书导入到客户端的秘钥仓库中)。

在Netty中使用SSL/TLS协议非常简单,有以下几步(该方式较为第二步较为繁琐,可看下面的方法):

  1. 通过keytool生成密钥仓库,具体生成命令可以见使用 keytool 生成密钥对 + keytool 命令详解
  2. 编写工具类,该工具类的作用是读取密钥仓库初始化SSLContext对象。
    具体代码如下:

SslUtil.java


import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.*;
import java.security.cert.CertificateException;


public class SslUtil {

    public static SSLContext getClientContext(String pkPath, String password) {
        return getContext(pkPath, null, password);
    }
    
    public static SSLContext getServerContext(String pkPath, String caPath, String password) {
        return getContext(password, caPath, password);
    }

    /**
     * 获取SSL上下文
     * @param pkPath jks密钥仓库文件,单向验证中服务器方需要提供
     * @param caPath ca仓库文件
     * @param password 密码
     * @return SSLContext
     */
    public static SSLContext getContext(String pkPath, String caPath, String password) {
        KeyManagerFactory kmf = null;
        TrustManagerFactory tfm = null;
        if (pkPath != null) {
            try (InputStream pkIn = new FileInputStream(pkPath)) {

                KeyStore ks = KeyStore.getInstance("JKS");
                ks.load(pkIn, password.toCharArray());
                kmf = KeyManagerFactory.getInstance("SunX509");
                kmf.init(ks, password.toCharArray());
            } catch (KeyStoreException | CertificateException | IOException | NoSuchAlgorithmException | UnrecoverableKeyException e) {
                e.printStackTrace();
            }
        }
        if (caPath != null) {
            try (InputStream caIn = new FileInputStream(caPath)) {
                KeyStore tks = null;
                tks = KeyStore.getInstance("JKS");
                tfm = TrustManagerFactory.getInstance("SunX509");
                tfm.init(tks);
                tks.load(caIn, password.toCharArray());
            } catch (CertificateException | IOException | NoSuchAlgorithmException | KeyStoreException e) {
                e.printStackTrace();
            }
        }
        try {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(kmf == null ? null : kmf.getKeyManagers(),
                    tfm == null ? null : tfm.getTrustManagers(), null);
            return sslContext;
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            e.printStackTrace();
        }
        return null;
    }
}

SslContextFactory.java


public class SslContextFactory {
    private static SSLContext SSL_CONTEXT;

    public static SSLContext getSslContext(String pkPath, String caPath, String password) {
        if (SSL_CONTEXT != null) {
            return SSL_CONTEXT;
        }
        // 这里也可以根据情况调用SslUtil.getClientContext()或者SslUtil.getServerContext()方法
        SSL_CONTEXT = SslUtil.getContext(pkPath, caPath, password);
        return SSL_CONTEXT;
    }
}

  1. 将SslHandler添加到Pipeline中,SslHandler的构造方法中需要SSLEngine,可以通过SSLContext获取。代码如下:
// 提供jks密钥仓库文件的路径
String jksPath = Paths.get(System.getProperty("user.dir"), "nettyServer.jks").toString();
// 一般情况下在开发过程中,密钥仓库和证书仓库是同一个,其他的不一定。
SSLEngine sslEngine = SslContextFactory.getSslContext(jksPath, jksPath, "123456")
    .createSSLEngine();
    // 设置是否需要验证客户端
    //sslEngine.setNeedClientAuth(true);
    // 设置当前SSLEngine的模式,true-客户端模式 false-服务器模式
sslEngine.setUseClientMode(false);

//....

//一般将SslHandler添加到Pipeline的最前面
pipe.addLast(new SslHandler(sslEngine))
    //.addLast(...)

使用Netty提供的方法生成SslHandler

// 指定密钥仓库文件地址
Path path = Paths.get(System.getProperty("user.dir"), "nettyServer.jks");
try (InputStream is = Files.newInputStream(path)) {
    // 新建一个KeyStore对象,从流中加载密钥仓库,密钥仓库类型为JKS
    KeyStore keyStore = KeyStore.getInstance("JKS");
    keyStore.load(is, "123456".toCharArray());
    // 获取私钥
    PrivateKey privateKey = (PrivateKey) keyStore.getKey("nettyserver", "123456".toCharArray());
    // 获取证书
    X509Certificate certificate = (X509Certificate) keyStore.getCertificate("nettyserver");
    // 通过SslContextBuilder构建SslContext
    SslContext sslContext = SslContextBuilder.forServer(privateKey, certificate).build();
    // 通过SslContext创建新的SslHandler
    SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
}

以上是生成服务端Sslhandler的方法,如果是客户端,只需要提供X509Certificate对象,然后调用SslContextBuilder.forClient()方法即可。如下:

// 指定证书文件地址
Path path = Paths.get(System.getProperty("user.dir"), "nettyServer.cer");
try (InputStream is = Files.newInputStream(path)) {
    // 从文件中获取证书
    X509CertImpl x509Cert = new X509CertImpl(is);
    // 使用SslContextBuilder构建SslContext,这里trustManager可以传入多个
    SslContext sslContext = SslContextBuilder.forClient().trustManager(x509Cert).build();
    // 通过SslContext新建SslHandler
    SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
}

如果需要双向认证,则客户端可以进行如下配置:

 // 指定证书文件地址
Path path = Paths.get(System.getProperty("user.dir"), "nettyServer.cer");
Path jksPath = Paths.get(System.getProperty("user.dir"), "nettyClient.jks");
try (InputStream is = Files.newInputStream(path);
        InputStream jksIs = Files.newInputStream(jksPath)) {
    // 从文件中获取证书
    X509CertImpl x509Cert = new X509CertImpl(is);

    // 加载密钥和公钥
    KeyStore keyStore = KeyStore.getInstance("JKS");
    keyStore.load(jksIs, "123456".toCharArray());
    PrivateKey key = (PrivateKey) keyStore.getKey("nettyclient", "123456".toCharArray());
    X509Certificate x509Certificate = (X509Certificate) keyStore.getCertificate("nettyclient");

    // 使用SslContextBuilder构建SslContext
    SslContext sslContext = SslContextBuilder.forClient()
            .trustManager(x509Cert)
            // 客户端的私钥和公钥
            .keyManager(key, x509Certificate)
            .build();
    // 通过SslContext新建SslHandler
    SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
}

8.7 流量整形

前面有讲过使用ChannelOption配置Channel的高低水位,通过高低水位可以控制Channel的写入速度(写入前需要调用channel.isWritable()方法来判断通道是否可写,如果直接写,可能导入写入失败)。本小节来讲流量整形,它是流量控制的一种机制。流量整形是一种主动调整流量输出速率的措施。它的思路如下:

  • 写入:对通道的写入事件进行监控,来实时计算写入速率,然后每次写入前计算一个写入等待时间(该等待时间是根据写入速率求得的,如果速率过快,则等待时间越大),如果等待时间大于10ms,则会将数据缓存到一个队列中,再在流量计量算法的控制下“均匀”地发送这些被缓存的数据。当缓存队列满或者等待时间超过最大等待时间时,会设置channel.isWritable()方法返回false。
  • 读取:对通道的读取事件进行监控,计算得到读入速率,然后计算读等待时间wait,如果等待时间大于10ms,则设置通道的autoRead为false,并暂停通道读取,然后启动一个定时任务,等待wait时间后,设置autoRead为true,开始读通道。当暂停通道读事件的时候,会导致操作系统的端口缓存被占满(如果速率过快),因为TCP的滑动窗口机制,发送方会自动调节发送速率。

流量整形可能会增加延迟。

Netty提供了3种流量整形方式:

  • ChannelTrafficShapingHandler(通道流量整形):只对当前通道起作用
// writeLimit 写速率限制,单位bytes/s
// readLimit 读速率限制,单位bytes/s
// checkInterval 两次速率计算之间的时间间隔
// maxTime 流量速率过快时的最大等待时间
public ChannelTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime)
  • GlobalChannelTrafficShapingHandler(全局通道流量整形):一般用在服务器端,需要注意的是,全部通道需公用一个该类的对象。该类会对所有的通道和全局流量速率都做计算。
// executor:调度处理器,用来做速率计算、读写任务调度
// 下面的参数同上
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
            long writeGlobalLimit, long readGlobalLimit,
            long writeChannelLimit, long readChannelLimit,
            long checkInterval, long maxTime)
  • GlobalTrafficShapingHandler(全局流量整形):该类和GlobalChannelTrafficShapingHandler的区别是,它只对全局流量做监控,对单个通道不做更细的监控。
// 同上
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
            long checkInterval, long maxTime)

九、常用的协议

Netty支持的传输层协议有TCP、UDP等,本章主要介绍应用层协议,因此仅仅涉及Handler的使用。

Netty支持的应用层协议有:SSL、HTTP、WebSocket、FTP、SMTP等。

协议本身是一种人为设定的规约,就好比语言一样,两个人对话,只有使用相同的语言,才能更好的交流。

只需要在接受到数据后对数据进行解码,发送数据时对数据进行编码,而这两个操作均可在ChannelHandler中实现。

9.1 Http协议的使用

Netty中使用http协议非常方便,Netty本身提供了http的编解码器,只需要在pipeline中添加即可,需要注意的是:此类协议需要最先添加。

HttpServerCodec类继承了CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>,可以看到它是将Http编解码器进行了组合。也可以用HttpResponseDecoderHttpRequestEncoder替换HttpServerCodec,不过一般不必。

示例代码如下:

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator())
                .addLast(new MyHttpChannelInboundHandler());
    }

    public static class MyHttpChannelInboundHandler extends SimpleChannelInboundHandler<HttpObject> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
            if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                URI uri = new URI(request.uri());
                if ("/favicon.ico".equals(uri.getPath()))
                    return;
                ByteBuf byteBuf = Unpooled.buffer(100);
                byteBuf.writeCharSequence("Hello world!", StandardCharsets.UTF_8);

                HttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                        HttpResponseStatus.OK, byteBuf);
                httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                HttpUtil.setContentLength(httpResponse, byteBuf.readableBytes());

                ctx.writeAndFlush(httpResponse);

            }
        }
    }
}

上面的代码中,创建了一个Channel初始化器,并且在pipeline中添加了HttpServerCodec对象、HttpObjectAggregator对象、自定义的MyHttpChannelInboundHandler对象。

HttpObjectAggregator的作用是将HttpMessage和HttpContent进行聚合,组成FullHttpRequest或者FullHttpResponse对象。

服务端Channel的初始化代码如下:

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = null;
        NioEventLoopGroup worker = null;
        try {
            boss = new NioEventLoopGroup(1);
            worker = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .handler(new DetectChannelHandler())
                    .childHandler(new MyChannelInitializer());
            ChannelFuture sync = serverBootstrap.bind(8080).sync();
            sync.channel().closeFuture().sync();
        } finally {
            if (boss != null) boss.shutdownGracefully();
            if (worker != null) worker.shutdownGracefully();
        }
    }

}

可以看到,初始化Channel时,同一般的TCP服务没有区别。

9.2 Http2

Http2具有如下特点:

1.二进制分帧层

img

如图所示,二进制分帧层是加载SSL/TLS(如果有)之上的,Http2将所有传输的信息分隔为更小的消息和帧,并采用二进制格式来对它们进行编码。

2.数据流、消息和帧
新的二进制分帧机制改变了客户端和服务端之间数据交换的方式。为了说明这个过程,需要了解下面三个概念:

  • 数据流(Stream):已建立的连接内的双向字节流,可以承载一条或者多条消息
  • 消息(Message):与逻辑请求或响应消息对应的一系列帧
  • 帧(Frame):Http2的最小通信单位,每个帧都包含有帧头,至少也会标识出当前帧所属的数据流

这些概念之间的关系如下:

  • 所有通信都是在一条TCP连接上完成,此连接可以承载任意数量的双向数据流
  • 每一个数据流都有一个唯一的标识符和优先级信息,用于承载双向消息
  • 每条消息都是一条逻辑Http消息(例如请求或响应),包含一个或多个帧
  • 帧是最小通信单位,承载特定类型的数据,如Http标头、消息负载等。来自不同数据流的帧可以交错发送,再根据每个帧头的数据流表示标识重新组装。

3.请求与响应复用

  • 并行交错地发送多个请求,请求之间互不影响
  • 并行交错地发送多个响应,响应之间互不干扰
  • 使用一个连接并行发送多个请求或响应
  • 消除不必要的连接和提高了现有网络容量的利用率,从而减少页面加载时间

4.头部压缩算法(HPack)
头部压缩算法需要再Http客户端和服务器端进行如下操作:

  • 维护一份相同的静态表,包含常见的头部名称以及特别常见的头部名称和值的组合
  • 维护一份相同的动态表,可以动态地添加内容
  • 基于静态哈夫曼码表的哈夫曼编码

头部压缩算法原理就是使用静态表和动态表对头部字段进行替换(用表索引),然后对于动态表中不存在的内容,还可以使用哈夫曼编码来减小体积。

5.协商机制
虽然http2是比http1更加优秀的协议,但是目前仍有很多公司使用http1.1或http1.0,因此需要协商机制来保障不同协议之间的兼容。

通过协商机制,如果双方都支持http2,则会进行协议升级(Upgrade)。

这部分实战内容暂时空着,有点麻烦。。

9.3 WebSocket

使用WebSocket可以实现如在线聊天室、在线推送等功能。

WebSocket协议是建立在http协议之上的,在建立websocket时会进行协议升级(upgrade),WebSocket数据传输的最小单位为帧Frame,数据将被承载到一个或多个帧(如果数据过大)中进行传输。

  1. 因为WebSocket是建立在Http协议之上的,因此使用的编解码器仍然是HttpServerCodec
  2. 如果数据过大,WebSocket会将数据分为多个帧,因此需要将他们聚合起来,因此需要使用HttpObjectAggregator(http也类似)
  3. 然后使用WebSocketServerProtocolHandler解析WebSocket数据帧

初始化Pipeline代码如下:

// 设置WebSocket的端点
WebSocketServerProtocolHandler webSocketServerProtocolHandler = new WebSocketServerProtocolHandler("/hello",
                                    null, true, false, 10000, null);
ch.pipeline()
        // 添加心跳检测
        .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
        // 添加http编解码器
        .addLast(new HttpServerCodec())
        // 添加http对象聚合器
        .addLast(new HttpObjectAggregator(8192))
        // 添加WebSocket帧编解码器
        .addLast(webSocketServerProtocolHandler)
        // 添加文本帧处理器,继承自SimpleChannelInboundHandler<TextWebSocketFrame>
        .addLast(new WebSocketTextHandler())
        .addLast(new EchoTextHandler());

WebSocket的帧类型
帧的抽象为WebSocketFrame接口。

  • BinaryWebSocketFrame:包含二进制数据
  • CloseWebSocketFrame:Close帧,用于关闭连接
  • ContinuationWebSocketFrame:包含延续的文本或二进制数据
  • PingWebSocketFrame:Ping帧
  • PongWebSocketFrame:Pong帧
  • TextWebSocketFrame:包含文本数据

与websocket服务器通信的浏览器代码如下:

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
	<script type="text/javascript">
		var socket;
		if (!window.WebSocket) {
			window.WebSocket = window.MozWebSocket;
		}
		if (window.WebSocket) {
			socket = new WebSocket("ws://localhost:8080/ws");
			socket.onmessage = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + '\n' + event.data
			};
			socket.onopen = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = "连接开启!";
			};
			socket.onclose = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + "连接被关闭";
			};
		} else {
			alert("你的浏览器不支持 WebSocket!");
		}

		function send(message) {
			if (!window.WebSocket) {
				return;
			}
			if (socket.readyState == WebSocket.OPEN) {
				socket.send(message);
			} else {
				alert("连接没有开启.");
			}
		}
	</script>
	<form onsubmit="return false;">
		<h3>WebSocket 聊天室:</h3>
		<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
		<br> 
		<input type="text" name="message"  style="width: 300px" value="Welcome to waylau.com">
		<input type="button" value="发送消息" onclick="send(this.form.message.value)">
		<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
	</form>
	<br> 
	<br> 
	更多例子请访问  waylau.com
</body>
</html>

十、测试

10.1 使用EmbeddedChannel测试ChannelHandler

首先导入Junit测试框架依赖包

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>

然后编写测试类,在这里我们测试了FixedLengthFrameDecoder固定长度解码器

public class TestHandler {
    @Test
    public void test() {
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(8));
        ByteBuf buf = Unpooled.copiedBuffer("124523452435534541225320".getBytes(StandardCharsets.UTF_8));
        channel.writeInbound(buf);
        ByteBuf bfuf;
        while ((bfuf = channel.readInbound()) != null) {
            System.out.println(bfuf);
        }
    }
}

测试函数的输出如下:

UnpooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: UnpooledHeapByteBuf(ridx: 24, widx: 24, cap: 24/24))
UnpooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: UnpooledHeapByteBuf(ridx: 24, widx: 24, cap: 24/24))
UnpooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: UnpooledHeapByteBuf(ridx: 24, widx: 24, cap: 24/24))

可以看到,FixedLengthFrameDecoder对输入的ByteBuf对象进行了正确的解码。

10.2 使用Apache JMeter来对网络程序进行压力测试

测试TCP连接

  1. 首先从Apache Jimeter处下载 jimeter程序包,然后运行
  2. 点击保存,保存测试计划
  3. 右键测试计划,选择add->Threads->Threads Group,设置 Thread Properties
  • Number of Threads(users):线程的数量,开启多少个线程,即模拟多少个用户

  • Ramp-up period(seconds):加速时间,即上述每个线程启动的间隔时间

  • Loop Count:循环的次数

  1. 配置好Thread Group后,鼠标右键Thread Group,选择add->Sampler->TCP Sampler,设置TCP连接的Server Name or Ip 和 port、Text to Send。

  2. 配置监听,鼠标右键 TCP Sampler,选择add->Linsteners->Summary Report

  3. 启动测试任务

十一、案例分析

11.1 RocketMQ

11.2 Eclipse Vert.x

Vert.x的API很友好,可以用极少的代码量实现所需的功能,个人认为它对于小项目非常友好,因此也非常适合微服务,SpringBoot对于微服务来说可能有点庞大,或许Vert.x刚好。

Vert.x底层使用Netty实现的,支持高并发。如果项目需要快速上线,或许Vert.x是一个不错的选择。

Vert.x更像是一个工具的集合,它隐藏了复杂的实现细节,只暴露了简单的调用接口就可实现功能。

img