EventLoop
Netty线程模型是被精心设计的,提升了框架的并发性能,并且在很大程度上避免锁.下面来讲一下Netty中的线程模型和它的EventLoop.
线程模型
一说到线程模型,很容易能够想到Reactor线程模型.Reactor又有单线程模型,多线程模型和主从多线程模型.
单线程模型
单线程模型,显而易见就是只有一个线程,即所有的I/O操作都在同一个NIO线程模型.对于此I/O模型的职责则有很多:
- 作为服务端,接受TCP连接
- 作为客户端,发起TCP连接
- 读取通信对端的请求和应答消息
- 向通信对端发送请求或应答消息
这种模型由于只有一个线程处理I/O,因此当并发量提高之后,一个线程很难处理那么多的请求,系统的吞吐量将变的很低.这个时候就演进出了Reactor多线程模型.使用多个线程来处理请求
多线程模型
Reactor多线程模型和单线程模型的最大的区别就是有一组I/O线程来处理I/O请求.多线程模型的特点如下:
- 有专门一个线程用来监听服务端,接受客户端的TCP连接请求
- 网络I/O操作读写等由一个NIO线程池负责.通过很多的NIO线程能够处理高并发的网络读写请求
- 一个NIO线程可以同时处理N条连接,但是一个连接只对应与一个NIO线程,防止发生并发操作问题.
这个样子很好的解决了一般并发的问题.但是当如果一个时候有数百万个连接过来,或者服务端需要对客户端握手进行安全验证,但是验证本身非常损耗性能.这个时候,一个Acceptor线程会存在性能不足的问题,这个时候就又出现了第三种Reactor线程模型,主从多线程模型.
主从Reactor多线程模型
此线程模型的特点就是服务端接受客户端连接的不在是一个单独的NIO线程,而是一个独立的NIO线程池.Acceptor接受到客户端TCP连接请求后,将新创建的SocketChannel注册到I/O线程池中的某个线程上.接下来又此线程来处理网络读写问题.
Netty的线程模型
Netty的线程模型并不是一成不变的.它实际拒绝域用户的启动参数配置.通过设置不同的启动参数,Netty可以同事支持Reactor单线程模型,多线程模型和主从多线程模型.
比如下面三种方式:
单线程模式1
2
3
4
5EventLoopGroup work = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());
只有一个线程,并且没有线程池,设置EventLoopGroup的线程数为1.将上面代码改成如下形式:
Reactor多线程模式1
2
3
4
5
6EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());
主从Reactor多线程模式1
2
3
4
5
6EventLoopGroup boss = new NioEventLoopGroup(8);
EventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());
只需要将BossGroup的参数设置为多个就可以了. 里面的参数就是用几个线程来处理.
一般来说,都使用主从Reactor多线程模型.主要分为两个线程组,如上所示:boss线程组主要用来接受连接,然后注册到work线程池上的某个线程上.work线程主要用来处理读写请求,并且执行系统调用Task和定时任务.如心跳检测.
NioEventLoop源码分析
Netty中的NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O操作外,还兼顾处理定时任务和系统任务.在Netty中有很多系统Task任务,主要就是为了在I/O线程和用户线程同事操作网络资源的时候,为了防止并发操作导致锁的竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行.这样实现了局部无锁化. 说完了EventLoop的功能,来看一下它的实现,这里NioEventLoop是它的一个子类,也是最常用的,因此主要分析一下此类.
构造方法
先来看一下构造方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类方法初始化线程池,和回绝策略
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 这里打开一个Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
Selector
作为Nio的多路复用线程,必须得有一个多路复用器,先来看一下它的多路复用器.1
2
3
4
5// 经过Netty包装过的Selector
private Selector selector;
// 没有包装过的selector
private Selector unwrappedSelector;
private final SelectorProvider provider;
Selector的初始化特别简单,就是直接调用Selector.open()创建一个新的Selector.而Netty对Selector的selectedKeys进行了优化,可以通过io.netty.noKeySetOptimization开关来决定是否启用优化,默认不打开selectedKeys优化功能. 下面来看一下代码:
1 | private SelectorTuple openSelector() { |
上面就是Selector的初始化. 初始化Selector完成后就应该
run()方法
在EventLoop中,所有的逻辑操作都在for循环体中执行,只有当NioEventLoop接受到退出指令的时候,才会退出循环.
下面看一方法实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25protected void run{
try {
//通过 select/selectNow 调用查询当前是否有就绪的 IO 事件
// 当 selectStrategy.calculateStrategy() 返回的是 CONTINUE, 就结束此轮循环,进入下一轮循环;
// 当返回的是 SELECT, 就表示任务队列为空,就调用select(Boolean);
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())){
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 调用Select方法
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
// 如果发生异常就重建Selector并且处理异常
rebuildSelector0();
handleLoopException(e);
continue;
}
// 省略部分代码,待下面分析
}
首先向将wakenUp设置为false,之后在调用select()方法,并且将之前的wakenUp作为参数传进去.
来看一下select方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 设置选择次数为0
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 设置select执行的终止时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 设置超时时间
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时时间到了,并且还没有选择过,就立即选择一次,这里selectNow()会立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪的Channel的集合,否则返回0.
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果有任务,并且wakenUp之前是false,则立即调用一次选择方法
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 这里调用带超时的select()方法
int selectedKeys = selector.select(timeoutMillis);
// 循环中每次选择都会将selectCnt加1.
selectCnt ++;
// 如果有事件发生或odlWakeup为true或者线程被系统唤醒或者有任务都会直接跳出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
// 如果循环次数过多,超过了默认次数,则说明出现了JDK的空轮询BUG.
// 需要重建Selector.
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}
这个selector的方法的就是选择出已经发生的事件.具体逻辑就是:
首先计算下一个将要触发定时任务的剩余超时时间,转换为毫秒,为超时时间增加0.5毫秒的调整值.对剩余的超时时间进行判断,如果需要立即执行或者已经超时,则调用selector.selectNow()进行轮询操作,将selectCnt设置为1.并且退出循环.
只有将定时任务剩余的超时时间作为参数进行select操作,没完成一次select操作就对selectCnt加1.
Select操作完成之后对结果进行判断,如果出现一下集中情况就退出循环:
- 有Channel处于就绪状态,就是selectedKeys不为0,说明有读写事件需要处理
- oldWakenUp为true
- 系统或者用户调用了wakeup操作,唤醒当前的多路复用器
- 系统队列中有新的任务需要处理.
如果本次selector的轮询结果为空,也没有wakeup操作或者是新的消息需要处理,则说明是个空轮询.将计数器加1. 这里会有一个判断,就是如果在一段时间内空轮询次数过多,则说明导致了一个JDk的空轮询bug,这里解决办法就是调用selectRebuildSelector()方法,来重建Selector.
看一下selectRebuildSelector()方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67private Selector selectRebuildSelector(int selectCnt) throws IOException {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 这里执行重建操作
rebuildSelector();
Selector selector = this.selector;
// 重建完成后立即选择一次
selector.selectNow();
return selector;
}
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
"unchecked") (
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
重建的逻辑也特别简单,首先新打开一个Selector,接着在将原来的Selector中的Channel取消注册,并且注册到新的Selector中,
在继续来看run方法中的循环
1 | protected void run() { |
这里有两种情况,一种是有I/O事件发生和没有I/O事件发生.重点来看一下有事件发生的时候也就是processSelectedKeysOptimized()方法;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private void processSelectedKeysOptimized() {
// 对每个准备好的I/O通道进行处理
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
"unchecked") (
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
首先是获取到SelectionKey上附加的对象,这个对象是在注册通道的时候将自己附加上去的.也就是获取了一个NioChannel.之后继续执行procesSelectedKey()方法
1 | private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
具体的逻辑,注释也写的很清楚了,这里不在细说了.大致就是处理应有的事件.在Netty中的事件都是通过Channel的内部类unsafe实现的.这里就调用了unsafe的不同的方法来处理不同的事件.对于unsafe的处理这里就不多说了,可以看我之前的博文对于unsafe的讲解.
到这里,NioEventLoop就说完了.说白了,这个EventLoop就是通过在一个循环中不断检测是否有事件发生,如果有I/O事件发生或者有task在队列中,就去执行任务. 并且在循环等待I/O事件发生时候,会记录空轮询的次数,如果在一定的时间内,空轮询的次数超过一定限制,则会从新建立一个Selector,并且把之前的Selector中的Channel重新注册到新的Selector上.