I have recently been learning network programming with Netty. This post walks through the flow of Netty's DiscardServer example.

First, the code for DiscardServer.java

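import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DiscardServer {
    private final int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts incoming connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles traffic of accepted connections
        try {
            ServerBootstrap b = new ServerBootstrap(); // 1
            b.group(bossGroup, workerGroup) // 2
                .channel(NioServerSocketChannel.class) // 3
                .childHandler(new ChannelInitializer<SocketChannel>() { // 4
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new DiscardServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128) // 5
                .childOption(ChannelOption.SO_KEEPALIVE, true); // 6
            ChannelFuture f = b.bind(port).sync(); // 7: bind, 8: sync

            f.channel().closeFuture().sync(); // 9
        } finally {
            workerGroup.shutdownGracefully(); // 10
            bossGroup.shutdownGracefully();
        }
    }
}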

The message handler class, DiscardServerHandler.java

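import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import org.apache.log4j.Logger; // assumption: Logger.getLogger(Class) suggests log4j 1.x

// Note: ChannelHandlerAdapter only exposes channelRead in the Netty 5.0 alpha API.
// On Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class DiscardServerHandler extends ChannelHandlerAdapter {

    private static final Logger logger = Logger.getLogger(DiscardServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        logger.info("enter");
        ByteBuf in = (ByteBuf) msg;
        try {
            logger.info("recv msg " + in.toString(CharsetUtil.US_ASCII));
        } finally {
            in.release(); // the message is discarded, so release the reference-counted buffer
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close(); // close the connection when an exception is raised
    }
}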

OK, let's start the analysis from step 1 of DiscardServer

  1. Call the no-arg constructor of ServerBootstrap to instantiate the object
    ServerBootstrap b = new ServerBootstrap();
    // corresponding source
    public ServerBootstrap() { } // does nothing

The inheritance hierarchy of ServerBootstrap (each indented entry is a supertype of the entry above it)

public class ServerBootstrap
    public class AbstractBootstrap
        public interface Cloneable

  2. Add the parent and child event loop groups to the ServerBootstrap
    b.group(bossGroup, workerGroup)

Code of the ServerBootstrap group method

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup); // 2.1 call the parent class AbstractBootstrap's group method to set the parent event loop group
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) { // throw if it has already been set
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup; // set the child event loop group
    return this; // return the object itself
}

2.1. super.group(parentGroup);

public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return (B) this;
}

The flow is similar to ServerBootstrap.group
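
The (B) this cast is what lets a chained call keep returning the concrete bootstrap type instead of the base type. Here is a minimal sketch of that self-typed builder pattern. BaseBootstrap and MyServerBootstrap are hypothetical stand-ins, and a String takes the place of EventLoopGroup, so none of this is Netty code:

```java
// Hypothetical stand-ins for AbstractBootstrap / ServerBootstrap; not Netty code.
abstract class BaseBootstrap<B extends BaseBootstrap<B>> {
    private String group; // stands in for the parent EventLoopGroup

    @SuppressWarnings("unchecked")
    public B group(String group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this; // safe as long as every subclass passes itself as B
    }

    public String group() {
        return group;
    }
}

final class MyServerBootstrap extends BaseBootstrap<MyServerBootstrap> {
    private String childGroup; // stands in for the child EventLoopGroup

    public MyServerBootstrap group(String parentGroup, String childGroup) {
        super.group(parentGroup); // same validation as the single-argument version
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

    public String childGroup() {
        return childGroup;
    }
}

class SelfTypedBuilderDemo {
    public static void main(String[] args) {
        MyServerBootstrap b = new MyServerBootstrap().group("boss", "worker");
        System.out.println(b.group() + " / " + b.childGroup());
        try {
            b.group("boss2", "worker2"); // a second call is rejected by the base class
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
    }
}
```

Because B is bound to MyServerBootstrap, even the inherited single-argument group(String) returns MyServerBootstrap, so subclass-only methods remain reachable in the middle of a chain.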

  3. b.channel(NioServerSocketChannel.class)

This does not create the channel yet. It registers a factory that will later use reflection to create the NioServerSocketChannel instance. The method being called is AbstractBootstrap.channel

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); // 3.1
}

The inheritance hierarchy of the NioServerSocketChannel class is as follows (each indented entry is a supertype of the entry above it)

public class NioServerSocketChannel
    public class AbstractNioMessageChannel
        public class AbstractNioChannel
            public class AbstractChannel
                public interface Channel
                    public interface Comparable
    public interface ServerSocketChannel
        public interface ServerChannel
            public interface Channel
                public interface Comparable

3.1. Call the AbstractBootstrap.channelFactory method

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory; // store the Channel factory instance, i.e. the ReflectiveChannelFactory
    return (B) this;
}

The inheritance hierarchy of the ReflectiveChannelFactory class (each indented entry is a supertype of the entry above it)

public class ReflectiveChannelFactory
    public interface ChannelFactory // io.netty.channel.ChannelFactory
        public interface ChannelFactory // io.netty.bootstrap.ChannelFactory

Methods of ReflectiveChannelFactory

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    if (clazz == null) {
        throw new NullPointerException("clazz");
    }
    this.clazz = clazz; // remember the type of Channel this factory has to create
}

// factory method; its role is covered later
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
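
To see the reflective factory idea in isolation, here is a small sketch that runs without Netty. SimpleChannel, FakeServerChannel and SimpleReflectiveFactory are hypothetical names invented for this illustration. It also goes through getDeclaredConstructor().newInstance(), since Class.newInstance() has been deprecated since Java 9:

```java
// Hypothetical stand-ins; only the "store the class, instantiate later" idea comes from the text above.
interface SimpleChannel {
    String name();
}

final class FakeServerChannel implements SimpleChannel {
    public FakeServerChannel() { } // the factory needs a no-arg constructor

    @Override
    public String name() {
        return "fake-server-channel";
    }
}

final class SimpleReflectiveFactory<T extends SimpleChannel> {
    private final Class<? extends T> clazz;

    SimpleReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz; // nothing is instantiated at configuration time
    }

    T newChannel() {
        try {
            // Each call creates a fresh instance through the no-arg constructor.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create Channel from class " + clazz, e);
        }
    }
}

class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        SimpleReflectiveFactory<SimpleChannel> factory =
                new SimpleReflectiveFactory<>(FakeServerChannel.class);
        SimpleChannel first = factory.newChannel();
        SimpleChannel second = factory.newChannel();
        System.out.println(first.name());
        System.out.println("distinct instances: " + (first != second));
    }
}
```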

  4. Set the childHandler
b.childHandler(new ChannelInitializer<SocketChannel>() {
    public void initChannel(SocketChannel ch) {
        // Runs once for each accepted connection and appends the handler to the tail of
        // that channel's pipeline. It is invoked through the childHandler.
        ch.pipeline().addLast(new DiscardServerHandler());
    }
});

// corresponding source in ServerBootstrap
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}

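The inheritance hierarchy of ChannelInitializer (each indented entry is a supertype of the entry above it)

public class ChannelInitializer
    public class ChannelHandlerAdapter
        public interface ChannelHandler // declares the callbacks for the various read/write readiness events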

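5, 6. Both steps add a ChannelOption. option is defined in AbstractBootstrap and applies to the listening server channel. childOption is defined in ServerBootstrap and applies to each accepted channel.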

  7. ChannelFuture f = b.bind(port).sync();

Let's look at how bind is implemented first.
AbstractBootstrap has 5 overloads of bind

public ChannelFuture bind()
public ChannelFuture bind(int inetPort) // the one called in this post
public ChannelFuture bind(String inetHost, int inetPort)
public ChannelFuture bind(InetAddress inetHost, int inetPort)
public ChannelFuture bind(SocketAddress localAddress)

In the end they all call the doBind method

private ChannelFuture doBind(final SocketAddress localAddress)
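
The overloads funnel into a single private method by first normalizing their arguments to a SocketAddress. A rough sketch of that shape, using only java.net types. BindOverloadsDemo is a hypothetical class, and its doBind simply returns the address instead of binding anything:

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;

// Hypothetical sketch of "many public overloads, one private worker"; nothing is actually bound.
class BindOverloadsDemo {
    static SocketAddress bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort)); // wildcard address + port
    }

    static SocketAddress bind(String inetHost, int inetPort) {
        return bind(new InetSocketAddress(inetHost, inetPort));
    }

    static SocketAddress bind(SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress); // every overload ends up here
    }

    // Stand-in for the real doBind: just reports which address it was asked to bind.
    private static SocketAddress doBind(SocketAddress localAddress) {
        return localAddress;
    }

    public static void main(String[] args) {
        System.out.println(bind(8080));
        System.out.println(bind("127.0.0.1", 8080));
    }
}
```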

Now let's analyze the logic of the doBind method
AbstractBootstrap.java

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
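
The shape of doBind is "if registration has already finished, bind right away, otherwise bind from a completion callback". A minimal sketch of the same control flow with the JDK's CompletableFuture in place of ChannelFuture. RegisterThenBindDemo and its string result are assumptions made for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: CompletableFuture stands in for Netty's ChannelFuture / ChannelPromise.
class RegisterThenBindDemo {
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, String address) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, so "bind" immediately.
            promise.complete("bound to " + address);
        } else {
            // Slow path: wait for registration, then either fail the promise or "bind".
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound to " + address);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> bound = doBind(pending, ":8080");
        System.out.println("done before registration: " + bound.isDone());
        pending.complete(null); // registration finishes later
        System.out.println(bound.join());
    }
}
```

Either way the caller receives a promise at once, which is why b.bind(port) in DiscardServer still needs sync() to wait for the actual result.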

First comes this call in
AbstractBootstrap.java

final ChannelFuture regFuture = initAndRegister();

// logic of the initAndRegister method
final ChannelFuture initAndRegister() {
    // create the NioServerSocketChannel instance (the type configured earlier via the channel method)
    final Channel channel = channelFactory().newChannel(); // 1
    try {
        init(channel); // 2
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
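
The closing comment in initAndRegister relies on one property: tasks submitted to the same single-threaded event loop run in submission order, so a bind queued after a register cannot overtake it. A small sketch of that guarantee, with a JDK single-thread executor standing in for one EventLoop. SameThreadOrderingDemo is a hypothetical name:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-thread executor plays the role of one EventLoop.
class SameThreadOrderingDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            eventLoop.execute(() -> log.add("register")); // queued first
            eventLoop.execute(() -> log.add("bind"));     // queued second, runs after register
        } finally {
            eventLoop.shutdown(); // already queued tasks still run
        }
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```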

Now for the newInstance call behind step 1 in the code above.
It invokes the no-arg constructor first

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

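where DEFAULT_SELECTOR_PROVIDER is defined as

// The end result of this statement is to obtain the JDK's SelectorProvider.
// The I/O multiplexing mechanism depends on the operating system: for example, Linux 2.6 and later
// uses epoll, while earlier kernels use select/poll.
// Why cache the provider instead of calling ServerSocketChannel.open() every time? Presumably to
// avoid the repeated provider lookup, which may help performance.
// The source is in java/nio/channels/spi/SelectorProvider.java
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();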

Next, newSocket calls openServerSocketChannel, which returns a ServerSocketChannel. That channel is then passed to the NioServerSocketChannel(ServerSocketChannel channel) constructor to finish the instantiation
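
The JDK side of this step can be tried without Netty. The sketch below caches the provider in a constant and opens a server socket channel through it, roughly the way newSocket is described above. ProviderDemo and its helper methods are hypothetical names, and the exception type thrown on failure is an assumption:

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical sketch using only java.nio; it mirrors the idea, not Netty's exact code.
class ProviderDemo {
    // Looked up once and reused for every channel.
    private static final SelectorProvider PROVIDER = SelectorProvider.provider();

    static ServerSocketChannel newSocket() {
        try {
            return PROVIDER.openServerSocketChannel();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to open a server socket.", e);
        }
    }

    // Opens a channel, switches it to non-blocking mode, and reports whether that worked.
    static boolean opensNonBlocking() {
        try (ServerSocketChannel ch = newSocket()) {
            ch.configureBlocking(false);
            return ch.isOpen() && !ch.isBlocking();
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The concrete provider class differs per operating system and JDK.
        System.out.println("provider: " + PROVIDER.getClass().getName());
        System.out.println("non-blocking channel opened: " + opensNonBlocking());
    }
}
```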



The init method from step 2 in the code above is implemented as follows
ServerBootstrap.java

void init(Channel channel) throws Exception {
    // fetch the options set earlier for the parent (server) channel
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}
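
The ChannelInitializer added at the end of init is a one-shot handler: once the channel is registered it installs the real handlers and then removes itself from the pipeline. A heavily simplified sketch of that behaviour follows. Handler, Pipeline, Initializer and NamedHandler are hypothetical stand-ins, and a plain list plays the role of the pipeline:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for ChannelHandler / ChannelPipeline / ChannelInitializer.
interface Handler { }

final class Pipeline {
    private final List<Handler> handlers = new ArrayList<>();

    void addLast(Handler h) { handlers.add(h); }

    void remove(Handler h) { handlers.remove(h); }

    List<Handler> handlers() { return handlers; }

    // Simulates the "channel registered" event reaching every handler.
    void fireRegistered() {
        for (Handler h : new ArrayList<>(handlers)) { // iterate over a copy; handlers mutate the list
            if (h instanceof Initializer) {
                ((Initializer) h).onRegistered(this);
            }
        }
    }
}

abstract class Initializer implements Handler {
    abstract void initChannel(Pipeline pipeline);

    final void onRegistered(Pipeline pipeline) {
        initChannel(pipeline); // install the real handlers
        pipeline.remove(this); // the initializer is no longer needed
    }
}

final class NamedHandler implements Handler {
    private final String name;

    NamedHandler(String name) { this.name = name; }

    @Override
    public String toString() { return name; }
}

class InitializerDemo {
    static String build() {
        Pipeline p = new Pipeline();
        p.addLast(new Initializer() {
            @Override
            void initChannel(Pipeline pipeline) {
                pipeline.addLast(new NamedHandler("acceptor")); // stand-in for ServerBootstrapAcceptor
            }

            @Override
            public String toString() { return "initializer"; }
        });
        String before = p.handlers().toString();
        p.fireRegistered();
        return before + " -> " + p.handlers();
    }

    public static void main(String[] args) {
        System.out.println(build()); // prints "[initializer] -> [acceptor]"
    }
}
```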


Sep 13 2015