Netty之unsafe

unsafe简介

Unsafe接口实际上是Channel接口的辅助接口,它不应该被用户代码直接调用,实际上的I/O操作都是有Unsafe接口负责完成的.

API定义

方法名 返回值 说明
localAddress() SocketAddress 返回本地绑定的Socket地址
remoteAddress() SocketAddress 返回通信对端的Socket地址
register(EventLoop eventLoop, ChannelPromise promise) void 注册Channel到多路复用器上,一旦注册操作完成,通知ChannelFuture
bind(SocketAddress localAddress, ChannelPromise promise) void 绑定指定的localAddress到当前的Channel上,完成后通知ChannelFuture
connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) void 绑定完成之后,连接服务器,操作完成之后通知ChannelFuture
disconnect(ChannelPromise promise) void 断开连接,完成后通知ChannelFuture
close(ChannelPromise promise) void 关闭Channel连接,完成后通知
closeForcibly() void 强制关闭连接
deregister(ChannelPromise promise) void 取消此Channel在多路复用器上的注册
beginRead() void 设置网络操作位为用于读取消息
write(Object msg, ChannelPromise promise) void 发送消息,完成之后通知ChannelFuture
flush() void 将发送缓冲数组中的消息写入到Channel中
voidPromise() ChannelPromise 返回一个特殊的可重用和传递的ChannelPromise,它不用于操作陈宫或失败的通知器,仅仅作为容器使用
outboundBuffer() ChannelOutboundBuffer 返回消息发送缓冲区

源码分析

继承类图


这里继承类图,左边是Unsafe的继承类图,右边是Unsafe所属的类.
这里依旧从顶自下的分析unsafe的原理.

AbstractUnsafe源码分析

这相当于是最顶层的实现类,实现了大部分方法.

字段

1
2
3
4
5
6
7
8
9
10
protected abstract class AbstractUnsafe implements Unsafe {
// 新建一个输出缓冲区
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
// recvhandler
private RecvByteBufAllocator.Handle recvHandle;
// 是否刷新数据
private boolean inFlush0;
/** true if the channel has never been registered, false otherwise */
private boolean neverRegistered = true;
}

API

获取地址

1
2
3
4
5
6
7
8
9
@Override
public final SocketAddress localAddress() {
return localAddress0();
}

@Override
public final SocketAddress remoteAddress() {
return remoteAddress0();
}

调用外部类的方法,这些方法也就是AbstractChannel的方法,但是并没有被实现,要子类自己去实现.

注册
注册方法主要是将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用pipeline.fireChannelRegistered()传播注册事件,之后如果已经激活了,调用pipeline.fireChannelActive()方法传播事件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

这里是注册的代码.首先判断一些先决条件,eventLoop不能为空,再判断是否已经被注册过了,之后,设置此eventLoop为此Channel的evetnLoop.之后有一个是否要异步执行的判断,如果当前线程就是与Channel对应的EventLoop所对应的线程,则不存在异步,直接调用register0()方法,如果不是.则异步调用,在eventLoop所有的线程去调用注册方法. 再来看一下register0()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

首先Channel是否取消和是否打开着.接着调用doRegister()方法.此方法是由AbstractNioUnsafe对应的AbstractNioChannel实现,下面再介绍.接着唤醒handlerAdd()事件.然后传播.在向ChannelPromise中设置成功标识.接着在pipeline中传播register事件.然后在判断此Channel是否被激活,如果被激活,在判断是否是第一次注册,如果是,传播active事件.在判断是否是自动读,如果设置了自动读,则自动向多路复用器上注册读事件.如果中间出现异常,强制关闭连接,将异常信息写入promise中.

bind
bind方法主要用于绑定指定的端口.对于服务端,用于绑定监听端口,可以设置backlog参数,对于客户端,用于指定客户端Channel的本地绑定socket地址,具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}

绑定操作首先判断Channel是否取消和是否打开,不符合直接返回.然后调用doBind()方法执行绑定操作.此绑定操作在NioSocketChannel和NioServerSocketChannel中有不同的实现.
客户端代码:

1
2
3
4
5
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}

服务端代码:

1
2
3
4
5
6
7
8
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

可以看到客户端就是直接将Channel绑定到本地地址,服务端还有一个配置参数,也就是backlog参数.
再来看bind()方法的如下代码:

1
2
3
4
5
6
7
8
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

这段代码的意思就是说,如果在绑定之前是没有激活的,在绑定了之后被激活了,那么就会触发active事件.

disconnect
这个方法用于服务端或客户端主动关闭连接.代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable()) {
return;
}

boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}

safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}

关闭连接方法就是调用了doDisconnect()方法执行具体操作,之后会异步调用fireChannelInactive()方法传播事件.

deregister
取消注册事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
invokeLater(new Runnable() {
@Override
public void run() {
try {

doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}

如果已经被取消注册,则直接设置成功.调用外部类的doDeregister()方法来执行取消注册操作.在AbstractNioChannel类中定义.

1
2
3
4
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}

调用eventLoop中的cancel来取消注册.之后传播取消注册事件.

write
write方法并不是真正向Channel中写数据,而是将数据添加到发送缓冲区中.知道调用flush才真正将数据发送出去.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, newWriteException(initialCloseCause));
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}

可以看到,如果本channel不存在输出缓冲区,则直接将msg释放.在找到msg的size,之后添加到outboundBuffer中

flush
真正向Channel中写数据是在调用了此方法的时候.来看一下代码.

1
2
outboundBuffer.addFlush();
flush0();

这里首先将输出缓冲区的消息标记为以刷新.然后调用flush0()方法,将消息发送出去.看一下flush0()的代码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
inFlush0 = true;
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}

try {
doWrite(outboundBuffer);
}

这里首先看Channel是否是激活状态,如果不是,则报错.如果是,调用doWrite()方法,向Channel中写数据.这个doWrite()方法是根据子类的不同有不同的实现.分别在AbstractNioByteChannel(客户端)和AbstractNioMessageChannel(服务端)实现.具体的逻辑在上一篇Channel讲解中说过.这里就不细说了.总之,就是将此输出缓冲区中的数据写出去.

AbstractNioUnsafe源码分析

AbstracNioUnsafe是AbstractUnsafe类的NIO实现.主要实现了connect,finishConnnect等方法.
方法解析
connect
连接方法,用来连接远程服务端.实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 @Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 设置定时任务,在这之后,
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}

调用doConnect()方法来建立连接.主要的实现在NioSocketChannel类中实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}

boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}

在发起连接请求后,操作可能有三种结果.

  1. 连接成功,返回true
  2. 暂时没有连接上,服务端没有返回ACK应答,结果不确定返回false,但是这个时候需要将NioSocketChannel中的SelectionKey设置为OP_CONNECT,监听连接应答消息.
  3. 连接失败,抛异常.关闭连接.

在调用玩doConnect()方法之后,会判断连接结果.如果成功出发ChannelActive事件.如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
boolean active = isActive();
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
// 这里出发ChannelActive事件
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}

Active事件在最后会将此Channel中的SelectionKey设置为OP_READ,监听读事件.
如果没有连接上服务器.则根据连接超时事件设置定时任务,超时时间之后出发校验,如果发现还没有完成,则关闭连接句柄.释放资源.并且关闭连接.
之后在设置连接结果,如果接收到连接完成通知,则判断连接是否被取消,如果取消则关闭句柄,释放资源,发起取消注册操作.

finishConnect
客户端接受到服务端的TCP握手应答信息.通过finishConnect()方法对连接结果进行判断.代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}

首先获取连接状态,当前返回false,之后执行doFinishConnect()方法,在NioSocketChannel中实现,具体实现如下:

1
2
3
4
5
6
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}

这里连接成功返回true,连接失败返回false,发生异常的返回false. 只要连接失败就会抛出Error.
如果这里连接完成就会执行fulfillConnectPromise(connectPromise, wasActive)方法,负责将Socketchannel修改为监听读操作.监听网络读事件.传播channelActive事件.

在检查期间发生异常之后,则由定时任务关闭客户端连接,将Channel从Reactor线程的多路复用器上擦除.

NioByteUnsafe源码分析

这里就是实现了父类的read()方法,这里重点分析此方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;

这里,首先获取连接的配置信息.然后获取此Channel绑定的管道,从设置中获取缓冲区分配器,和分配器的handler.
在向下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}

allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());

先为byteBuf分配一个合适的缓冲区.只有调用doReadBytes(byteBuf)方法将可读数据写入byteBuf中.实现代码

1
2
3
4
5
6
// 获取handler
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 将可读数据变为byteBuf的可写数据
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// 读取数据到buf
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());

从javaChannel中读取数据到buf中,长度为bytebuf可写的长度.当然真正读取到的数据不一定比length长,可能会小,返回读取的字节数.
下面接着看read()方法

1
2
3
4
5
6
7
8
9
10
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}

设置上次读取字节的数量,之后判断是否读取到数据,如果没有读取到数据,则释放byteBuf. 如果读取数据小于0,则说明发生了异常.设置close为false.读取失败.需要关闭.退出循环.
如果读取成功则说明完成了一次异步读取事件.这个时候会出发一次ChannelRead()事件,这里有一点就是,完成一次读操作并不代表着读到了一条完整的信息.也可能是一条不完整的.可能会有粘包现象.所以需要在pipeline中自行处理.之后再将bytebuf释放. 因为每次读操作都未必能够完成缓冲区的全部读取工作,所以会对读取的字节数进行累加.在累加之前会对长度上线做保护,如果累计读取的字节数已经发生溢出,则将读取的字节数设置为整形最大值,退出循环.主要是为了防止本次读取时间过长,影响后面排队的task任务.这里如果没有溢出,执行累加操作. 在循环体内,会判断读操作的次数,默认一次最多只能执行16此读操作.如果超过,不允许在读,等待下一次selector轮询周期在执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
do {
...
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());

@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
~~~

这个默认的读消息次数为16此,因此到达16次之后会直接退出,等待下一次再继续读.当读取消息完成之后,出发读操作完成事件.如果中间失败过,则关闭此链接.


~~~java
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}

最后如果设置了非自动读.则取消掉注册上去的读操作事件.

1
2
3
4
5
finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}

NioMessageUnsafe源码分析

此类是用来给服务端使用的,用来接受新来的连接.有一个链表字段,用来存储新过来的连接. 也是只复写了父类的read()方法,与NioByteUnsafe的read()方法基本一致,但是在调用的实际子类的实现方法上,调用的是
doReadMessages()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Override
public void read() {
assert eventLoop().inEventLoop();
// 获取配置
final ChannelConfig config = config();
// 获取管道
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 设置config
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 读取message到readBuf()中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 增加一次消息处理次数,最大为16
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
// 将缓冲区中的消息都传入管道中
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// 清除消息
readBuf.clear();

allocHandle.readComplete();
// 出发读完成操作
pipeline.fireChannelReadComplete();

// 如果出现异常,关闭连接,出发异常事件
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
// 读取消息操作
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
//如果获取到一个新连接.
if (ch != null) {
// 加入到缓冲区中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
// 失败之后关闭.
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}