java NIO之Selector

概述

Selector 是一个选择器,主要用法就是,通过将一个通道注册到selector中,然后在通过select方法获取到你监听的事件绑定的键. 相当于是以前需要不断轮询去判断io流是否可读,或者可写,或者新的连接已经到来.现在通过Selector帮你判断,当有可用的连接或者可读可写时,selectKeys()方法会返回一个可用的通道集合.通过调用selector.select()来实现,这个方法是阻塞的,当没有准备就绪的通道会阻塞.当然了,这一切包括通道和注册事件都是由一个SelectionKey保存的.
所以在讲解selector之前必须得现有一个SelectionKey和Channel的概念.

SelectionKey

讲到Selector必须得将SelectionKey,因为每次向Selector中注册一个事件就会创建一个选择键,可以通过它的cancel方法,close等方法来关闭它或者取消它.

当把通道注册到Selector中的时候,Selector开始监听,但是Selector的返回就绪事件的时候总是返回一个SelectionKey,在这个key中包含了当初注册的通道,拿到这个通道然后开始做自己的事情,比如说读写,建立连接.
SelectionKey有四种事件:

  • int OP_ACCEPT = 1 << 4; //监听接收连接请求
  • int OP_CONNECT = 1 << 3;// 监听连接完成
  • int OP_WRITE = 1 << 2; // 监听可写
  • int OP_READ = 1 << 0; // 可读

事件之间可以通过或运算进行组合,比如:

1
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

这里这个interestOps就是读和写事件的合集.因为每个事件都是2的整数倍,所以或运算之后判断每一位的状态就可以知道注册的事件合集

两种事件集合:interestOps 和 readyOps

interestOps就是感兴趣的事件集合,通过register方法注册时会调用此值,可以通过Selectionkey.interestOps()方法获得
readyOps 是就绪事件集合,可通过 SelectionKey readyOps() 获取。
下面来看一下SelectionKey中的具体的方法

具体方法:

返回值 方法名 用途
Object attach(Object obj) 将给定的对象附加到此键
Object attachment() 返回附加给此键的对象
abstract void cancel() 请求取消此键的通道到其选择键的注册
Channel channel() 返回为之创建此键的通道
SelectionKey interestOps(int ops) 将此键的 interest 集合设置为给定值。
boolean isAcceptable() 测试此键的通道已经准备好接受新的套接字连接
boolean isConnectable() 测试此键的通道是否已经建立好连接
boolean isReadable() 测试是否有数据可读
boolean isValid() 此键是否有效
boolean isWriteable() 通道是否可写
boolean selector() 返回为此选择器创建的键。

ServerSocketChannel

ServerSocketChannel其实就是对ServerSocket的一个封装,使得它能够非阻塞接受请求.将socket变成通道形式的.具体类图看一下

上面说到向Selector中注册选择键,但是这个注册选择键又是怎么注册的呢.看下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 这里就是新建一个Selector
Selector selector = Selector.open();
// 这里建立一个服务的SocketChannel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8888));
// 这里执行注册监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// accept 方法接受一个连接,返回一个SocketChannel.
SocketChannel socketChannel =serverSocketChannel.accept();
// socket 返回一个ServerSocket. ServerSocket套接字就是服务端的套接字,用来接受连接请求.
ServerSocket socket = serverSocketChannel.socket();

这里调用register()方法就将这个Channel注册到了selector中.之后就会监听此通道的accept请求了.当然这里只是说明了注册方式,具体细节请继续看下去.
方法不多,但是很多方法都是继承子父类的,比如上面代码中的bind,register,configureBlocking等方法.

SocketChannel

SocketChannel,这个通道主要就类似于普通的Socket,但是使用通道来表示Socket的连接,通过通道和Selector和选择键可以实现非阻塞IO.
主要方法: 这里用一个例子来演示

1
2
3
4
5
6
// 也是通过open()打开一个连接
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8000));
// 与ServerSocketChannel一样,也配置是异步
socketChannel.configureBlocking(false);
// 这里有一点就是如果要注册到Selector中的话必须首先设置为异步的.
socketChannel.register(selector, SelectionKey.OP_READ);

套接字通道不是连接网络套接字的完整抽象。必须通过调用 socket 方法所获得的关联 Socket 对象来完成对套接字选项的绑定、关闭和操作。不可能为任意的已有套接字创建通道,也不可能指定与套接字通道关联的套接字所使用的 SocketImpl 对象。

通过调用此类的某个 open 方法创建套接字通道。新创建的套接字通道已打开,但尚未连接。试图在未连接的通道上调用 I/O 操作将导致抛出 NotYetConnectedException。可通过调用套接字通道的 connect 方法连接该通道;一旦连接后,关闭套接字通道之前它会一直保持已连接状态。可通过调用套接字通道的 isConnected 方法来确定套接字通道是否已连接。

套接字通道支持非阻塞连接:可创建一个套接字通道,并且通过 connect 方法可以发起到远程套接字的连接,之后通过 finishConnect 方法完成该连接。可通过调用 isConnectionPending 方法来确定是否正在进行连接操作。

可单独地关闭 套接字通道的输入端和输出端,而无需实际关闭该通道。调用关联套接字对象的 shutdownInput 方法来关闭某个通道的输入端将导致该通道上的后续读取操作返回 -1(指示流的末尾)。调用关联套接字对象的 shutdownOutput 方法来关闭通道的输出端将导致该通道上的后续写入操作抛出 ClosedChannelException。

套接字通道支持异步关闭,这与 Channel 类中所指定的异步 close 操作类似。如果一个线程关闭了某个套接字的输入端,而同时另一个线程被阻塞在该套接字通道上的读取操作中,那么处于阻塞线程中的读取操作将完成,而不读取任何字节且返回 -1。I如果一个线程关闭了某个套接字的输出端,而同时另一个线程被阻塞在该套接字通道上的写入操作中,那么阻塞线程将收到 AsynchronousCloseException。

多个并发线程可安全地使用套接字通道。尽管在任意给定时刻最多只能有一个线程进行读取和写入操作,但数据报通道支持并发的读写。connect 和 finishConnect 方法是相互同步的,如果正在调用其中某个方法的同时试图发起读取或写入操作,则在该调用完成之前该操作被阻塞

Selector

Selector其实就是一个I/O多路复用器.通过复用

接下来主要说一下Selector类,这个类是NIO的核心类,如果没有它,那么可能我们还必须要像以前一样,要么阻塞在读或者写事件,要么轮询是否可读写,浪费cpu资源.Selector中可以是通过将一个通道注册上,当事件发生的时候将此通道所绑定的选择键返回给调用者,让调用这去处理,而不必让每个调用者轮询此连接是否可读可写.

这里通过SelectionKey来表示从通道到选择器的注册:
在Selector中有三种键的集合:

  • 已选择集合: 是在前一次选择期间操作期间,检测每个键的通道是否已经至少为该键的相关操作集所标识的一个操作准备就绪,此集合由 selectedKeys 方法返回
  • 已取消集合: 是被取消但是通道尚未注销的集合.不可直接访问此集合.
  • 注册集合: 表示所有注册到此选择器上的键.包括以取消但是还未注销的键.通过keys()返回
    当新创建Selector时,三个集合都是空的,每当有一个通道通过register向Selector注册该通道的时候,也会想选择器的键集添加一个与此通道绑定的键.而且需要在选择期间从键集中移除已经取消的键.
    不管是通过关闭某个键的通道还是调用该键的 cancel 方法来取消键,该键都被添加到其选择器的已取消键集中。取消某个键会导致在下一次选择操作期间注销该键的通道,而在注销时将从所有选择器的键集中移除该键。

通过选择操作将键添加到已选择键集中,可以通过调用已选择键集的remove()方法,或者调用该键集的迭代器的remove()方法移除此键.只能通过这两种方式来移除键集中的键.

创建

Selector不能直接实例化,需要使用Selector.open()来初始化:

1
Selector selector = Selector.open();

这里就打开连接了,当然了,建立一个Selector并不是这么简单,java帮我们封装了很多api,一些底层实现我们是不知道的,既然要理解Selector,这里我们也要看一下到底它都干了什么.

1
2
3
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

发现open()函数其实是调用了一个SelectorProvider()的函数来进行创建的.在进去调用栈看一看.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 创建默认的SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

这里调用Provider函数,创建一个SelectorProvider实例,在这个实例又调用这个DefaultSelectorProvider.create()真正创建一个Selecotr实例:

1
2
3
4
5
6
7
8
public static SelectorProvider create() {
String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
if (var0.equals("SunOS")) {
return createProvider("sun.nio.ch.DevPollSelectorProvider");
} else {
return (SelectorProvider)(var0.equals("Linux") ? createProvide("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
}
}

看到这里应该明白了,其实到这里能看出来,如果是linux系统的话,Selector就是通过使用epoll来实现的.如果是SunOS就用DevPoll实现.到了这一步,大致对创建就有了一个了解.

通道注册

上面已经提到,通道注册的话就是需要调用通道的register方法来注册到selector中,不如:

1
2
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

既然已经知道了这个Selector是使用epoll来实现的,那么注册应该就是使用的是函数epoll_ctl()函数了.但是java中确不是这么实现的.在register的调用栈中,确实没有出现epoll_ctl()函数的调用.它是通过先把所有的注册事件都放在了一个事件集合中,然后在选择的时候再进行统一注册.
接下来看一下register方法的调用过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// 调用register方法
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
// 如果通道没打开,则抛异常
if (!isOpen())
throw new ClosedChannelException();
// 如果是无效事件,抛异常
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
//如果是阻塞模式,抛异常
if (blocking)
throw new IllegalBlockingModeException();

// 这里findkey方法会首先查看此通道是否已经被注册过
SelectionKey k = findKey(sel);
// 如果注册过,将此事件加入到感兴趣事件中
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// 如果没有,则去注册
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}

public abstract class SelectorImpl extends AbstractSelector {
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
if (!(var1 instanceof SelChImpl)) {
throw new IllegalSelectorException();
} else {
// 新建一个SelectionKeyImpl
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
var4.attach(var3);
synchronized(this.publicKeys) {
// 调用注册函数
this.implRegister(var4);
}
// 将此次注册事件添加到感兴趣事件集合中
var4.interestOps(var2);
return var4;
}
}
}

class EPollSelectorImpl extends SelectorImpl {
// 这是上面的实现了的注册函数
protected void implRegister(SelectionKeyImpl var1) {
if (this.closed) {
throw new ClosedSelectorException();
} else {
// 首先获得一个channel
SelChImpl var2 = var1.channel;
// 获取到channel 的文件描述符
int var3 = Integer.valueOf(var2.getFDVal());
// 存储 文件描述符 和 SelectionKeyImpl 的映射关系
this.fdToKey.put(var3, var1);
// 将这个事件加入到pollWrapper中
this.pollWrapper.add(var3);
// 将这个SelectionKey加入到键集合中
this.keys.add(var1);
}
}
}


public class SelectionKeyImpl extends AbstractSelectionKey {
public SelectionKey interestOps(int ops) {
ensureValid();
return nioInterestOps(ops);
}

public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
// 转换并设置感兴趣的事件
channel.translateAndSetInterestOps(ops, this);
// 设置 interestOps 变量
interestOps = ops;
return this;
}
}

class class EPollSelectorImpl extends SelectorImpl {
public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
// 设置感兴趣的事件
pollWrapper.setInterest(ch.getFDVal(), ops);
}
}

class EPollArrayWrapper {
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// 扩容 updateDescriptors 数组,并存储文件描述符 fd
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
int[] newDescriptors = new int[newCapacity];
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;

// events are stored as bytes for efficiency reasons
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
// 存储事件
setUpdateEvents(fd, b, false);
}
}
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
}

到了implRegister()函数的时候基本就算完事了,所以说在注册的过程中,就只是将event添加到eventsLow数组中,至于epoll_ctl的调用,则是在选择期间调用.

选择

在每次选择期间都可以将其从已选择键集或者键集中移除.移除完之后就会在下次选择期间看看是否事件发生,如果发生就继续将其添加到已选择键集中.主要有三个方法:

  • select() 返回已经准备好I/O的通道数量
  • select(long) 返回已经准备好I/O的通道数量
  • selectNow() 返回已经准备好I/O的通道数量
  • selectKeys() 返回此选择器的已选择键集
  • keys() 返回此选择器键集
    每次做选择都会有一下四步走.
  1. 将已取消键集中的键从选择器中注销.就是清空取消键集.也就是说调用了cancel()的键将在这个时候被真正的取消
  2. 在这之后会调用操作系统的epoll_ctl函数将通道感兴趣的事件注册到epoll实例中.然后开始wait事件发生.
  3. 如果说准备就绪的键已经在已选择键集中,就会更新此键的就绪操作集.如果此键不在已选择键集中,就将此键添加到已选择操作集中.更新就绪操作集
  4. 如果此过程中有过将任意键添加到取消键集中,就继续执行步骤1

在这里要说明一下,select和selectKeys()是不一样的,select方法返回的是准备好I/O的通道的数量.而selectKeys返回的是选择器的已选择键集.如果编过程的人可能都碰到过这样的一种情况,就是在select()返回是0的情况,但是selectKeys()确返回了好几个键,这里就着重说一下,这个已选择键集和取消键集.这里的已选择键集就是如果你不去手动删除你已经处理完的已选择键集,那么选择器中的已选择键集还是不会变的.也就是说当你处理完一个已选择键集,如果没有通过remove()方法删除掉其中的键的话,这些键将依然存在与已选择键集中.这也就是为什么会出现上面说的那种情况.在编程 的时候需要注意这一点.

当移除了已选择键集中的键并不会取消注册,取消有对应的cancel方法,移除已选择键集只是在调用selectKeys的时候不会在此返回上次已经处理过的准备好的通道.

取消:

这里在说一下取消操作,取消操作需要调用SelectionKey.cancel()方法,但是这个方法映射到epoll中并不会立即取消epoll中实例的注册,而是先将此键放入取消键集中,然后等到下一次select()操作时,将此取消键集中的键注销掉.