unsafe简介
Unsafe接口实际上是Channel接口的辅助接口,它不应该被用户代码直接调用,实际上的I/O操作都是有Unsafe接口负责完成的.
API定义
方法名 | 返回值 | 说明 |
---|---|---|
localAddress() | SocketAddress | 返回本地绑定的Socket地址 |
remoteAddress() | SocketAddress | 返回通信对端的Socket地址 |
register(EventLoop eventLoop, ChannelPromise promise) | void | 注册Channel到多路复用器上,一旦注册操作完成,通知ChannelFuture |
bind(SocketAddress localAddress, ChannelPromise promise) | void | 绑定指定的localAddress到当前的Channel上,完成后通知ChannelFuture |
connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) | void | 绑定完成之后,连接服务器,操作完成之后通知ChannelFuture |
disconnect(ChannelPromise promise) | void | 断开连接,完成后通知ChannelFuture |
close(ChannelPromise promise) | void | 关闭Channel连接,完成后通知 |
closeForcibly() | void | 强制关闭连接 |
deregister(ChannelPromise promise) | void | 取消此Channel在多路复用器上的注册 |
beginRead() | void | 设置网络操作位为用于读取消息 |
write(Object msg, ChannelPromise promise) | void | 发送消息,完成之后通知ChannelFuture |
flush() | void | 将发送缓冲数组中的消息写入到Channel中 |
voidPromise() | ChannelPromise | 返回一个特殊的可重用和传递的ChannelPromise,它不用于操作陈宫或失败的通知器,仅仅作为容器使用 |
outboundBuffer() | ChannelOutboundBuffer | 返回消息发送缓冲区 |
源码分析
继承类图
这里继承类图,左边是Unsafe的继承类图,右边是Unsafe所属的类.
这里依旧从顶自下的分析unsafe的原理.
AbstractUnsafe源码分析
这相当于是最顶层的实现类,实现了大部分方法.
字段
1 | protected abstract class AbstractUnsafe implements Unsafe { |
API
获取地址1
2
3
4
5
6
7
8
9
public final SocketAddress localAddress() {
return localAddress0();
}
public final SocketAddress remoteAddress() {
return remoteAddress0();
}
调用外部类的方法,这些方法也就是AbstractChannel的方法,但是并没有被实现,要子类自己去实现.
注册
注册方法主要是将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用pipeline.fireChannelRegistered()传播注册事件,之后如果已经激活了,调用pipeline.fireChannelActive()方法传播事件.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
这里是注册的代码.首先判断一些先决条件,eventLoop不能为空,再判断是否已经被注册过了,之后,设置此eventLoop为此Channel的evetnLoop.之后有一个是否要异步执行的判断,如果当前线程就是与Channel对应的EventLoop所对应的线程,则不存在异步,直接调用register0()方法,如果不是.则异步调用,在eventLoop所有的线程去调用注册方法. 再来看一下register0()方法.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
首先Channel是否取消和是否打开着.接着调用doRegister()方法.此方法是由AbstractNioUnsafe对应的AbstractNioChannel实现,下面再介绍.接着唤醒handlerAdd()事件.然后传播.在向ChannelPromise中设置成功标识.接着在pipeline中传播register事件.然后在判断此Channel是否被激活,如果被激活,在判断是否是第一次注册,如果是,传播active事件.在判断是否是自动读,如果设置了自动读,则自动向多路复用器上注册读事件.如果中间出现异常,强制关闭连接,将异常信息写入promise中.
bind
bind方法主要用于绑定指定的端口.对于服务端,用于绑定监听端口,可以设置backlog参数,对于客户端,用于指定客户端Channel的本地绑定socket地址,具体实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
绑定操作首先判断Channel是否取消和是否打开,不符合直接返回.然后调用doBind()方法执行绑定操作.此绑定操作在NioSocketChannel和NioServerSocketChannel中有不同的实现.
客户端代码:1
2
3
4
5if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
服务端代码:1
2
3
4
5
6
7
8
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
可以看到客户端就是直接将Channel绑定到本地地址,服务端还有一个配置参数,也就是backlog参数.
再来看bind()方法的如下代码:1
2
3
4
5
6
7
8if (!wasActive && isActive()) {
invokeLater(new Runnable() {
public void run() {
pipeline.fireChannelActive();
}
});
}
这段代码的意思就是说,如果在绑定之前是没有激活的,在绑定了之后被激活了,那么就会触发active事件.
disconnect
这个方法用于服务端或客户端主动关闭连接.代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
关闭连接方法就是调用了doDisconnect()方法执行具体操作,之后会异步调用fireChannelInactive()方法传播事件.
deregister
取消注册事件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
invokeLater(new Runnable() {
public void run() {
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}
如果已经被取消注册,则直接设置成功.调用外部类的doDeregister()方法来执行取消注册操作.在AbstractNioChannel类中定义.1
2
3
4
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
调用eventLoop中的cancel来取消注册.之后传播取消注册事件.
write
write方法并不是真正向Channel中写数据,而是将数据添加到发送缓冲区中.知道调用flush才真正将数据发送出去.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, newWriteException(initialCloseCause));
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
可以看到,如果本channel不存在输出缓冲区,则直接将msg释放.在找到msg的size,之后添加到outboundBuffer中
flush
真正向Channel中写数据是在调用了此方法的时候.来看一下代码.1
2outboundBuffer.addFlush();
flush0();
这里首先将输出缓冲区的消息标记为以刷新.然后调用flush0()方法,将消息发送出去.看一下flush0()的代码片段1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18inFlush0 = true;
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
}
这里首先看Channel是否是激活状态,如果不是,则报错.如果是,调用doWrite()方法,向Channel中写数据.这个doWrite()方法是根据子类的不同有不同的实现.分别在AbstractNioByteChannel(客户端)和AbstractNioMessageChannel(服务端)实现.具体的逻辑在上一篇Channel讲解中说过.这里就不细说了.总之,就是将此输出缓冲区中的数据写出去.
AbstractNioUnsafe源码分析
AbstracNioUnsafe是AbstractUnsafe类的NIO实现.主要实现了connect,finishConnnect等方法.
方法解析
connect
连接方法,用来连接远程服务端.实现代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 设置定时任务,在这之后,
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
调用doConnect()方法来建立连接.主要的实现在NioSocketChannel类中实现.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
在发起连接请求后,操作可能有三种结果.
- 连接成功,返回true
- 暂时没有连接上,服务端没有返回ACK应答,结果不确定返回false,但是这个时候需要将NioSocketChannel中的SelectionKey设置为OP_CONNECT,监听连接应答消息.
- 连接失败,抛异常.关闭连接.
在调用玩doConnect()方法之后,会判断连接结果.如果成功出发ChannelActive事件.如下代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
boolean active = isActive();
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
// 这里出发ChannelActive事件
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}
Active事件在最后会将此Channel中的SelectionKey设置为OP_READ,监听读事件.
如果没有连接上服务器.则根据连接超时事件设置定时任务,超时时间之后出发校验,如果发现还没有完成,则关闭连接句柄.释放资源.并且关闭连接.
之后在设置连接结果,如果接收到连接完成通知,则判断连接是否被取消,如果取消则关闭句柄,释放资源,发起取消注册操作.
finishConnect
客户端接受到服务端的TCP握手应答信息.通过finishConnect()方法对连接结果进行判断.代码如下所示1
2
3
4
5
6
7
8
9
10
11
12try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
首先获取连接状态,当前返回false,之后执行doFinishConnect()方法,在NioSocketChannel中实现,具体实现如下:1
2
3
4
5
6
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
这里连接成功返回true,连接失败返回false,发生异常的返回false. 只要连接失败就会抛出Error.
如果这里连接完成就会执行fulfillConnectPromise(connectPromise, wasActive)方法,负责将Socketchannel修改为监听读操作.监听网络读事件.传播channelActive事件.
在检查期间发生异常之后,则由定时任务关闭客户端连接,将Channel从Reactor线程的多路复用器上擦除.
NioByteUnsafe源码分析
这里就是实现了父类的read()方法,这里重点分析此方法.1
2
3
4
5
6
7
8
9
10
11
12
13
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
这里,首先获取连接的配置信息.然后获取此Channel绑定的管道,从设置中获取缓冲区分配器,和分配器的handler.
在向下看1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
先为byteBuf分配一个合适的缓冲区.只有调用doReadBytes(byteBuf)方法将可读数据写入byteBuf中.实现代码1
2
3
4
5
6// 获取handler
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 将可读数据变为byteBuf的可写数据
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// 读取数据到buf
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
从javaChannel中读取数据到buf中,长度为bytebuf可写的长度.当然真正读取到的数据不一定比length长,可能会小,返回读取的字节数.
下面接着看read()方法1
2
3
4
5
6
7
8
9
10allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
设置上次读取字节的数量,之后判断是否读取到数据,如果没有读取到数据,则释放byteBuf. 如果读取数据小于0,则说明发生了异常.设置close为false.读取失败.需要关闭.退出循环.
如果读取成功则说明完成了一次异步读取事件.这个时候会出发一次ChannelRead()事件,这里有一点就是,完成一次读操作并不代表着读到了一条完整的信息.也可能是一条不完整的.可能会有粘包现象.所以需要在pipeline中自行处理.之后再将bytebuf释放. 因为每次读操作都未必能够完成缓冲区的全部读取工作,所以会对读取的字节数进行累加.在累加之前会对长度上线做保护,如果累计读取的字节数已经发生溢出,则将读取的字节数设置为整形最大值,退出循环.主要是为了防止本次读取时间过长,影响后面排队的task任务.这里如果没有溢出,执行累加操作. 在循环体内,会判断读操作的次数,默认一次最多只能执行16此读操作.如果超过,不允许在读,等待下一次selector轮询周期在执行.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25do {
...
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
~~~
这个默认的读消息次数为16此,因此到达16次之后会直接退出,等待下一次再继续读.当读取消息完成之后,出发读操作完成事件.如果中间失败过,则关闭此链接.
~~~java
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
最后如果设置了非自动读.则取消掉注册上去的读操作事件.1
2
3
4
5finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
NioMessageUnsafe源码分析
此类是用来给服务端使用的,用来接受新来的连接.有一个链表字段,用来存储新过来的连接. 也是只复写了父类的read()方法,与NioByteUnsafe的read()方法基本一致,但是在调用的实际子类的实现方法上,调用的是
doReadMessages()方法.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void read() {
assert eventLoop().inEventLoop();
// 获取配置
final ChannelConfig config = config();
// 获取管道
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 设置config
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 读取message到readBuf()中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 增加一次消息处理次数,最大为16
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
// 将缓冲区中的消息都传入管道中
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// 清除消息
readBuf.clear();
allocHandle.readComplete();
// 出发读完成操作
pipeline.fireChannelReadComplete();
// 如果出现异常,关闭连接,出发异常事件
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
1 |
|