Netty4.x 源码实战系列(五):深入浅出学NioEventLoopGroup

wxuande 2019-06-26

我们都知道Netty的线程模型是基于React的线程模型,并且我们都知道Netty是一个高性能的NIO框架,那么其线程模型必定是它的重要贡献之一。

在使用netty的服务端引导类ServerBootstrap或客户端引导类Bootstrap进行开发时,都需要通过group属性指定EventLoopGroup, 因为是开发NIO程序,所以我们选择NioEventLoopGroup。

接下来的两篇文章,我将从源码角度为大家深入浅出的剖析Netty的React线程模型工作机制。

本篇侧重点是NioEventLoopGroup。

首先我们先回顾一下,服务端初始化程序代码(省略非相关代码):

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap(); 
    b.group(bossGroup, workerGroup);
    
    ... // 已省略非相关代码
    
     // 侦听8000端口
     ChannelFuture f = b.bind(8000).sync(); 
    
     f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

在分析源码之前,我们先看看NioEventLoopGroup的类继承结构图:

Netty4.x 源码实战系列(五):深入浅出学NioEventLoopGroup

初始化bossGroup及workerGroup时,使用了NioEventLoopGroup的无参构造方法,本篇将从此无参构造入手,详细分析NioEventLoopGroup的初始化过程。

首先我们看看NioEventLoopGroup的无参构造方法:

public NioEventLoopGroup() {
    this(0);
}

其内部继续调用器构造方法,并指定线程数为0:

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

继续调用另一个构造方法,指定线程为0,且Executor为null:

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

在此构造中,它会指定selector的辅助类 "SelectorProvider.provider()",我们继续查看它的调用:

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

此构造方法中,初始化了一个默认的选择策略工厂,用于生成select策略:

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

经过上面一系列的构造方法调用,此时个参数值对应如下:
nThreads:0
executor: null
selectorProvider: SelectorProvider.provider()
selectStrategyFactory: DefaultSelectStrategyFactory.INSTANCE
以及指定了拒绝策略RejectedExecutionHandlers.reject()

接着其会调用父类MultithreadEventLoopGroup的MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)构造方法

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

此构造方法主要做了一件事,就是当指定的线程数为0时,使用默认的线程数DEFAULT_EVENT_LOOP_THREADS,此只是在MultithreadEventLoopGroup类被加载时完成初始化

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

所以根据代码,我们得出,如果初始化NioEventLoopGroup未指定线程数时,默认是CPU核心数*2

接着继续调用父类MultithreadEventExecutorGroup的MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args)构造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

在此构造方法中,我们指定了一个EventExecutor的选择工厂DefaultEventExecutorChooserFactory,此工厂主要是用于选择下一个可用的EventExecutor, 其内部有两种选择器, 一个是基于位运算,一个是基于普通的轮询,它们的代码分别如下:

基于位运算的选择器PowerOfTwoEventExecutorChooser

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

基于普通轮询的选择器GenericEventExecutorChooser

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

我们接着回到刚刚的构造器,其内部会继续调用MultithreadEventExecutorGroup的另一个构造方法,此构造方法是NioEventLoopGroup的核心代码

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    
    // 为了便于代码剖析,以省略非相关代码
    
    // 初始化executor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 初始化EventExecutor
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
       
            children[i] = newChild(executor, args);
         
    }

    // 生成选择器对象
    chooser = chooserFactory.newChooser(children);
}

此构造方法主要做了三件事:
1、初始化executor为ThreadPerTaskExecutor的实例:
通过前面的构造方法调用,我们知道executor为null,所以在此构造方法中,executor会被初始化为ThreadPerTaskExecutor实例。我们看一下ThreadPerTaskExecutor的源码:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

通过ThreadPerTaskExecutor 的代码发现,ThreadPerTaskExecutor 实现了Executor接口,其内部会通过newDefaultThreadFactory()指定的默认线程工厂来创建线程,并执行相应的任务。

2、初始化EventExecutor数组children
在MultithreadEventExecutorGroup的构造方法中我们看到,EventExecutor数组children初始化时是通过newChild(executor, args)实现的,而newChild的在MultithreadEventExecutorGroup中是个抽象方法

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

根据最开始的类继承结构图,我们在NioEventLoopGroup中找到了newChild的实现

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

所以从此newChild的实现中,我们可以看出MultithreadEventExecutorGroup的children,其实就是对应的一组NioEventLoop对象。 关于NioEventLoop下一篇会作详细介绍。

3、根据我们指定的选择器工厂,绑定NioEventLoop数组对象

chooser = chooserFactory.newChooser(children);

在前面的构造方法中,我们指定了chooserFactory为DefaultEventExecutorChooserFactory,在此工厂内部,会根据children数组的长度来动态选择选择器对象,用于选择下一个可执行的EventExecutor,也就是NioEventLoop。

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

至此,NioEventLoopGroup初始化完成了。

通过上面的代码分析,在NioEventLoopGroup初始化的过程中,其实就是初始化了一堆可执行的Executor数组,然后根据某种chooser策略,来选择下一个可用的executor。

我们再回顾总结一下:
1、NioEventLoopGroup初始化时未指定线程数,那么会使用默认线程数,
即 线程数 = CPU核心数 * 2;
2、每个NioEventLoopGroup对象内部都有一组可执行的NioEventLoop(NioEventLoop对象内部包含的excutor对象为ThreadPerTaskExecutor类型)
3、每个NioEventLoopGroup对象都有一个NioEventLoop选择器与之对应,其会根据NioEventLoop的个数,动态选择chooser(如果是2的幂次方,则按位运算,否则使用普通的轮询)

所以通过上面的分析,我们得出NioEventLoopGroup主要功能就是为了选择NioEventLoop,而真正的重点就在NioEventLoop中,它是整个netty线程执行的关键。

下一篇我们将详细介绍NioEventLoop,敬请期待。

相关推荐