博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty源码分析之-服务端启动核心源码分析(5)
阅读量:4290 次
发布时间:2019-05-27

本文共 8748 字,大约阅读时间需要 29 分钟。

前面分析过引导的作用,这里将分析启动引导也就是对应的客户端或者服务端启动的核心流程

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();

bind方法具体实现如下:

private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister();   ①        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }        if (regFuture.isDone()) {            // At this point we know that the registration was complete and successful.            ChannelPromise promise = channel.newPromise();            doBind0(regFuture, channel, localAddress, promise);  ②            return promise;        } else {            // Registration future is almost always fulfilled already, but just in case it's not.            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);            regFuture.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    Throwable cause = future.cause();                    if (cause != null) {                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                        // IllegalStateException once we try to access the EventLoop of the Channel.                        promise.setFailure(cause);                    } else {                        // Registration was successful, so set the correct executor to use.                        // See https://github.com/netty/netty/issues/2586                        promise.registered();                        doBind0(regFuture, channel, localAddress, promise);                    }                }            });            return promise;        }    }

对于①处initAndRegister()方法实现代码:

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel();            init(channel);        } catch (Throwable t) {            if (channel != null) {                // channel can be null if newChannel crashed (eg SocketException("too many open files"))                channel.unsafe().closeForcibly();            }            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);        }        ChannelFuture regFuture = config().group().register(channel);        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        return regFuture;    }

initAndRegister方法中channelFactory是AbstractBootstrap中持有的成员变量,是我们之前执行:

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

channel(NioServerSocketChannel.class)进行赋值的,也就是调用来之前分析的关于反射的部分,调用默认的无参构造方法生成对象:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();...public NioServerSocketChannel() {        this(newSocket(DEFAULT_SELECTOR_PROVIDER));    }...private static ServerSocketChannel newSocket(SelectorProvider provider) {        try {            return provider.openServerSocketChannel();        } catch (IOException e) {            throw new ChannelException(                    "Failed to open a server socket.", e);        }    }

通过SelectorProvider打开一个SocketChannel通道,openServerSocketChannel是对SocketChannel.open()方法阻塞上的一个优化,也就是会创建于一个ServerSocketChannel返回

initAndRegister方法中init(channel)的实现代码:

void init(Channel channel) throws Exception {    final Map
, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map
, Object> attrs = attrs0(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer
() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}

init方法主要对传入的Channel进行一些通用属性的设置,最后会添加一个特殊的ChannelInitializer来向pipeline中添加一些列用来进行初始化的ChannelHandler,其中ChannelHandler handler = config.handler();可以发现是之前引导初始化传入的handler:

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).                    handler(new LoggingHandler(LogLevel.WARN)).                    childHandler(new MyServerInitializer());

如果有通过handler方法设置一个channelHandler,则在init初始化ServerSocketChannel的时候会添加到对应的pipeline中的最后,对于最后向pipeline中添加的ServerBootstrapAcceptor具体实现如下:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
... public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry
, Object> e: childAttrs) { child.attr((AttributeKey
) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } ...}

由此可见ServerBootstrapAcceptor是属于InboundHandler的一种,ServerBootstrapAcceptor在channelRead事件触发的时候(也就有客户端连接的时候),把childHandler加到childChannel Pipeline的末尾,设置childHandler的options和attrs,最后把childHandler注册进childGroup

回到doBind方法中doBind0(regFuture, channel, localAddress, promise)实现:

private static void doBind0(            final ChannelFuture regFuture, final Channel channel,            final SocketAddress localAddress, final ChannelPromise promise) {        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up        // the pipeline in its channelRegistered() implementation.        channel.eventLoop().execute(new Runnable() {            @Override            public void run() {                if (regFuture.isSuccess()) {                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);                } else {                    promise.setFailure(regFuture.cause());                }            }        });    }

channel.bind的调用绑定对应的SocketAddress到指定的Channel同时一旦完成并通知添加的监听

回到最初的initAndRegister方法中的ChannelFuture regFuture = config().group().register(channel);

//在AbstractNioChannel中的:protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                return;            } catch (CancelledKeyException e) {                if (!selected) {                    // Force the Selector to select now as the "canceled" SelectionKey may still be                    // cached and not removed because no Select.select(..) operation was called yet.                    eventLoop().selectNow();                    selected = true;                } else {                    // We forced a select operation on the selector before but the SelectionKey is still cached                    // for whatever reason. JDK bug ?                    throw e;                }            }        }    }
private final SelectableChannel ch;...protected SelectableChannel javaChannel() {        return ch;    }...private Selector unwrappedSelector;

可以看到底层的实现也就是java nio中的将Channel注册到Selector上

转载地址:http://zhrgi.baihongyu.com/

你可能感兴趣的文章
搜集的动植物分类、检索网站
查看>>
ffmpeg源码分析之媒体打开过程
查看>>
Ubuntu/centos/redhat/SUSE sipp安装(带rtp支持,3.5.1版本)
查看>>
周鸿祎:很多程序员聪明,但我一看就知道他不会成功
查看>>
编译程序遇到问题 relocation R_X86_64_32 against `.rodata' can not be used when making a shared object;
查看>>
Const指针 、 指向const的指针 、引用、指针
查看>>
GDB调试命令
查看>>
常见数据类型的字节数
查看>>
gcc/g++ 编译常见问题
查看>>
【设计模式】 工厂模式实例 C++ , 利用工厂模式写一个计算器
查看>>
opencv
查看>>
【图像处理】 一、OSTU分割
查看>>
Android常用的框架有哪些
查看>>
SSL 证书创建与部署
查看>>
学习搭建人工智能环境遇到的问题
查看>>
自己编写的计算器
查看>>
视频物体检测
查看>>
Tomcat启动遇到的问题
查看>>
Gradle-gradlew
查看>>
grpc-整合gradle与代码生成
查看>>