Netty 源码分析

发布时间 2023-11-27 13:32:20作者: hligy

ServerBootstrap

主要介绍服务端的启动流程以及如何绑定端口号、开启服务端 Socket 并让其进入接收连接状态的

启动模板如下;

try {
    ChannelFuture future = new ServerBootstrap().group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel sc) {
                    if (serverProperties.isSsl()) {
                        sc.pipeline().addFirst(sslContext.newHandler(sc.alloc()));
                    }
                    sc.pipeline().addLast(new IdleStateHandler(0, 0, serverProperties.getIdleTime()));
                    sc.pipeline().addLast(channelInitializer);
                    sc.pipeline().addLast(codecExceptionHandler);
                    sc.pipeline().addLast(serviceHandler);
                }
            })
            .option(ChannelOption.SO_BACKLOG, 4096)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .bind(port);
    future.sync().addListener((ChannelFutureListener) f -> {
        if (f.isSuccess()) {
            log.info("Started {} with Port {} in {} ms", serverName, port, System.currentTimeMillis() - startTime);
        }
    });
    serverChannels.add(future.channel());
    future.channel().closeFuture().sync();
} catch (Exception e) {
    log.error("Failed to start server. port:'{}'", port, e);
} finally {
    log.info("{} - Shutdown initiated...", serverName);
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
    log.info("{} - Shutdown completed.", serverName);
}

NioServerSocketChannel 启动过程

简略过程

  • 建立 Java 的 ServerSocketChannel 和 Selector,封装为 NioServerSocketChannel
  • 为 NioServerSocketChannel pipeline 添加 ServerBootstrapAcceptor(负责处理传入的 NioSocketChannel,为其添加启动模板的 childHandler)
  • 将其注册到 NioEventLoopGroup 中的一个 NioEventLoop 中(也就是将 serverSocketChannel 注册到这个 NioEventLoop 创建的 selector 上)

详细过程

其他方法都是进行配置,核心在 bind 方法,之后进入 io.netty.bootstrap.AbstractBootstrap#doBind 方法,它有两个核心方法,一个是 initAndRegister 方法,它负责创建 NioServerSocketChannel、初始化 Pipeline 以及将其注册到 bossGroup 其中的一个 NioEventLoop 中;另一个方法是 doBind0,它负责在前面所有流程完成后将 ServerSocket 绑定到端口号上,开始正式工作。

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 没完成就异步注册,其实也是调用 doBind0,省略...
        });
        return promise;
    }
}

initAndRegister

首先看 initAndRegister 方法

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        // init 方法对 nioServerSocketChannel 的 pipeline 添加了一个 ServerBootstrapAcceptor(ChannelInboundHandlerAdapter 类型),它的构造函数里有 workerGroup,并重写了 channelRead 方法,在其触发时将 channel(这个 channel 其实是传递进来的 msg 强转得到的,只不过对于 bossGroup 来说,传进来的消息一定是建立连接后的 nioSocketChannel)
        init(channel); 
    } catch (Throwable t) {
        // 省略...
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

可以看到通过 initAndRegister 方法内通过 newChannel 创建了 NioServerSocketChannel(启动模板配置的 channel 参数,然后通过反射创建的),然后使用 group().register(channel) 对 NioServerSocketChannel 进行了注册,接着选择了 bossGroup 其中的一个 NioEventLoop 进行 register,毫无疑问,最终的调用是指向 AbstractUnsafe 的 register 方法:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

可以看到 register 内调用了有 register0,调用是进行了 inEventLoop 判断,此时是主线程启动的,所以实际是异步执行的,先不管异步同步,看看 register0 的实现:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // 省略...
    } catch (Throwable t) {
        // 省略...
    }
}

可以看到 register0 内部又调用了 io.netty.channel.nio.AbstractNioChannel#doRegister,其中终于发现 javaChannel 被注册到 selector 了,不过它并没有注册兴趣

protected void doRegister() throws Exception {
    // 省略...
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    // 省略...
}

register0 还调用了 invokeHandlerAddedIfNeeded,因为前面是通过 ChannelInitializer 添加 ServerBootstrapAcceptor 的,所以需要 invokeHandlerAddedIfNeeded 来使用 PendingHandlerAddedTask 进行 ctx.handlerAdded 通知,此时 ChannelInitializer 的 handlerAdded 被调用就会让 ServerBootstrapAcceptor 真正添加到 pipeline 中。

自此,NioServerSocketChannel(ServerSocket)的创建、pipeline ChannelInitializer 的添加、注册到 selector 就算完成了。

doBind0

这一步很简单,就是将刚才的 NioServerSocketChannel 绑定到真正的地址。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel 的 bind 是调用的 pipline 的,而 bind 是出站事件,因此 pipeline 的实现是从 tailContext 开始的,随着一路向前调,最后进入 headContext 的 bind,而后进入 io.netty.channel.AbstractChannel.AbstractUnsafe#bind:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // 省略...
    boolean wasActive = isActive(); // NioServerSocketChannel 的 isActive 实现是 isOpen() && javaChannel().socket().isBound()
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

doBind 方法将本地地址真正的绑定上了,此时的状态表示已经激活了,所以 pipeline.fireChannelActive() 触发,进而给 selectorKey 注册了 Accept 兴趣:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) { // NioServerSocketChannel 和 NioSocketChannel 都是 true
        channel.read();
    }
}

readIfIsAutoRead 后面说过了,再说一遍,它通过 channel.read() -> pipeline.read() -> tailContext.read() 一系列消息传递最终将其 传递给 headContext.read(),它的 read 又将其转向 io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead 的 read:

// headContext.read()
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

// AbstractUnsafe#beginRead
public final void beginRead() {
    assertEventLoop();

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

// io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

最终在 AbstractNioChannel 的 doBeginRead 的结尾判断,如果兴趣中没有 readInterestOp 就添加,而 readInterestOp 这个值是通过 AbstractNioChannel 的构造函数传入的,它的两个 Nio 实现类 NioServerSocketChannel 和 NioSocketChannel 分别传入了 SelectionKey.OP_ACCEPT 和 SelectionKey.OP_READ,很明显嘛,NioServerSocketChannel 是用来接收连接的,所以兴趣是 SelectionKey.OP_ACCEPT,而 NioSocketChannel 是与客户端对应的服务端连接,是用来接收和发送客户端消息的,所以兴趣是读数据。

自此,绑定地址以及兴趣注册就完成了

NioSocketChannel 启动过程

首先是连接建立,建立的过程一定是在 NioEventLoop 的死循环由 NioServerSocketChannel 负责的,即 NioEventLoop 的 run 方法的 io.netty.channel.nio.NioEventLoop#processSelectedKeys,里面发现 selectorKey 兴趣是 accept(或 read)就会调用 unsafe 的 read 方法,而 nioServerSocketChannel 的 unsafe 调用的是 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read:

public void read() {
    // 省略...
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }

        allocHandle.incMessagesRead(localRead);
    } while (continueReading(allocHandle));

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    // 省略...
}

其他地方全部省略,主要关注 doReadMessages 和 fireChannelRead,doReadMessages 的实现在 NioServerSocketChannel 中,它通过 javaChannel(ServerSocketChannel)serverSocketChannel.accept() 接收了一个连接,并将其封装为 NioSocketChannel 后添加到 readBuf,之后 fireChannelRead 处理每一个刚才接收的 NioSocketChannel,而 nioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 就重写了该方法:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

它首先将启动模板中设置 childHandler 添加到了 nioSocketChannel(nioServerSocketChannel 传进来的 msg 当然是子连接啦) 的 pipeline 中(完成 childHandler 到 nioSocketChannel 的绑定),又将 msg(NioSocketChannel)注册到 WorkerGroup 的其中一个 EventLoop 中,到此为止就完成了连接的建立以及如何将建立后的连接绑定到 EventLoop。

与 NioServerSocketChannel 在 channelActive 中触发 readIfIsAutoRead 来完成兴趣注册相同,NioSocketChannel 也是在 channelActive 完成兴趣注册的,只是兴趣变成了 Read。

NioEventLoop 事件循环 run 方法

启动 EventLoop

NioEventLoop 是一个 SingleThreadEventExecutor,它的 execute 方法如下

@Override
public void execute(Runnable task) {
    execute0(task);
}

private void execute0(@Schedule Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    // wakesUpForTask 直接返回的 true,是用于子类扩展的,比如通过 task instanceof 某个类型来决定是否理解唤醒 EventLoop
    execute(task, wakesUpForTask(task));
}

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) { // 后面讲到唤醒时会说到
        wakeup(inEventLoop);
    }
}

当不在 eventLoop 中执行时就会判断线程是否启动,如果没有启动的话就进行启动线程:

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // 经典 cas 判断是否执行
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() { // executor 是 ThreadPerTaskExecutor,就是每次都创建一个线程执行任务
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 省略...
            }
        }
    });
}

事件循环

可以看到调用到了 run 方法,这个 run 方法是在 NioEventLoop 中实现的,它是重点!重点!重点!

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                // hasTasks() 判断 taskQueue 中是否有任务
                // calculateStrategy 方法返回值如下:hasTasks ? selectSupplier.get() : SelectStrategy.SELECT
                // 如果 hasTasks 为 true 的话就表示应该立即执行,所以 selectSupplier.get() 即实际调用的 selectNow() 是立即返回 selector 中
                // 准备好的 IO 事件,否则 hasTasks 为 false 就表示没有任务可以执行,从 selector 中获取 IO 事件可以是阻塞的(避免 CPU 自旋过度)
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                // 先看 hasTasks 为 true,此时返回的 strategy 是大于等于 0 的,即跳过 switch,直接进入下一阶段
                switch (strategy) {
                    case SelectStrategy.CONTINUE: // -2
                        continue;

                    case SelectStrategy.BUSY_WAIT: // -3
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT: // -1
                        // 此时表示任务队列中没任务且此时 selector 没有准备好的 io 事件,即可以阻塞的获取 selector 中准备好的 io 事件
                        // 阻塞的时间由最将要发生的定时任务的执行时间执行,阻塞的时间如果到了,起码是可以执行这个定时任务的
                        // 如果没有定时任务,curDeadlineNanos 就为 -1,表示可以无限期阻塞,也就是 selector.select()
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // NONE 是 Long.MAX_VALUE
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        // nextWakeupNanos 的作用是记录什么时间后 eventLoop 不在阻塞状态,用于 eventLoop 的唤醒
                        // 比如 selector.select(5000),即 阻塞 5 秒后再返回,而此时提交了一个新的定时任务 A
                        // 这个定时任务 A 的执行时间就在发生阻塞时的 2s 后执行,如果不中断 select 的话,这个定时任务 A 就超时执行了
                        // 为了避免这种情况发生,就需要记录阻塞的时间,并提供通过判断阻塞时间判断是否可以中断(selector.wakeup())select 的手段
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) { // 进行二次判断,如果还没有的话就阻塞获取准备好的 io 事件
                                // 如果 curDeadlineNanos 是 NONE 的话就调用 selector.select() 进行无限期的阻塞获取 io 事件
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            // 此时已经不阻塞了,但 nextWakeupNanos 没有被更新
                            // 因此要更新这个状态,避免后面的 execute 提交任务时对 selector 进行 weakup 唤醒
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            // 此时要么是有新任务导致被唤醒
            // 要么是 selector 阻塞获取到了 io 事件
            // 要么就是 switch 走的 default,即 selectNow 直接就获取到了 io 事件
            if (ioRatio == 100) {
                // ioRatio 为 100 表示不平衡 io 事件和非 io 事件执行的事件,
                // 每次都把所有可以处理的 io 事件、任务队列和定时任务队列的任务处理完再进行下一轮循环
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                // 判断 strategy > 0 即判断是否存在 io 事件,如果存在就先处理 io 事件,然后根据 ioRatio 计算非 io 事件执行时间
                // ioRatio 不为 100 就表示要平衡 io 事件和非 io 事件,该值越小 io 事件执行占的总时间就越小
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys(); // 首先处理 io 事件
                } finally {
                    // Ensure we always run tasks.
                    // 处理完 io 事件就计算其占用事件,根据该时间再反向计算出非 io 事件可以占用的执行时间
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // 此时表示不存在 io 事件,尝试运行任务队列中的任务
                // 因为传的时间的 0,也就是说只运行最少数量的任务(大于 64 就是最多运行 64 个任务,详见后面 runAllTasks 方法的分析)
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) { // 最后进行 EventLoop 是否关闭的检测
                    closeAll(); // 先关闭所有连接
                    if (confirmShutdown()) { // 取消所有定时任务,执行所有任务队列中的任务
                        return;
                    }
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

唤醒

NioEventLoop 的唤醒方法如下:

@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) { // 如果不是唤醒状态就唤醒,AWAKE 等于 -1
        selector.wakeup();
    }
}

回顾前面的 NioEventLoop execute 方法,如果方法需要立即执行,调用的就是 execute(Runnable)(包括提交的是定时任务,比如该定时任务提交了就发现已经可以执行了这种),最后的 !addTaskWakesUp && immediate 也就为 true,即尝试唤醒:

@Override
public void execute(Runnable task) {
    execute0(task);
}

@Override
public void lazyExecute(Runnable task) {
    lazyExecute0(task);
}

private void execute0(@Schedule Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    // wakesUpForTask 直接返回的 true,是用于子类扩展的,比如通过 task instanceof 某个类型来决定是否理解唤醒 EventLoop
    execute(task, wakesUpForTask(task));
}

private void lazyExecute0(@Schedule Runnable task) {
    execute(ObjectUtil.checkNotNull(task, "task"), false); // 此时不进行唤醒
}

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

而如果提交的是定时任务时就对其时间进行判断,如果执行时间在唤醒的时间前就提交任务并主动唤醒,否则就只提交任务,不主动唤醒:

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduleFromEventLoop(task); // 不明白为什么在 EventLoop 中就直接插到定时任务队列中,而不是像下面一样
    } else {
        final long deadlineNanos = task.deadlineNanos();
        // task will add itself to scheduled task queue when run if not expired
        if (beforeScheduledTaskSubmitted(deadlineNanos)) { // 如果是在阻塞唤醒的时间前执行,就直接执行,并主动唤醒
            execute(task);
        } else {
            lazyExecute(task); // 否则提交到任务队列就好,不需要主动唤醒(话说为什么要提交到任务队列而不是定时任务队列???)
            // Second hook after scheduling to facilitate race-avoidance
            if (afterScheduledTaskSubmitted(deadlineNanos)) {
                // 可能这个任务 B 只比前面的任务 A 晚一毫秒执行,但前面的任务执行 A 完这个任务才添加进去,导致这个定时任务 B 需要等到下一个定时任务 C 时间触发才执行,为了避免这种情况,需要添加到队列后再检查一遍执行时间
                execute(WAKEUP_TASK);
            }
        }
    }

    return task;
}

protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
    // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
    return deadlineNanos < nextWakeupNanos.get(); // 根据阻塞的唤醒时间判断定时任务是否应该在其之前执行
}

protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
    // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
    return deadlineNanos < nextWakeupNanos.get();
}

处理 io 事件

主要的流程说完了,但如何处理的 io 事件和非 io 事件还没解释,那么首先看 io 事件的处理

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // processSelectedKeysOptimized 处理的是 netty 优化后的 selector,它与 java 原生不同的是存储 selectionKey 的数据结构
        // java 原生的是 hashSet,而 netty 的是数组,而数组的遍历是比 hashSet 快的
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

最终的调用指向 io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
           unsafe.forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 这里进行了兴趣判断,对于 NioServerSocketChannel 来说,readyOps & SelectionKey.OP_ACCEPT 为 true
        // 对于 NioSocketChannel 来说,readyOps & SelectionKey.OP_READ 为 true
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 重点
            // 重点
            // 重点
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

上面的 unsafe.read() 是重点!是重点!是重点!

  • 对于 NioServerSocketChannel 来说,调用的是 NioMessageUnsafe#read,里面完成了 NioSocketChannel 的创建、初始化、绑定以及兴趣注册(包括 pipeline 的 fireChannelRead 和 fireChannelReadComplete,只不过它的兴趣注册是在绑定后由 channelActive 注册的,前面提到过)
  • 对于 NioSocketChannel 来说,调用的是 NioByteUnsafe#read,里面完成了消息读取(pipeline 的 fireChannelRead 和 fireChannelReadComplete,其中 fireChannelReadComplete 调用 HeadContext 完成了 readIfIsAutoRead,即 Read 兴趣的注册)

处理非 io 事件

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue(); // 将所有到达执行时间的定时任务全部迁移到 taskQueue 中
    Runnable task = pollTask(); // 从 taskQueue 中取一个任务
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // 根据 timeoutNanos 计算执行的截止时间,到达这个时间后就不再继续执行,剩余的任务等下次再执行
    final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task); // 执行任务,如果有异常就只打印日志,不抛异常

        runTasks ++; // 对任务计数

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) { // 每 64 个任务检测一次截止时间,因为 getCurrentTimeNanos() 是比较昂贵(耗时)的操作
            lastExecutionTime = getCurrentTimeNanos();
            if (lastExecutionTime >= deadline) { // 如果超过截止时间,就立即返回
                break;
            }
        }

        task = pollTask(); // 一直从任务队列中获取并执行
        if (task == null) {
            lastExecutionTime = getCurrentTimeNanos();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

EventLoop

EventExecutorGroup

io.netty.util.concurrent 包下,继承 ScheduledExecutorService

  • next 方法:用于获取一个 EventExecutor
  • execute 方法:next 获取一个 EventExecutor 并提交给它
  • schedule 方法:因为实现了 ScheduledExecutorService 接口,所以可以实现定时任务,同样是交给 EventExecutor 执行。

主要负责管理 EventExecutor 生命周期,其实它还重写了 shutdown 和 shutdownNow 将它们标记为废弃,并新建了一个 shutdownGracefully 来保证安全关闭所有的 EventExector。

EventExecutor

继承 EventExecutorGroup,很奇怪,如果是我的话我不这样写。

  • execute:真正的执行在这里,具体由子类指定

  • inEventLoop 方法:判断当前线程是否是在 EventExecutor 中执行。

主要负责任务的执行,以及判断线程是否是事件循环的线程。

EventLoopGroup

io.netty.channel 包下,EventLoopGroup 和 EventLoop 继承关系如下:

img

  • register 方法:注册一个 Channel 到 next 方法提供的 EventLoop 中

它主要是负责将 Channel 到注册 EventLoop

EventLoop

处理 Channel 有关的所有 IO 操作,一个 EventLoop 对应多个 Channel。

  • register 方法:注册一个 Channel 到当前 EventLoop。

register 方法虽然是从 EventLoopGroup 继承来的,但 EventLoop 才是真正的核心实现。

NioEventLoop

img

NioEventLoop 继承的比较多,核心是下面几个:

  • AbstractScheduledEventExecutor:netty 的 EventExeutor,提供定时任务的支持
  • SingleThreadEventExecutor:单线程执行所有提交的任务
  • SingleThreadEventLoop:单线程的 EventLoop

因此 NioEventLoop 是单线程处理它负责的 Channel 的 IO 等任务的!!!

AbstractEventExecutor

继承 AbstractExecutorService,并实现 EventExecutor 接口。

  • 重写 AbstractExecutorService submit 方法:修改返回类型为 Netty 自己实现的 Future(当然这个 Future 是继承的 java 的,否则重写改不了返回类型)
  • 重写 newTaskFor 方法:AbstractExecutorService 允许自定义 Task,因此从写为 Netty 的 PromiseTask
  • 重写一堆定时任务方法为 UnsupportedOperationException 不可用,这些方法是 EventExecutor 接口继承来的(会在 AbstractEventExecutor 的子类 AbstractScheduledEventExecutor 中重新支持)

AbstractScheduledEventExecutor

继承 AbstractExecutorService,提供了对定时任务的支持

  • 增加 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue 字段用来存放定时任务

  • 再次重写 AbstractEventExecutor 中被定义为不可用的定时任务方法。添加定时任务时如果不在事件循环中就需要判断一下事件,如果当前时间晚于任务时间,那么应该立即自行,否则提交到 执行 execute 方法,(由子类实现来确定如何执行,如何提交定时任务?channel.executor().schedule())

  • 增加一堆 scheduledTaskQueue 相关的任务方法,比如 pollScheduledTask(队头定时任务符合达到执行时间就返回,否则返回 null)等

SingleThreadEventExecutor

  • 增加 Queue<Runnable> taskQueue 字段:任务队列。提交任务的线程不是 EventLoop 线程,就放在这里。默认 LinkedBlockingQueue 类型,也预留了 protected 的 newTaskQueue 方法用于自定义队列
  • 重写 execute 方法:将任务提交到 taskQueue 字段,等待执行,如果 EventLoop 正在阻塞等待执行的状态,此时还有是否将 EventLoop 唤醒的功能(通过提交一个空的 Runnable 到 taskQueue 中),如果是 execute 方法,
  • 和 AbstractScheduledEventExecutor 对 scheduledTaskQueue 的支持一样,增加了一堆 taskQueue 相关方法,比如 pollTask 等(获取一个任务)
  • 增加 takeTask 方法:核心方法,但 NioEventLoop 从不调用该方法哦!!!作用是获取任务,来源包括 taskQueue 和 scheduledTaskQueue,如果线程被中断或者被唤醒,返回 null。方法如下:
protected Runnable takeTask() {
    assert inEventLoop(); // 断言保证是在事件循环中
    if (!(taskQueue instanceof BlockingQueue)) { // 必须是阻塞队列,可以让线程阻塞,阻塞天然线程安全,非常好用
        throw new UnsupportedOperationException();
    }

    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
    for (;;) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();// 先获取定时任务,因为可能已经过期了
        if (scheduledTask == null) { // 如果没有就获取任务队列中的
            Runnable task = null;
            try {
                task = taskQueue.take();
                if (task == WAKEUP_TASK) { // 是被唤醒,返回空
                    task = null;
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
            long delayNanos = scheduledTask.delayNanos();
            Runnable task = null;
            if (delayNanos > 0) { // 有定时任务,但没到执行时间,从任务队列中获取
                try {
                    // 尝试在该定时任务执行前获取任务队列的任务,如果期间有新任务,poll 就返回该任务了
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    // 这里的阻塞是可能被中断的
                    // 因为新加入定时任务可能比该定时任务还早执行,不能接着等任务队列的 poll 被新任务触发
                    // 所以需要中断
                    return null;
                }
            }
            if (task == null) {
                // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                // scheduled tasks are never executed if there is always one task in the taskQueue.
                // This is for example true for the read task of OIO Transport
                // See https://github.com/netty/netty/issues/1614
                // 如果到该定时任务执行时间任务队列还没有新任务,
                // 就将 scheduledTaskQueue 中满足执行条件的定时任务转移到任务队列 taskQueue 中,包括当前这个定时任务
                fetchFromScheduledTaskQueue();
                // 这个 task 还是可能为空的,因为前面转移可能失败
                task = taskQueue.poll();
            }

            if (task != null) {// 如果为 null 就一直获取,直到成功
                return task;
            }
        }
    }
}
  • wake up 机制:有一个叫 WAKEUP_TASK 的 Runable 字段,什么都不做,目的是唤醒正在阻塞的任务队列 poll,在任务队列 poll 的时候也会忽略这个 Runnable,队列任务不在 eventLoop 线程且需要立即执行的才会触发提交 WAKEUP_TASK(提交定时任务,此时定时已经到期,也会触发;队列任务不在 EventLoop 线程也会触发)。下面是相关方法:
@Override
public void execute(Runnable task) {
    execute0(task);
}

@Override
public void lazyExecute(Runnable task) {
    lazyExecute0(task);
}

private void execute0(@Schedule Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, wakesUpForTask(task)); // wakesUpForTask 默认为 true,可以重写。
}

private void lazyExecute0(@Schedule Runnable task) {
    execute(ObjectUtil.checkNotNull(task, "task"), false);
}

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    // ...

    if (!addTaskWakesUp && immediate) { // NioEventLoop 的 addTaskWakesUp 默认为 false
        wakeup(inEventLoop);
    }
}

protected boolean wakesUpForTask(Runnable task) { // 子类可以用该方法判断 Runnable 是否应该唤醒 EventLoop
    return true;
}

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop) {
        // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
        // is already something in the queue.
        // 唤醒线程
        taskQueue.offer(WAKEUP_TASK);
    }
}

SingleThreadEventLoop

继承 SingleThreadEventExecutor,实现 EventLoop

主要是重写了 EventLoop 的 register 方法,将其调用转向 Channel 的 io.netty.channel.Channel.Unsafe 的 register(最后转到具体的 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#doRegister 实现),其中 java channel 注册的 selector 由 NioEventLoop 的 unwrappedSelector() 方法提供

Channel

netty 中所有 channel 的 IO 操作全是异步的,因此提供了 ChannelFuture 进行 IO 操作成功、失败和取消相关的通知。

Channel 的 实现非常多,最常用的 NioServerSocketChannel 和 NioSocketChannel 的继承结构如下:
img

  • channel:定义基本的操作,比如 id()、eventLoop()、pipeline()。

  • ServerChannel:表示它可以接受连接并创建子 Channel。

  • ServerSocketChannel:表示可以接受 TCP/IP 的服务器通道

  • AbstractChannel:Channel骨架,抽象类,实现 id()、eventLoop()、pipeline() 等方法,还有重要的 doRegister 等方法

  • AbstractNioChannel:基于Selector 的Nio Channel 实现,比如它的 doRegister 实现了将 javaChannel 注册到 Selector 的操作

  • SocketChannel:表示 TCP/IP socket 通道

Netty 的 channel 是如何避免被 gc 的呢,在 register 向 selector 注册时,待完成!!!

Unsafe

这个类不允许用户使用

  • read:负责消息的读取,并将拿到的消息通过 pipeline 的 fireChannelRead 和 fireChannelReadComplete 通知给 channelHandler,在 NioEventLoop 中如果触发了 selector 的兴趣(Accept 或 Read)就会触发这个方法。实现类是 NioByteUnsafe 就是在读取 NioSocketChannel 客户端发来的数据;实现类是 NioMessageUnsafe 就是 serverSocket.accept() 接收一个新链接。
  • register(EventLoop, ChannelPromise):负责真正的注册
  • write:写到一个缓存区(io.netty.channel.ChannelOutboundBuffer,负责放置 ByteBuffer 对象)
  • flush:从缓存区取数据并写入连接通道

ChannelHandler

处理或拦截 IO 事件,并将其转发到 ChannelPipline 中的下一个 ChannelHandler

  • handlerAdded 方法:将 ChannelHandler 添加到实际 ChannelHandlerContext(ChannelPipeline) 后调用。ChannelInitializer 就是通过它来调用 initChannel 方法来注册其中的 channelHandler 的,最后将其自身 remove(this) 了
  • handlerRemoved 方法:将 ChannelHandler 添加到实际 ChannelHandlerContext 后调用
  • @ChannelHandler.Sharable 注解:文档型注释,表示被标注的 ChannelHandler 类可以共享(即单例),运行时,Netty 会给每个 ChannlHandler 加一个用于检测的 added 变量(在 ChannelHandlerAdapter 中定义的,这个变量不适用 volatile,只作为健康性检查,所以没问题),每次添加 ChannlHandler 时都会校验依次这个变量,如果重复添加(即 added 为 true)且没被这个注解标注,就会报错显示这个 ChannelHandler 被重复添加到不同的 ChannelPipline 里了

ChannelHandlerAdapter

是适配器,所以把上面的方法都用空实现代替了

ChannelInboundHandler

入站 ChannelHandler,负责状态变动的通知回调

  • channelRegistered 方法:ChannelHandlerContext(实际是 ChannelPipeline,下面省略)的 Channel 被注册到 EventLoop
  • channelUnregistered 方法:从 EventLoop 中注销
  • channelActive 方法:被激活
  • channelRead 方法:读取到一个消息
  • channelReadComplete 方法:一个消息很长,channelRead 调用了 n 次才读完,读完整个消息时调用该方法
  • userEventTriggered:用户事件被触发
  • exceptionCaught:入站出现异常时。注意!!!出站不调用该方法,出站的 write 需要用添加回调来判断异常是否存在

ChannelInboundHandlerAdapter

继承自 ChannelHandlerAdapter,实现 ChannelInboundHandler。

ChannelInboundHandler 的适配器,提供默认实现,即 ctx.fireXXX

ChannelOutboundHandler

出站 ChannelHandler,收到 IO 出站操作时通知此 ChannelHandler

  • read 方法:比较神奇的是,出站处理器竟然有 read 方法?实际 Netty 连接成功后对 selector 注册读事件就是通过它实现的,具体是在连接是 Accept 即接收就绪时触发 channelActive,首先来到 DefaultChannelPipeline 的 第一个节点 HeadContext,触发它的 channelActive ,此时会先向后面的 节点 fire 进行通知,然后就会检测 Channel 是否 autoRead(默认是),如果是就调用 channel.read() 方法,channel.read() 方法实际调用的是 pipeline 的 read,而 pineline 实际是调用的 tailContext 的read,这个 read 是 outbound 即出站处理器的方法,所以向前传递,具体方法如下:
final ChannelHandler handler = handler();
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
if (handler == headContext) {
    headContext.read(this);
} else if (handler instanceof ChannelDuplexHandler) {
    ((ChannelDuplexHandler) handler).read(this);
} else {
    ((ChannelOutboundHandler) handler).read(this);
}

可以看到判断了如果属于出站处理器就调用,因此如果你重写了 ChannelOutboundHandler 的 read 方法,并且没有调用 ctx.read() 来向前传递通知,那么头结点 HeadContext 就收不到 read 通知,因此一般该方法和其他方法一样都是默认的 fire 调用,接着说头结点 HeadContext 的 read 收到通知(被调用)时,会调用 channel 的 unsafe 字段进行 beginRead 向实际的 selectionKey (AbstractNioChannel)注册 read 类型事件(这个 unsafe 也检测了 inEventLoop,只是它用的是 assert 强制检测,这样挺合理的,因为 read 是只有自己才关心的,和其他人没关系)

为什么ChannelOutboundHandler会有read方法?auto-read参数的意义?DefaultPipeline的数据流到底是怎样的?_channelinboundhandler 中的channelread和channeloutboun_枫_Maple的博客-CSDN博客

ChannelOutboundHandlerAdapter

与 ChannelInboundHandlerAdapter 类似,继承自 ChannelHandlerAdapter,实现 ChannelOutboundHandler。

ChannelOutboundHandler 的适配器,提供默认实现,即 ctx.fireXXX

ChannelDuplexHandler

同时继承了 ChannelInboundHandlerApapter (默认都是 ctx.fire 向下传递)并实现 ChannelOutboundHandler(没继承ChannelOutboundHandlerAdapter 是因为 java 不能继承多个类),实现的 ChannelOutboundHandler 和 ChannelOutboundHandlerAdapter 一样都是调用 ctx 向上传递

常用的 SimpleChannelInboundHandler 和 MessageToMessageCodec

两个抽象出来的类,把常见的用法封装到一起,并提供抽象方法使用,比如 SimpleChannelInboundHandler 的 channelRead0 方法,自行后会自动释放 channelRead0 参数(如果这个参数对象属于引用计数类型)。

还比如 MessageToMessageCodec 就是 ChannelDuplexHandler 的一种常见的编解码封装。

它们的继承关系如下:

img

img

特殊的 ChannelInitializer

继承自 ChannelInboundHandlerAdapter,负责将一些 channelHandler 添加到 pipeline 中,在 channel 注册到 eventLoop 时执行,当其添加到 pipeline 后,handlerAdded 方法被通知执行,之后 ChannelInitializer 调用 initChannel 完成其中一些 ChannelHandler 的注册,全部添加后再将自身 remove(remove(this))

注意这个类被 @ChannelHandler.Sharable 标注,因此它必须是“单例”的

ChannelHandlerContext

  • channel 方法:获取其 pipeline 中的 channel,作为便捷方法
  • executor 方法:获取其 channel 中的 eventLoop,作为便捷方法
  • name 方法:获取设置到 pipeline 时的名字,不能重复,不写就用默认命名规则 simpleClassName + #0,默认的还重复(有缓存)就生成(生成规则忘了,不重要,忘了就忘了)
  • handler 方法:获取具体的 ChannelHandler
  • alloc 方法:分配的 ByteBufAllocator,用于分配 ByteBuf

它实现了 ChannelInboundInvoker 接口(传到尾节点 TailContext,如果最后传到这里了,参数如果属于计数类型就会释放,保证内存安全):

  • fireChannelActive:传递给 pipeline 中当前 ctx 下一个包含 ChannelInboundHandler 的 ctx
  • fireChannelRead:和上面相同,传递读取到的消息
  • fireExceptionCaught:只有它和其他 fire 方法不同,异常如果处理完了就不会传递了!!!

因为是双向链表的节点,所以还实现了 ChannelOutboundInvoker 接口(头结点 HeadContext):

  • read 方法:前面提到过,请求从通道中读取数据到入站缓冲区,真的读到数据了就触发 ChannelInboundHandler 的 channelRead 方法
  • write 方法:传递给 pipeline 中当前 ctx 前一个包含 ChannelOutboundHandler 的 ctx(到头结点 HeadContext 停,该调用 unsafe 的调用 unsafe,比如 write、flush 等)

AbstractChannelHandlerContext

定义前后指针 prev/next、以及所述的 pipeline

DefaultChannelHandlerContext

定义 handler 字段,重写 handler() 方法,约等于什么都没干。

HeadContext 和 TailContext

在 DefaultChannelPipeline 中定义的双向链表头尾节点(当然继承了 AbstractChannelHandlerContext)

  • HeadContext:实现 ChannelInboundHandler 和 ChannelOutboundHandler,实现 Inbound 是负责日常入站的最初的 ctx.fire 以及其他自动操作,实现 outbound 是将出站的最后操作委托给 unsafe。比如在 io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered 时检查是否需要为当前 pipeline 添加 channelInitializer(实际是委托给 PendingHandlerCallback,由其添加 channelInitializer 添加到 pipeline,进而触发 handlerAdded 来执行 initChannel 里 ChannelHandler 注册的操作)。还比如 channelActive 中的自动 read,详见前面提到 HeadContext 中 channel.read() 调到 TailContext 后再往回调最后触发 head.read() 即 unsafe.read() 的解释

  • TailContext:实现 ChannelInboundHandler,因为是尾节点,所以默认将一些传递到此的计数资源释放,以及异常到此处时告知用户“你没有默认的异常处理”,对于其他不需要释放计数资源的方法就是空实现

ChannelPipeline

Channelhandler 列表,用于处理入站和出站信息,入站从左向右,出站从右向左

实现 ChannelInboundInvoker 和 ChannelOutboundInvoker 接口来完成入站和出站处理

  • addLast 方法:添加 channelHandler 到 pipeline 的最后一个位置,最常使用的添加 handler 方法,名字不允许重复,否则报 IllegalArgumentException 异常
  • remove 方法:从 pipeline 中删除一个 channelHandler
  • fireXXX 方法:实现 ChannelInboundInvoker 的一堆入站通知
  • write、writeAndFlush、read 等方法:实现 ChannelOutboundInvoker 的一堆出站方法

DefaultChannelPipeline

channelpipeline 的默认实现,实现入站和出站 ChannelHandler 的双向链表,每个节点是 ChannelHandlerContext 类型,由 ctx 中的 handler 确定是否向右通知入站或向左通知出站。

  • head 字段:HeadContext 类型,作为双向链表的头指针,作用详见前面
  • tail 字段:TailContext 类型,作为双向链表的尾指针,作用详见前面
  • pendingHandlerCallbackHead 字段:PendingHandlerCallback 类型,在 channel 在 boss 注册时设置为 PendingHandlerAddedTask(存疑),在 io.netty.channel.AbstractChannel.AbstractUnsafe#register0 中间接(一层调一层)调用了 io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers,进而执行了 pendingHandlerAddedTask 的 executre 方法将 ChannelInitializer 中待注册的 handler 全部添加 pipeline

ByteBuf

java.nio 下的 ByteBuffer 不好用,所以 Netty 自己搞了一个。

介绍

特性

几个重要的特性:池化、引用计数、零拷贝(不是)

  • 继承了 ReferenceCounted 接口实现引用计数。refCnt 方法获取对象的引用计数,为 0 表示对象解除占用;retain 方法对引用计数加 1;release 方法对引用计数减 1,减完为 0 时返回 true 表示对象解除占用
  • 继承了 ByteBufConvertible 接口实现 asByteBuf 方法将当前对象转为 ByteBuf

实现

ByteBuf 的实现主要是 ByteBuf -> AbstractByteBuf -> AbstractReferenceCountedByteBuf

  • AbstractByteBuf:ByteBuf 骨架,实现 readerIndex、writerIndex、maxCapacity 等基础功能
  • AbstractReferenceCountedByteBuf:实现了 ReferenceCounted 接口的引用计数功能,比如 refCnt、retain 和 release

分类

实现类有两种分类方法,按照内存分配方式不同分为:

  • XxxHeapByteBuf:堆内存分配的 ByteBuf
  • XxxDirectByteBuf:直接内存分配的 ByteBuf

按照池化非池化又分为:

  • PooledByteBuf:池化的,PooledDirectByteBuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf 和 PooledUnsafeHeapByteBuf
  • UnpooledXxxByteBuf:非池化的

特殊的 ByteBuf

UnreleasableByteBuf

包装一个 ByteBuf 后返回,目的是防止用户增加或减少其包装的 ByteBuf 引用计数,实际上就是重写了控制引用技术的相关方法(比如 retain、release 等),什么都不做而已,还有 slice 分片、duplicate 复制等新建副本之类的也用 UnreleasableByteBuf 包装后再返回。

可以用 io.netty.buffer.Unpooled#unreleasableBuffer 方法调用,通常用于一些特殊的常量 ByteBuf,将其包装为 UnreleasableByteBuf 防止其他人错误的销毁

CompositeByteBuf

组合多个 ByteBuf,将其抽象成一个

有趣的 ByteBufHolder

不是 ByteBuf 的子类,而是继承 ReferenceCounted,表示发送和接收的数据包,默认的 DefaultByteBufHolder 里有一个 ByteBuf 类型的 data 字段作为组合设计模式使用,WebSocketFrame 等数据包就是它的子类

  • content 方法:返回此 ByteBufHolder 持有的 ByteBuf
  • copy 方法:返回一个深拷贝 ByteBuf 副本的新 ByteBufHolder
  • duplicate 方法:返回一个浅拷贝 ByteBuf(只新建了读写指针,数据还是同一个对象)的新 ByteBufHolder
  • retainedDuplicate 方法:返回一个浅拷贝 ByteBuf(只新建了读写指针,数据还是同一个对象)的新 ByteBufHolder,同时对 ByteBuf 进行 retain 引用计数加 1

池化

本地线程缓存加 PoolArena 数组,太复杂,没看

引用计数

AbstractReferenceCountedByteBuf 是实现引用计数的 ByteBuf 抽象类,继承自 AbstractByteBuf

  • updater 字段:ReferenceCountUpdater<AbstractReferenceCountedByteBuf> 类型,实际的实现是 AtomicIntegerFieldUpdater<AbstractReferenceCounted> 来 cas 更新 volatile 类型 refCnt 字段,cas 加 volatile 很明显是为了不加锁进行更新,减少并发竞争锁资源以及其带来的上下文切换时间
  • retain 方法:利用上面的 updater 进行 cas 加 1
  • release 方法:同样是用 updater 进行 cas 减 1
  • deallocate 方法:用于泄露检测,release 如果返回 true,即引用计数为 0 时,触发该方法

ByteBuf 泄露检测

ByteBuf 把释放权利交给了用户(池化的 ByteBuf 以及非池化的 DirectByteBuf),而用户可能在不再持有该 ByteBuf 后没有将其引用计数清零归还到池中,从而导致 jvm 把 ByteBuf gc 后让 ByteBuf 池发生了泄漏,最后导致 ByteBuf 池无内存可用,因此需要泄漏检测,检测的原理很简单,利用 ReferenceQueue 和 WeakReference,在 ByteBuf release 时判断如果引用计数为 0 了,就对其标记为已释放,在 ReferenceQueue 接到 WeakReference<ByteBuf> 时(gc 回收了该对象)检测该对象是否被标记为已释放,如果没被标记为已释放就表示放生了内存泄漏。

下面先介绍下相关类:

  • ResourceLeakDetector:资源泄露检测器,增加泄漏检测的 api 入口。track 方法用于创建一个 ResourceLeakTracker 进行资源跟踪,同时检查是否有泄漏的资源要报告(调用 reportLeak 方法)
  • ResourceLeakTracker:具体的跟踪者,close 方法用于关闭泄漏,即正确的释放资源;record 方法用于记录调用方的当前堆栈,Throwable.getStackTrace() 和 Thread.currentThread().getStackTrace() 都可以获取堆栈,不过后者在 jdk6前是直接 dump 的堆栈,因为打印堆栈的目标线程和执行打印的工作线程不一定是一个,这样会很慢(需要进入安全点),所以后面优化为如果工作线程和目标线程是同一个线程,就使用 new Exception().getStackTrace() 快速获取堆栈
  • DefaultResourceLeak:ResourceLeakTracker 的默认实现,继承 WeakReference
  • TraceRecord:记录一次使用的位置,继承 Throwable

接着开始说流程,我们以 PooledByteBufAllocator 为例:

  • 分配时:如果是直接内存,buffer() 最终调用的就是 newDirectBuffer(),获取到的 ByteBuf 是被 toLeakAwareBuffer() 处理了一遍才返回的,其中是 AbstractByteBuf.leakDetector.track(buf) 将其包裹为 DefaultResourceLeak(WeakReference类型,创建时添加到 Set<DefaultResourceLeak<?>> allLeaks 表示跟踪),并将 defaultResourceLeak 与 buf 封装为 SimpleLeakAwareByteBuf(只以默认的这个 Simple 级别的讲述)返回,利用的是组合加继承的设计模式,目的是保持其作为 ByteBuf 的功能,同时在其 release 时判断如果引用技术为 0 则调用 defaultResourceLeak 的 close (从 中删除 defaultResourceLeak)告知其正确的释放了资源
  • 检测时:检测的时机发生在 AbstractByteBuf.leakDetector.track() 方法中,每次创建新的 DefaultResourceLeak 时都会检测,过程是从 resourceLeakDetector 的 referenceQueue 取元素,如果元素不为空且 dispose 为 true 就表示检测到泄漏(dispose 方法见下方),而在前面“分配时”讲到如果正确释放了资源 allLeaks 是不会有这个元素的,即返回 false。
boolean dispose() {
    clear();
    return allLeaks.remove(this);
}

最后说一下为什么用 allLeaks 这个 set 存放 DefaultResourceLeak,因为 gc 后 ByteBuf 和 SimpleLeakAwareByteBuf 都被 gc 了,需要一个和他们同样引用关系的对象(同时记载这个 ByteBuf 使用的记录)由 set 强引用,直到泄漏检测完成后再完全释放引用交给 gc。

netty中的内存泄漏检测机制ResourceLeakDetector - 简书 (jianshu.com)

零拷贝

传统的 Zero-Copy 是 IO 数据传输无需由内核态到用户态,再从用户态到内核态,较少拷贝次数

Netty 的 Zero-Copy 是完全在用户态下的,即传输层的零拷贝机制,比如在拆包、合并包时,常见的做法是 System.arrayCopy 拷贝数据,但这样有一定的开销,而 Netty 通过 slice、wrapedBuffer 等操作拆分合并 ByteBuf,从而无需进行数据拷贝