Netty服务端启动分析

Netty服务端

大家都知道Netty启动服务端只需要配置好参数,然后调用bind()函数就可以启动了.下面这段代码就是普遍的的Netty中服务器的写法.我们从下面这段开始分析.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bootstrapServer.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
for (ChannelHandler handler : channelHandlers){
ch.pipeline().addLast(handler);
}
}
})
.option(ChannelOption.SO_BACKLOG,1024)
.option(ChannelOption.SO_RCVBUF,1024)
.option(ChannelOption.SO_SNDBUF,1024)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture cf = bootstrapServer.bind(port).sync();

本文略长,请耐心解读.本文分为四部分,分为NioServerSocketChannel的创建,初始化,注册和端口绑定四部分,也可以按需浏览.

启动

从启动开始说起,启动之后,Bootstrap配置的参数才会一一设置到Channel中.从bind()方法说起.
bind()可以分为一下几个步骤:

  1. 检查参数合法性,检查的是BootStrap中的设置的合法性,比如说是否有childHandler和childGroup
  2. 调用initAndRegister()方法,初始化和注册NioChannel
  3. 调用newChannel()方法创建Channel. 在newChannel()中初始化Channel和配置各种参数.
  4. 真正的绑定端口
1
ChannelFuture cf = bootstrapServer.bind(port).sync();

一般我们就是通过bind()一个端口,然后进行启动服务端的.接下来看一看bind()到底做了什么

1
2
3
4
5
6
7
8
9
public ChannelFuture bind(SocketAddress localAddress) {
// 这一句是检验参数的有效性,比如是否有childHandler 和 childGroup
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// 真正执行bind()操作
return doBind(localAddress);
}

在看doBind()方法

1
2
3
4
5
6
7
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>{
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
...
}
}

有一个initAndRegister方法()

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>{
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
...
}
}

这里可以看到是通过channelFactory来获得一个Channel 的,这个Factory又是个什么呢.其实在Netty默认的是ReflectiveChannelFactory.通过newChannel来获得一个Channel.

1
2
3
4
5
6
7
8
9
10
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
public T newChannel() {
try {
// 这里就是通过反射获得一个Channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}

但是这个Factory是什么时候来的呢? 我们知道在我们配置BootStrap的时候我们会设置一个Channel的参数,如下面这个样子调用

1
.channel(NioServerSocketChannel.class)

这里设置了NioServerSocketChannel. 所以这里constructor其实就是一个NioServerSocketChannel的类型.

1
2
3
4
5
6
7
8
9
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>{
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
// 这里也就是将 channelFactory设置为ReflectiveChannelFactory.
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
}

所以也就很清晰明了了.这个channelFactory的由来.是通过上面传入的channel的参数来建立的.

创建

上面说到newChannel()来看一下newChannel()是怎么创建一个Channel的
如果按照调用顺序最后一直点进去会发现一个语句,也就是newSocket方法.
这里创建ServerSocketChannel也有几步要走,总结一下如下:

  1. 首先通过SelectorProvider创建一个ServerSocketChannel
  2. 根据ServerSocketChannel初始化和创建Netty封装的NioServerSocketChannel
  3. 初始化NioServerSocketChannelConfig配置参数.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class NioServerSocketChannel{
    // 这里调用Jdk中的SelectorProvider
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    // 通过调用newSocket来返回一个JDK的ServerSocketChannel.
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
    // 使用SelectorProvider返回一个SocketChannel
    return provider.openServerSocketChannel();
    } catch (IOException e) {
    throw new ChannelException(
    "Failed to open a server socket.", e);
    }
    }
    // 上面提到factory调用newInstance来初始化NioServerSocket. 调用默认构造方法.
    public NioServerSocketChannel() {
    // 首先调用newSocket()创建ServerSocketChannel,接着根据此Channel初始化NioServerSocketChannel.
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
    // 这里可以看是有JDK的身影,就是OP_ACCEPT,但是这里的OP_ACCEPT并不是马上就注册到选择器上的.这里只是初始化感兴趣事件而已.具体的注册函数在绑定操作实现的.之后在继续初始化channel,设置为非阻塞
    super(null, channel, SelectionKey.OP_ACCEPT);
    // 初始化NioServerSocketChannelConfig
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    }

再向深入了解,也就是上面的super()调用的方法就可以看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 设置感兴趣的事件
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

先将channel赋值,然后在设置感兴趣的事件.之后再设置为非阻塞.

之后再看parent方法

1
2
3
4
5
6
7
8
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}

这里就是真正的设置Channel的参数.前面说过,每一个Channel都会有一个ChannelPipeline()于之相关联,并且会有一个Id.这里就是创建它们的地方.而这个unsafe 是用来实现I/O传输的.

初始化

到了这里基本上服务器Channel的创建就有了一个直观的了解.
现在再来看一下initAndRegister()方法中init()方法,这就是初始化ServerChannel的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>{
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 这里刚才讲过了是创建了一个channel. 但是大致参数都没有配置,这里就是配置的过程.
channel = channelFactory.newChannel();
// 这里的init()方法就是配置的过程.
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ServerBootstrap{
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
// 这里是设置Channel的Option参数.
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 设置Attr值
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();
// 接下来就是设置childChannel的参数了.childChannel的参数也就是为新建的SocketChannel设置的参数.
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// 获得选项参数
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
// 获取附加的值
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 对管道进行处理.为管道添加处理器.这里添加的处理器是为父类的.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}

在最后进行对管道的处理,向管道中添加config的一些handler.
最后再异步想管道中添加一个处理程序也就是ServerBootstrapAcceptor. 将设置好的child的所有参数都传输进去.
group,handler,option,attrs等.

接下来看一下ServerBootstrapAcceptor类,这是ServerBootstrap的内部类.通过上面可以看到.这个Acceptor被添加到管道的最后一个位置.所以消息到这里通过前面一系列的处理,已经是一个Channel.这个时候就需要将Channel注册到childGroup中.
可以分为一下几步:

  1. 复写父类的channelRead()方法,监听连接事件
  2. 当有新连接到来的时候,调用channelRead()方法,获取新建立的channel.
  3. 为这个新建的channel配置参数,包括选项,pipeline中的处理器
  4. 将此Channel注册到childGroup中,监听事件.

这个类不长,稍微看一看就能理解大概了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;

// 初始化,将child的参数都设置好,并且保存好
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
// 这里设置自动读取
public void run() {
channel.config().setAutoRead(true);
}
};
}

// 覆写Read()事件,在这个方法中,将新建立的连接添加到childGroup中.
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 为新建的连接添加childHandler()处理器.
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将此Channel注册到childGroup上.
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
// 添加回调函数,如果失败,则关闭.
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
ctx.fireExceptionCaught(cause);
}
}

到这里初始化就基本完成了.初始化完成就应该到注册了.

注册

上面初始化完成之后就要调用注册方法,也就是要将此ServerChannel注册到Selector上.在看一下initAndRegister方法

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>{
final ChannelFuture initAndRegister() {
...
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
}

在init()初始化Channel之后有这么一段代码,可以看出来就是将channel注册到Selector上的.继续点进去之后会发现以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 判断是否已经注册过了
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;
// 这里主要是看当前线程是否是EventLoop中Channel绑定的线程.如果是,直接调用,如果不是,则异步调用
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

}

这里就是真正的注册逻辑.首先判断eventLoop是否为空,在判断是否已经注册过了. 接着看这一句:

1
AbstractChannel.this.eventLoop = eventLoop;

这里就是将此Channel绑定到此EventLoop上.这个EventLoop也就是当初我们调用方法b.group(bossGroup, workerGroup)中的bossGroup中的一个eventLoop.在Netty中,一个EventLoop包含多个Channel.而每个Channel的事件都是由同一个EventLoop来执行的. 这里就是执行真正的绑定操作. 再向下走会执行register0()方法.来看一下register0()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 真正的执行注册方法的地方
doRegister();
neverRegistered = false;
registered = true;
// 唤醒并且传播handlerAdd
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 传播ChannelRegistered()方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

这里又做了几件事情.

  • doRegister()就是调用jdk的register方法进行注册.并且把NioServerSocketChannel自己作为attachment添加进去.
  • 调用pipeline.invokeHandlerAddedIfNeeded(); 也就是唤醒HandlerAdd()事件.就是普通复写的handlerAdded()事件将会被触发.
  • pipeline.fireChannelRegistered();传播事件,这里传播Registered()事件.
    这里应该就是在刚刚注册好的时候判断一下是否有连接到达.如果有,并且是第一次连接,就传播此事件.

再来看一下doRegister()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 这里就是真正执行注册的地方.
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

上面javaChannel()也就是java中的channel,第一个参数就是此eventLoop中的selector选择器.0代表不监听任何事件.再讲自己作为attachment传进去.当下一次获得事件的时候可以直接把此NioServerSocketChannel拿出来用,然后做一些事件传播.

1
2
3
4
5
6
7
8
9
10
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
// 如果是第一次调用,肯定为true
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}

看官方的注释,就是说在整个注册过程还没有全部完成之前应该先回调ChannelHandlers中的handlerAdd()方法.所以就是在这里回调我们的复写的handlerAdd()方法的.

1
2
3
4
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}

在这里调用毁掉我们的Register方法.所以handlerAdd事件是发生在register事件之前的.

到这里整个注册过程应该就明了了. 在总结一下:

  1. 将此ServerSocketChannel绑定到对应的eventLoop上
  2. 执行注册操作
  3. 调用jdk中的注册方法,将javaChannel注册到Selector上,并且将对应的NioServerSocketChannel作为attachment添加上去
  4. 回调handler中handlerAdd()方法和register()方法
  5. 注册完成.返回,继续执行绑定操作

注意,这里注册完成之后,Channel并没有完成绑定操作,也就是说,注册在Selector上的Channel并没有绑定端口.

绑定

在一系列的准备工作完成之后,就是真正执行绑定操作的时候了.到这里initAndRegister方法算是结束了. 在来看一下doBind()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}

可以看到,在这里又有一个回调,因为在Netty中都是异步执行的,所以初始化和Register也都是异步执行的.所以这里会先判断是否已经初始化和注册完成.如果完成,直接调用这个doBind0()方法,如果没有完成,添加一个监听事件,在完成之后调用doBind0()方法.
在来看一下doBind0()方法都做了什么事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 这里也是异步执行bind操作,是调用的channel的bind()方法的.这里的channel也就是传入的Channel的类型.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}

再向调用栈的深处走,这个bind()方法会调用AbstractChannel中的内部类AbstractUnsafe的bind()方法,也就是如下的方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected abstract class AbstractUnsafe implements Unsafe {
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}
}

发现这里会调用doBind()方法,这个doBind()方法就是我们bind()方法的真正实现了.也就是在我们为bootstrap配置的Channel的bind()方法.这里调用的是NioServerSocketChannel的doBind()方法

1
2
3
4
5
6
7
8
9
10
11
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
}

可以看到,这里就是将javaChannel()方法绑定到本地的某个端口上.这里就是调用的jdk的bind()方法来绑定端口和地址的.
然后再Unsafe的bind()方法的最后,这里是要传播Active()事件. 不知道大家发现没有,在刚才注册的阶段,并没有channel并没有注册感兴趣的事件,而是没有感兴趣的事件.下面在看到底在哪里注册感兴趣的事件.就是在这个fireChannelActive中

1
2
3
4
5
6
7
8
9
10
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
}

再向下走,会调用pipeline中的头结点的channelActive()方法,关于这个头结点可以理解为pipeline中的出入口.所有从出入管道的数据都要走这里.这里也应该是传播读事件的开端.具体的可以看另一篇文章. 这里要传播active方法.

1
2
3
4
5
6
7
8
9
10
11
12
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();

readIfIsAutoRead();
}
}

}

在这里,有两件事情要做.

  1. 传播Active事件
  2. 注册accept事件.

传播Active事件都不说了,和上面传播handlerAdd事件一样.重点说一下注册accept事件.从readIfAutoRead()方法开始
最终会调用到AbstractNioChannel的doBeginRead()方法上.调用堆栈如下:

下面看一下doBeginRead()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}

到这里调用doBeginRead()方法,我们刚开始的时候说过,将Channel注册到Selector上的时候,并没有感兴趣的时间.因此这里的interestOps应该为0,但是我们在创建NioServerSocketChannel()的时候,Netty中的Channel是有传入过要注册事件的.

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

这里想就向这个AbstractNioChannel的readInterestOp赋值,注意,这里的readInterestOp是感兴趣的事件,并不是读事件.也就是感兴趣事件为接受连接.所以上面的代码就容易理解了.就是将新建的NioServerSocketChannel的刚兴趣的事件真正绑定到javaChannel上去.到这里.

总结

服务端启动其实就类似与平时我们编写NIO的方式,不过就是为我们做好了很多封装.我们只需要配置好参数,并且写好业务逻辑就好了,不需要我们去管理连接的Channel.极大的简化了我们的编程. 这里在做一下启动过程的大致流程的总结.

配置属性

  1. 首先有两个EventLoopGroup,一个为parent,用来监听连接事件,一个为child,用来监听已经建立好的Channel的事件.
  2. 设置Channel的类型,是NIO还是OIO或者是AIO
  3. 设置parentChannel的选项参数,
  4. 设置childHandler.为建立好的连接添加处理器
  5. 设置childOption选项

启动

  1. 调用bind()方法启动服务器.
  2. 新建一个Channel(这个时候也会初始化此Channel的Pipeline)
  3. 初始化Channel
  4. 将Channel注册上去,并且将NioSelectorChannel作为attachment添加上去
  5. 绑定端口,为channel添加感兴趣事件.

到这里就算完成了.整个Netty服务端的启动过程就大致明了了.