Netty之EventLoop

EventLoop

Netty线程模型是被精心设计的,提升了框架的并发性能,并且在很大程度上避免锁.下面来讲一下Netty中的线程模型和它的EventLoop.

线程模型

一说到线程模型,很容易能够想到Reactor线程模型.Reactor又有单线程模型,多线程模型和主从多线程模型.

单线程模型

单线程模型,显而易见就是只有一个线程,即所有的I/O操作都在同一个NIO线程模型.对于此I/O模型的职责则有很多:

  • 作为服务端,接受TCP连接
  • 作为客户端,发起TCP连接
  • 读取通信对端的请求和应答消息
  • 向通信对端发送请求或应答消息

这种模型由于只有一个线程处理I/O,因此当并发量提高之后,一个线程很难处理那么多的请求,系统的吞吐量将变的很低.这个时候就演进出了Reactor多线程模型.使用多个线程来处理请求

多线程模型

Reactor多线程模型和单线程模型的最大的区别就是有一组I/O线程来处理I/O请求.多线程模型的特点如下:

  • 有专门一个线程用来监听服务端,接受客户端的TCP连接请求
  • 网络I/O操作读写等由一个NIO线程池负责.通过很多的NIO线程能够处理高并发的网络读写请求
  • 一个NIO线程可以同时处理N条连接,但是一个连接只对应与一个NIO线程,防止发生并发操作问题.

这个样子很好的解决了一般并发的问题.但是当如果一个时候有数百万个连接过来,或者服务端需要对客户端握手进行安全验证,但是验证本身非常损耗性能.这个时候,一个Acceptor线程会存在性能不足的问题,这个时候就又出现了第三种Reactor线程模型,主从多线程模型.

主从Reactor多线程模型

此线程模型的特点就是服务端接受客户端连接的不在是一个单独的NIO线程,而是一个独立的NIO线程池.Acceptor接受到客户端TCP连接请求后,将新创建的SocketChannel注册到I/O线程池中的某个线程上.接下来又此线程来处理网络读写问题.

Netty的线程模型

Netty的线程模型并不是一成不变的.它实际拒绝域用户的启动参数配置.通过设置不同的启动参数,Netty可以同事支持Reactor单线程模型,多线程模型和主从多线程模型.
比如下面三种方式:

单线程模式

1
2
3
4
5
EventLoopGroup work = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());

只有一个线程,并且没有线程池,设置EventLoopGroup的线程数为1.将上面代码改成如下形式:

Reactor多线程模式

1
2
3
4
5
6
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());

主从Reactor多线程模式

1
2
3
4
5
6
EventLoopGroup boss = new NioEventLoopGroup(8);
EventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());

只需要将BossGroup的参数设置为多个就可以了. 里面的参数就是用几个线程来处理.

一般来说,都使用主从Reactor多线程模型.主要分为两个线程组,如上所示:boss线程组主要用来接受连接,然后注册到work线程池上的某个线程上.work线程主要用来处理读写请求,并且执行系统调用Task和定时任务.如心跳检测.

NioEventLoop源码分析

Netty中的NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O操作外,还兼顾处理定时任务和系统任务.在Netty中有很多系统Task任务,主要就是为了在I/O线程和用户线程同事操作网络资源的时候,为了防止并发操作导致锁的竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行.这样实现了局部无锁化. 说完了EventLoop的功能,来看一下它的实现,这里NioEventLoop是它的一个子类,也是最常用的,因此主要分析一下此类.

构造方法

先来看一下构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类方法初始化线程池,和回绝策略
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 这里打开一个Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

Selector

作为Nio的多路复用线程,必须得有一个多路复用器,先来看一下它的多路复用器.

1
2
3
4
5
// 经过Netty包装过的Selector
private Selector selector;
// 没有包装过的selector
private Selector unwrappedSelector;
private final SelectorProvider provider;

Selector的初始化特别简单,就是直接调用Selector.open()创建一个新的Selector.而Netty对Selector的selectedKeys进行了优化,可以通过io.netty.noKeySetOptimization开关来决定是否启用优化,默认不打开selectedKeys优化功能. 下面来看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 一个为包装的Selector,通过provider打开一个Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

// 如果没有开启优化,则直接返回一个SelectorTuple对象
// 这个SelectorTuple就是未包装的Selector和包装的Selector的集合.
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 省略部分代码 这里就是将Netty自己的selectedKeys将JDk的selectedKeys替换掉.
// 主要是通过反射来从Selector实例中获取selectedKeys和publicSelectedKeys,将上述
// 两个变量设置为可写,在通过反射替换掉.

}

上面就是Selector的初始化. 初始化Selector完成后就应该

run()方法

在EventLoop中,所有的逻辑操作都在for循环体中执行,只有当NioEventLoop接受到退出指令的时候,才会退出循环.

下面看一方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected void run{
try {
//通过 select/selectNow 调用查询当前是否有就绪的 IO 事件
// 当 selectStrategy.calculateStrategy() 返回的是 CONTINUE, 就结束此轮循环,进入下一轮循环;
// 当返回的是 SELECT, 就表示任务队列为空,就调用select(Boolean);
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())){
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 调用Select方法
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
// 如果发生异常就重建Selector并且处理异常
rebuildSelector0();
handleLoopException(e);
continue;
}
// 省略部分代码,待下面分析
}

首先向将wakenUp设置为false,之后在调用select()方法,并且将之前的wakenUp作为参数传进去.
来看一下select方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 设置选择次数为0
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 设置select执行的终止时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 设置超时时间
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时时间到了,并且还没有选择过,就立即选择一次,这里selectNow()会立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪的Channel的集合,否则返回0.
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果有任务,并且wakenUp之前是false,则立即调用一次选择方法
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 这里调用带超时的select()方法
int selectedKeys = selector.select(timeoutMillis);
// 循环中每次选择都会将selectCnt加1.
selectCnt ++;
// 如果有事件发生或odlWakeup为true或者线程被系统唤醒或者有任务都会直接跳出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
// 如果循环次数过多,超过了默认次数,则说明出现了JDK的空轮询BUG.
// 需要重建Selector.
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}

这个selector的方法的就是选择出已经发生的事件.具体逻辑就是:

首先计算下一个将要触发定时任务的剩余超时时间,转换为毫秒,为超时时间增加0.5毫秒的调整值.对剩余的超时时间进行判断,如果需要立即执行或者已经超时,则调用selector.selectNow()进行轮询操作,将selectCnt设置为1.并且退出循环.

只有将定时任务剩余的超时时间作为参数进行select操作,没完成一次select操作就对selectCnt加1.

Select操作完成之后对结果进行判断,如果出现一下集中情况就退出循环:

  • 有Channel处于就绪状态,就是selectedKeys不为0,说明有读写事件需要处理
  • oldWakenUp为true
  • 系统或者用户调用了wakeup操作,唤醒当前的多路复用器
  • 系统队列中有新的任务需要处理.

如果本次selector的轮询结果为空,也没有wakeup操作或者是新的消息需要处理,则说明是个空轮询.将计数器加1. 这里会有一个判断,就是如果在一段时间内空轮询次数过多,则说明导致了一个JDk的空轮询bug,这里解决办法就是调用selectRebuildSelector()方法,来重建Selector.

看一下selectRebuildSelector()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private Selector selectRebuildSelector(int selectCnt) throws IOException {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 这里执行重建操作
rebuildSelector();
Selector selector = this.selector;

// 重建完成后立即选择一次
selector.selectNow();
return selector;
}

private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}

重建的逻辑也特别简单,首先新打开一个Selector,接着在将原来的Selector中的Channel取消注册,并且注册到新的Selector中,

在继续来看run方法中的循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected void run() {
cancelledKeys = 0;
needsToSelectAgain = false;
//ioRatio表示:此线程分配给IO操作所占的时间比(即运行processSelectedKeys耗时在整个循环中所占用的时间).
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//查询就绪的 IO 事件, 然后处理它;
processSelectedKeys();
} finally {
//运行 taskQueue 中的任务.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//查询就绪的 IO 事件, 然后处理它;
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
看到在run的方法中,执行完上面的select方法逻辑后,会开始执行I/O事件和Task.这里有一个ioRatio值,这个值就代表着线程分配给IO操作所占的时间比,例如ioRatio值为50,则IO执行的时间和执行task的时间为1:1.

在来看一下processSelectedKeys()方法
~~~java
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

这里有两种情况,一种是有I/O事件发生和没有I/O事件发生.重点来看一下有事件发生的时候也就是processSelectedKeysOptimized()方法;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void processSelectedKeysOptimized() {
// 对每个准备好的I/O通道进行处理
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

首先是获取到SelectionKey上附加的对象,这个对象是在注册通道的时候将自己附加上去的.也就是获取了一个NioChannel.之后继续执行procesSelectedKey()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取到unsafe实例
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 判断此键是否无效,如果是无效的,则使用unsafe来关闭连接.
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 获取准备好的事件
int readyOps = k.readyOps();
// 如果是连接事件,将连接事件取消,表明已经简历好连接,
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 调用finishConnect方法传播连接完成事件
unsafe.finishConnect();
}
// 如果是可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 直接调用强制刷新函数来写数据到通道中
ch.unsafe().forceFlush();
}
// 如果是读事件或者接受连接事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 调用读方法,来处理
unsafe.read();
}
// 发生异常就关闭通道.
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

具体的逻辑,注释也写的很清楚了,这里不在细说了.大致就是处理应有的事件.在Netty中的事件都是通过Channel的内部类unsafe实现的.这里就调用了unsafe的不同的方法来处理不同的事件.对于unsafe的处理这里就不多说了,可以看我之前的博文对于unsafe的讲解.

到这里,NioEventLoop就说完了.说白了,这个EventLoop就是通过在一个循环中不断检测是否有事件发生,如果有I/O事件发生或者有task在队列中,就去执行任务. 并且在循环等待I/O事件发生时候,会记录空轮询的次数,如果在一定的时间内,空轮询的次数超过一定限制,则会从新建立一个Selector,并且把之前的Selector中的Channel重新注册到新的Selector上.