博客 / 詳情

返回

詳細圖解Netty Reactor啓動全流程 | 萬字長文 | 多圖預警

歡迎關注公眾號:bin的技術小屋,閲讀公眾號原文

本系列Netty源碼解析文章基於 4.1.56.Final版本

image.png

大家第一眼看到這幅流程圖,是不是腦瓜子嗡嗡的呢?

image.png

大家先不要驚慌,問題不大,本文筆者的目的就是要讓大家清晰的理解這幅流程圖,從而深刻的理解Netty Reactor的啓動全流程,包括其中涉及到的各種代碼設計實現細節。

image.png

在上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》中我們詳細介紹了Netty服務端核心引擎組件主從Reactor組模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的創建過程。最終我們得到了netty Reactor模型的運行骨架如下:

image.png

現在Netty服務端程序的骨架是搭建好了,本文我們就基於這個骨架來深入剖析下Netty服務端的啓動過程。

我們繼續回到上篇文章提到的Netty服務端代碼模板中,在創建完主從Reactor線程組:bossGroupworkerGroup後,接下來就開始配置Netty服務端的啓動輔助類ServerBootstrap 了。

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //創建主從Reactor線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
             .option(ChannelOption.SO_BACKLOG, 100)//設置主Reactor中channel的option選項
             .handler(new LoggingHandler(LogLevel.INFO))//設置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 綁定端口啓動服務,開始監聽accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上篇文章中我們對代碼模板中涉及到ServerBootstrap 的一些配置方法做了簡單的介紹,大家如果忘記的話,可以在返回去回顧一下。

ServerBootstrap類其實沒有什麼特別的邏輯,主要是對Netty啓動過程中需要用到的一些核心信息進行配置管理,比如:

image.png

  • Netty的核心引擎組件主從Reactor線程組: bossGroup,workerGroup。通過ServerBootstrap#group方法配置。
  • Netty服務端使用到的Channel類型:NioServerSocketChannel ,通過ServerBootstrap#channel方法配置。
    以及配置NioServerSocketChannel時用到的SocketOptionSocketOption用於設置底層JDK NIO Socket的一些選項。通過ServerBootstrap#option方法進行配置。

主ReactorGroup中的MainReactor管理的Channel類型為NioServerSocketChannel,如圖所示主要用來監聽端口,接收客户端連接,為客户端創建初始化NioSocketChannel,然後採用round-robin輪詢的方式從圖中從ReactorGroup中選擇一個SubReactor與該客户端NioSocketChannel進行綁定。

從ReactorGroup中的SubReactor管理的Channel類型為NioSocketChannel,它是netty中定義客户端連接的一個模型,每個連接對應一個。如圖所示SubReactor負責監聽處理綁定在其上的所有NioSocketChannel上的IO事件。

  • 保存服務端NioServerSocketChannel和客户端NioSocketChannel對應pipeline中指定的ChannelHandler。用於後續Channel向Reactor註冊成功之後,初始化Channel裏的pipeline。
不管是服務端用到的NioServerSocketChannel還是客户端用到的NioSocketChannel,每個Channel實例都會有一個PipelinePipeline中有多個ChannelHandler用於編排處理對應Channel上感興趣的IO事件

ServerBootstrap結構中包含了netty服務端程序啓動的所有配置信息,在我們介紹啓動流程之前,先來看下ServerBootstrap的源碼結構:

ServerBootstrap

image.png

ServerBootstrap的繼承結構比較簡單,繼承層次的職責分工也比較明確。

ServerBootstrap主要負責對主從Reactor線程組相關的配置進行管理,其中帶child前綴的配置方法是對從Reactor線程組的相關配置管理。從Reactor線程組中的Sub Reactor負責管理的客户端NioSocketChannel相關配置存儲在ServerBootstrap結構中。

父類AbstractBootstrap則是主要負責對主Reactor線程組相關的配置進行管理,以及主Reactor線程組中的Main Reactor負責處理的服務端ServerSocketChannel相關的配置管理。

1. 配置主從Reactor線程組

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

     //Main Reactor線程組
    volatile EventLoopGroup group;
    //Sub Reactor線程組
    private volatile EventLoopGroup childGroup;

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //父類管理主Reactor線程組
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

}

2. 配置服務端ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    //用於創建ServerSocketChannel  ReflectiveChannelFactory
    private volatile ChannelFactory<? extends C> channelFactory;

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }

}

在向ServerBootstrap配置服務端ServerSocketChannelchannel 方法中,其實是創建了一個ChannelFactory工廠實例ReflectiveChannelFactory,在Netty服務端啓動的過程中,會通過這個ChannelFactory去創建相應的Channel實例。

我們可以通過這個方法來配置netty的IO模型,下面為ServerSocketChannel在不同IO模型下的實現:

BIO NIO AIO
OioServerSocketChannel NioServerSocketChannel AioServerSocketChannel

EventLoopGroup Reactor線程組在不同IO模型下的實現:

BIO NIO AIO
ThreadPerChannelEventLoopGroup NioEventLoopGroup AioEventLoopGroup

我們只需要將IO模型的這些核心接口對應的實現類前綴改為對應IO模型的前綴,就可以輕鬆在Netty中完成對IO模型的切換。

image.png

2.1 ReflectiveChannelFactory

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    //NioServerSocketChannelde 構造器
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            //反射獲取NioServerSocketChannel的構造器
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            //創建NioServerSocketChannel實例
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

從類的簽名我們可以看出,這個工廠類是通過泛型反射的方式來創建對應的Channel實例。

  • 泛型參數T extends Channel表示的是要通過工廠類創建的Channel類型,這裏我們初始化的是NioServerSocketChannel
  • ReflectiveChannelFactory 的構造器中通過反射的方式獲取NioServerSocketChannel的構造器。
  • newChannel 方法中通過構造器反射創建NioServerSocketChannel實例。

注意這時只是配置階段,NioServerSocketChannel此時並未被創建。它是在啓動的時候才會被創建出來。

3. 為NioServerSocketChannel配置ChannelOption

ServerBootstrap b = new ServerBootstrap();
//設置被MainReactor管理的NioServerSocketChannel的Socket選項
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    //serverSocketChannel中的ChannelOption配置
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> B option(ChannelOption<T> option, T value) {
        ObjectUtil.checkNotNull(option, "option");
        synchronized (options) {
            if (value == null) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        return self();
    }
}

無論是服務端的NioServerSocketChannel還是客户端的NioSocketChannel它們的相關底層Socket選項ChannelOption配置全部存放於一個Map類型的數據結構中。

由於客户端NioSocketChannel是由從Reactor線程組中的Sub Reactor來負責處理,所以涉及到客户端NioSocketChannel所有的方法和配置全部是以child前綴開頭。

ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

   //客户端SocketChannel對應的ChannelOption配置
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(childOption);
            } else {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
}

相關的底層Socket選項,netty全部枚舉在ChannelOption類中,筆者這裏就不一一列舉了,在本系列後續相關的文章中,筆者還會為大家詳細的介紹這些參數的作用。

public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {

    ..................省略..............

    public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
    public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
    public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
    public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
    public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
    public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
    public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
    public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

    ..................省略..............

}

4. 為服務端NioServerSocketChannel中的Pipeline配置ChannelHandler

    //serverSocketChannel中pipeline裏的handler(主要是acceptor)
    private volatile ChannelHandler handler;

    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

NioServerSocketChannel中的Pipeline添加ChannelHandler分為兩種方式:

  • 顯式添加: 顯式添加的方式是由用户在main線程中通過ServerBootstrap#handler的方式添加。如果需要添加多個ChannelHandler,則可以通過ChannelInitializerpipeline中進行添加。
關於ChannelInitializer後面筆者會有詳細介紹,這裏大家只需要知道ChannelInitializer是一種特殊的ChannelHandler,用於初始化pipeline。適用於向pipeline中添加多個ChannelHandler的場景。
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
             .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(channelhandler1)
                      .addLast(channelHandler2)
                      
                      ......
                     
                      .addLast(channelHandler3);
                 }
             })
  • 隱式添加:隱式添加主要添加的就是主ReactorGroup的核心組件也就是下圖中的acceptor,Netty中的實現為ServerBootstrapAcceptor,本質上也是一種ChannelHandler,主要負責在客户端連接建立好後,初始化客户端NioSocketChannel,在從Reactor線程組中選取一個Sub Reactor,將客户端NioSocketChannel 註冊到Sub Reactor中的selector上。
隱式添加ServerBootstrapAcceptor是由Netty框架在啓動的時候負責添加,用户無需關心。

image.png

在本例中,NioServerSocketChannelPipeLine中只有兩個ChannelHandler,一個由用户在外部顯式添加的LoggingHandler,另一個是由Netty框架隱式添加的ServerBootstrapAcceptor

其實我們在實際項目使用的過程中,不會向netty服務端NioServerSocketChannel添加額外的ChannelHandler,NioServerSocketChannel只需要專心做好自己最重要的本職工作接收客户端連接就好了。這裏額外添加一個LoggingHandler只是為了向大家展示ServerBootstrap的配置方法。

5. 為客户端NioSocketChannel中的Pipeline配置ChannelHandler

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
    //socketChannel中pipeline中的處理handler
    private volatile ChannelHandler childHandler;

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

向客户端NioSocketChannel中的Pipeline裏添加ChannelHandler完全是由用户自己控制顯式添加,添加的數量不受限制。

由於在Netty的IO線程模型中,是由單個Sub Reactor線程負責執行客户端NioSocketChannel中的Pipeline,一個Sub Reactor線程負責處理多個NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就會影響Sub Reactor線程執行其他NioSocketChannel上的Pipeline,從而降低IO處理效率,降低吞吐量。

image.png

所以Pipeline中的ChannelHandler不易添加過多,並且不能再ChannelHandler中執行耗時的業務處理任務。

在我們通過ServerBootstrap配置netty服務端啓動信息的時候,無論是向服務端NioServerSocketChannel的pipeline中添加ChannelHandler,還是向客户端NioSocketChannel的pipeline中添加ChannelHandler,當涉及到多個ChannelHandler添加的時候,我們都會用到ChannelInitializer,那麼這個ChannelInitializer究竟是何方聖神,為什麼要這樣做呢?我們接着往下看~~

ChannelInitializer

image.png

首先ChannelInitializer它繼承於ChannelHandler,它自己本身就是一個ChannelHandler,所以它可以添加到childHandler中。

其他的父類大家這裏可以不用管,後面文章中筆者會一一為大家詳細介紹。

那為什麼不直接添加ChannelHandler而是選擇用ChannelInitializer呢?

這裏主要有兩點原因:

  • 前邊我們提到,客户端NioSocketChannel是在服務端accept連接後,在服務端NioServerSocketChannel中被創建出來的。但是此時我們正處於配置ServerBootStrap階段,服務端還沒有啓動,更沒有客户端連接上來,此時客户端NioSocketChannel還沒有被創建出來,所以也就沒辦法向客户端NioSocketChannel的pipeline中添加ChannelHandler
  • 客户端NioSocketChannelPipeline裏可以添加任意多個ChannelHandler,但是Netty框架無法預知用户到底需要添加多少個ChannelHandler,所以Netty框架提供了回調函數ChannelInitializer#initChannel,使用户可以自定義ChannelHandler的添加行為。

當客户端NioSocketChannel註冊到對應的Sub Reactor上後,緊接着就會初始化NioSocketChannel中的Pipeline,此時Netty框架會回調ChannelInitializer#initChannel執行用户自定義的添加邏輯。

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        //當channelRegister事件發生時,調用initChannel初始化pipeline
        if (initChannel(ctx)) {
                 .................省略...............
        } else {
                 .................省略...............
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //此時客户單NioSocketChannel已經創建並初始化好了
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                 .................省略...............
            } finally {
                  .................省略...............
            }
            return true;
        }
        return false;
    }

    protected abstract void initChannel(C ch) throws Exception;
    
    .................省略...............
}

這裏由netty框架回調的ChannelInitializer#initChannel方法正是我們自定義的添加邏輯。

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

到此為止,Netty服務端啓動所需要的必要配置信息,已經全部存入ServerBootStrap啓動輔助類中。

接下來要做的事情就是服務端的啓動了。

// Start the server. 綁定端口啓動服務,開始監聽accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();

Netty服務端的啓動

經過前面的鋪墊終於來到了本文的核心內容----Netty服務端的啓動過程。

如代碼模板中的示例所示,Netty服務端的啓動過程封裝在io.netty.bootstrap.AbstractBootstrap#bind(int)函數中。

接下來我們看一下Netty服務端在啓動過程中究竟幹了哪些事情?

image.png

大家看到這副啓動流程圖先不要慌,接下來的內容筆者會帶大家各個擊破它,在文章的最後保證讓大家看懂這副流程圖。

我們先來從netty服務端啓動的入口函數開始我們今天的源碼解析旅程:

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        //校驗Netty核心組件是否配置齊全
        validate();
        //服務端開始啓動,綁定端口地址,接收客户端連接
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

   private ChannelFuture doBind(final SocketAddress localAddress) {
        //異步創建,初始化,註冊ServerSocketChannel到main reactor上
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {   

           ........serverSocketChannel向Main Reactor註冊成功後開始綁定端口....,               
             
        } else {
            //如果此時註冊操作沒有完成,則向regFuture添加operationComplete回調函數,註冊成功後回調。
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                   ........serverSocketChannel向Main Reactor註冊成功後開始綁定端口...., 
            });
            return promise;
        }
    }

Netty服務端的啓動流程總體如下:

  • 創建服務端NioServerSocketChannel並初始化。
  • 將服務端NioServerSocketChannel註冊到主Reactor線程組中。
  • 註冊成功後,開始初始化NioServerSocketChannel中的pipeline,然後在pipeline中觸發channelRegister事件。
  • 隨後由NioServerSocketChannel綁定端口地址。
  • 綁定端口地址成功後,向NioServerSocketChannel對應的Pipeline中觸發傳播ChannelActive事件,在ChannelActive事件回調中向Main Reactor註冊OP_ACCEPT事件,開始等待客户端連接。服務端啓動完成。

當netty服務端啓動成功之後,最終我們會得到如下結構的陣型,開始枕戈待旦,準備接收客户端的連接,Reactor開始運轉。

image.png

接下來,我們就來看下Netty源碼是如何實現以上步驟的~~

1. initAndRegister

image.png

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //創建NioServerSocketChannel
            //ReflectiveChannelFactory通過泛型,反射,工廠的方式靈活創建不同類型的channel
            channel = channelFactory.newChannel();
            //初始化NioServerSocketChannel
            init(channel);
        } catch (Throwable t) {
            ..............省略.................
        }

        //向MainReactor註冊ServerSocketChannel
        ChannelFuture regFuture = config().group().register(channel);

           ..............省略.................

        return regFuture;
    }

從函數命名中我們可以看出,這個函數主要做的事情就是首先創建NioServerSocketChannel ,並對NioServerSocketChannel 進行初始化,最後將NioServerSocketChannel 註冊到Main Reactor中。

1.1 創建NioServerSocketChannel

還記得我們在介紹ServerBootstrap啓動輔助類配置服務端ServerSocketChannel類型的時候提到的工廠類ReflectiveChannelFactory 嗎?

因為當時我們在配置ServerBootstrap啓動輔助類的時候,還沒到啓動階段,而配置階段並不是創建具體ServerSocketChannel的時機。

所以Netty通過工廠模式將要創建的ServerSocketChannel的類型(通過泛型指定)以及 創建的過程(封裝在newChannel函數中)統統先封裝在工廠類ReflectiveChannelFactory中。

ReflectiveChannelFactory通過泛型反射工廠的方式靈活創建不同類型的channel

等待創建時機來臨,我們調用保存在ServerBootstrap中的channelFactory直接進行創建。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

下面我們來看下NioServerSocketChannel的構建過程:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    //SelectorProvider(用於創建Selector和Selectable Channels)
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    //創建JDK NIO ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

     //ServerSocketChannel相關的配置
    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel(ServerSocketChannel channel) {
        //父類AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中設置用於Channel接收數據用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

}
  • 首先調用newSocket 創建JDK NIO 原生ServerSocketChannel,這裏調用了SelectorProvider#openServerSocketChannel 來創建JDK NIO 原生ServerSocketChannel,我們在上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》中詳細的介紹了SelectorProvider相關內容,當時是用SelectorProvider來創建Reactor中的Selector。大家還記得嗎??
  • 通過父類構造器設置NioServerSocketChannel感興趣的IO事件,這裏設置的是SelectionKey.OP_ACCEPT事件。並將JDK NIO 原生ServerSocketChannel封裝起來。
  • 創建Channel的配置類NioServerSocketChannelConfig,在配置類中封裝了對Channel底層的一些配置行為,以及JDK中的ServerSocket。以及創建NioServerSocketChannel接收數據用的Buffer分配器AdaptiveRecvByteBufAllocator
NioServerSocketChannelConfig沒什麼重要的東西,我們這裏也不必深究,它就是管理NioServerSocketChannel相關的配置,這裏唯一需要大家注意的是這個用於Channel接收數據用的Buffer分配器AdaptiveRecvByteBufAllocator,我們後面在介紹Netty如何接收連接的時候還會提到。

NioServerSocketChannel 的整體構建過程介紹完了,現在我們來按照繼承層次再回過頭來看下NioServerSocketChannel 的層次構建,來看下每一層都創建了什麼,封裝了什麼,這些信息都是Channel的核心信息,所以有必要了解一下。

image.png

NioServerSocketChannel 的創建過程中,我們主要關注繼承結構圖中紅框標註的三個類,其他的我們佔時先不用管。

其中AbstractNioMessageChannel類主要是對NioServerSocketChannel底層讀寫行為的封裝和定義,比如accept接收客户端連接。這個我們後續會介紹到,這裏我們並不展開。

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
   //JDK NIO原生Selectable Channel
    private final SelectableChannel ch;
    // Channel監聽事件集合 這裏是SelectionKey.OP_ACCEPT事件
    protected final int readInterestOp;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //設置Channel為非阻塞 配合IO多路複用模型
            ch.configureBlocking(false);
        } catch (IOException e) {
            .............省略................
        }
    }
}
  • 封裝由SelectorProvider創建出來的JDK NIO原生ServerSocketChannel
  • 封裝Channel在創建時指定感興趣的IO事件,對於NioServerSocketChannel來説感興趣的IO事件OP_ACCEPT事件
  • 設置JDK NIO原生ServerSocketChannel為非阻塞模式, 配合IO多路複用模型。

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    //channel是由創建層次的,比如ServerSocketChannel 是 SocketChannel的 parent
    private final Channel parent;
    //channel全局唯一ID machineId+processId+sequence+timestamp+random
    private final ChannelId id;
    //unsafe用於封裝對底層socket的相關操作
    private final Unsafe unsafe;
    //為channel分配獨立的pipeline用於IO事件編排
    private final DefaultChannelPipeline pipeline;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用於定義實現對Channel的底層操作
        unsafe = newUnsafe();
        //為channel分配獨立的pipeline用於IO事件編排
        pipeline = newChannelPipeline();
    }
}
  • Netty中的Channel創建是有層次的,這裏的parent屬性用來保存上一級的Channel,比如這裏的NioServerSocketChannel是頂級Channel,所以它的parent = null。客户端NioSocketChannel是由NioServerSocketChannel創建的,所以它的parent = NioServerSocketChannel
  • Channel分配全局唯一的ChannelIdChannelId由機器Id(machineId),進程Id(processId),序列號(sequence),時間戳(timestamp),隨機數(random)構成
   private DefaultChannelId() {
        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;

        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;

        // processId
        i = writeInt(i, PROCESS_ID);

        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());

        // timestamp (kind of)
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;

        hashCode = Arrays.hashCode(data);
    }
  • 創建NioServerSocketChannel的底層操作類Unsafe 。這裏創建的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
UnsafeChannel接口的一個內部接口,用於定義實現對Channel底層的各種操作,Unsafe接口定義的操作行為只能由Netty框架的Reactor線程調用,用户線程禁止調用。
interface Unsafe {
        
        //分配接收數據用的Buffer
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        //服務端綁定的端口地址
        SocketAddress localAddress();
        //遠端地址
        SocketAddress remoteAddress();
        //channel向Reactor註冊
        void register(EventLoop eventLoop, ChannelPromise promise);

        //服務端綁定端口地址
        void bind(SocketAddress localAddress, ChannelPromise promise);
        //客户端連接服務端
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        //關閉channle
        void close(ChannelPromise promise);
        //讀數據
        void beginRead();
        //寫數據
        void write(Object msg, ChannelPromise promise);

    }
  • NioServerSocketChannel分配獨立的pipeline用於IO事件編排。pipeline其實是一個ChannelHandlerContext類型的雙向鏈表。頭結點HeadContext,尾結點TailContextChannelHandlerContext中包裝着ChannelHandler
ChannelHandlerContext 保存 ChannelHandler上下文信息,用於事件傳播。後面筆者會單獨開一篇文章介紹,這裏我們還是聚焦於啓動主線。

這裏只是為了讓大家簡單理解pipeline的一個大致的結構,後面會寫一篇文章專門詳細講解pipeline

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

image.png


到了這裏NioServerSocketChannel就創建完畢了,我們來回顧下它到底包含了哪些核心信息。

image.png

1.2 初始化NioServerSocketChannel

   void init(Channel channel) {
        //向NioServerSocketChannelConfig設置ServerSocketChannelOption
        setChannelOptions(channel, newOptionsArray(), logger);
        //向netty自定義的NioServerSocketChannel設置attributes
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();
        
        //獲取從Reactor線程組
        final EventLoopGroup currentChildGroup = childGroup;
        //獲取用於初始化客户端NioSocketChannel的ChannelInitializer
        final ChannelHandler currentChildHandler = childHandler;
        //獲取用户配置的客户端SocketChannel的channelOption以及attributes
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        //向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的邏輯
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    //LoggingHandler
                    pipeline.addLast(handler);
                }
                //添加用於接收客户端連接的acceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
  • NioServerSocketChannelConfig設置ServerSocketChannelOption
  • 向netty自定義的NioServerSocketChannel設置ChannelAttributes

Netty自定義的SocketChannel類型均繼承AttributeMap接口以及DefaultAttributeMap類,正是它們定義了ChannelAttributes。用於向Channel添加用户自定義的一些信息。

image.png

這個ChannelAttributes的用處大有可為,Netty後邊的許多特性都是依靠這個ChannelAttributes來實現的。這裏先賣個關子,大家可以自己先想一下可以用這個ChannelAttributes做哪些事情?
  • 獲取從Reactor線程組childGroup,以及用於初始化客户端NioSocketChannelChannelInitializer,ChannelOption,ChannelAttributes,這些信息均是由用户在啓動的時候向ServerBootstrap添加的客户端NioServerChannel配置信息。這裏用這些信息來初始化ServerBootstrapAcceptor。因為後續會在ServerBootstrapAcceptor中接收客户端連接以及創建NioServerChannel
  • NioServerSocketChannel中的pipeline添加用於初始化pipelineChannelInitializer

問題來了,這裏為什麼不乾脆直接將ChannelHandler添加到pipeline中,而是又使用到了ChannelInitializer呢?

其實原因有兩點:

  • 為了保證線程安全地初始化pipeline,所以初始化的動作需要由Reactor線程進行,而當前線程是用户程序啓動Main線程不是Reactor線程。這裏不能立即初始化。
  • 初始化Channelpipeline的動作,需要等到Channel註冊到對應的Reactor中才可以進行初始化,當前只是創建好了NioServerSocketChannel,但並未註冊到Main Reactor上。

    初始化NioServerSocketChannelpipeline的時機是:當NioServerSocketChannel註冊到Main Reactor之後,綁定端口地址之前。
前邊在介紹ServerBootstrap配置childHandler時也用到了ChannelInitializer,還記得嗎??

問題又來了,大家注意下ChannelInitializer#initChannel方法,在該初始化回調方法中,添加LoggingHandler是直接向pipeline中添加,而添加Acceptor為什麼不是直接添加而是封裝成異步任務呢?

這裏先給大家賣個關子,筆者會在後續流程中為大家解答 ~

image.png

此時NioServerSocketChannel中的pipeline結構如下圖所示:

image.png

1.3 向Main Reactor註冊NioServerSocketChannel

ServerBootstrap獲取主Reactor線程組NioEventLoopGroup,將NioServerSocketChannel註冊到NioEventLoopGroup中。

ChannelFuture regFuture = config().group().register(channel);

下面我們來看下具體的註冊過程:

1.3.1 主Reactor線程組中選取一個Main Reactor進行註冊

image.png

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

    //獲取綁定策略
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    
    //採用輪詢round-robin的方式選擇Reactor
    @Override
    public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }

Netty通過next()方法根據上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》提到的channel到reactor的綁定策略,從ReactorGroup中選取一個Reactor進行註冊綁定。之後Channel生命週期內的所有 IO 事件都由這個 Reactor 負責處理,如 accept、connect、read、write 等 IO 事件。

一個channel只能綁定到一個Reactor上,一個Reactor負責監聽多個channel

image.png

由於這裏是NioServerSocketChannleMain Reactor進行註冊綁定,所以Main Reactor主要負責處理的IO事件OP_ACCEPT事件。

1.3.2 向綁定後的Main Reactor進行註冊

image.png

Reactor進行註冊的行為定義在NioEventLoop的父類SingleThreadEventLoop中,印象模糊的同學可以在回看下上篇文章中的NioEventLoop繼承結構小節內容。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        //註冊channel到綁定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //unsafe負責channel底層的各種操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

通過NioServerSocketChannel中的Unsafe類執行底層具體的註冊動作。

protected abstract class AbstractUnsafe implements Unsafe {

        /**
         * 註冊Channel到綁定的Reactor上
         * */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            //EventLoop的類型要與Channel的類型一樣  Nio Oio Aio
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            //在channel上設置綁定的Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 執行channel註冊的操作必須是Reactor線程來完成
             *
             * 1: 如果當前執行線程是Reactor線程,則直接執行register0進行註冊
             * 2:如果當前執行線程是外部線程,則需要將register0註冊操作 封裝程異步Task 由Reactor線程執行
             * */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   ...............省略...............
                }
            }
        }
}
  • 首先檢查NioServerSocketChannel是否已經完成註冊。如果以完成註冊,則直接設置代表註冊操作結果的ChannelPromisefail狀態
  • 通過isCompatible方法驗證Reactor模型EventLoop是否與Channel的類型匹配。NioEventLoop對應於NioServerSocketChannel
上篇文章我們介紹過 Netty對三種IO模型Oio,Nio,Aio的支持,用户可以通過改變Netty核心類的前綴輕鬆切換IO模型isCompatible方法目的就是需要保證ReactorChannel使用的是同一種IO模型
  • Channel中保存其綁定的Reactor實例
  • 執行ChannelReactor註冊的動作必須要確保是在Reactor線程中執行。

    • 如果當前線程是Reactor線程則直接執行註冊動作register0
    • 如果當前線程不是Reactor線程,則需要將註冊動作register0封裝成異步任務,存放在Reactor中的taskQueue中,等待Reactor線程執行。
當前執行線程並不是Reactor線程,而是用户程序的啓動線程Main線程

1.3.3 Reactor線程的啓動

上篇文章中我們在介紹NioEventLoopGroup的創建過程中提到了一個構造器參數executor,它用於啓動Reactor線程,類型為ThreadPerTaskExecutor

當時筆者向大家賣了一個關子~~“Reactor線程是何時啓動的?”

image.png

那麼現在就到了為大家揭曉謎底的時候了~~

Reactor線程的啓動是在向Reactor提交第一個異步任務的時候啓動的。

Netty中的主Reactor線程組NioEventLoopGroup中的Main ReactorNioEventLoop是在用户程序Main線程Main Reactor提交用於註冊NioServerSocketChannel的異步任務時開始啓動。

   eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });

接下來我們關注下NioEventLoopexecute方法

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        //當前線程是否為Reactor線程
        boolean inEventLoop = inEventLoop();
        //addTaskWakesUp = true  addTask喚醒Reactor線程執行任務
        addTask(task);
        if (!inEventLoop) {
            //如果當前線程不是Reactor線程,則啓動Reactor線程
            //這裏可以看出Reactor線程的啓動是通過 向NioEventLoop添加異步任務時啓動的
            startThread();

            .....................省略.....................
        }
        .....................省略.....................
    }

}
  • 首先將異步任務task添加到Reactor中的taskQueue中。
  • 判斷當前線程是否為Reactor線程,此時當前執行線程為用户程序啓動線程,所以這裏調用startThread 啓動Reactor線程

1.3.4 startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //定義Reactor線程狀態
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

     //Reactor線程狀態  初始為 未啓動狀態
    private volatile int state = ST_NOT_STARTED;

    //Reactor線程狀態字段state 原子更新器
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
    AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

}
  • Reactor線程初始化狀態為ST_NOT_STARTED ,首先CAS更新狀態為ST_STARTED
  • doStartThread 啓動Reactor線程
  • 啓動失敗的話,需要將Reactor線程狀態改回ST_NOT_STARTED
    //ThreadPerTaskExecutor 用於啓動Reactor線程
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //Reactor線程開始啓動
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }
              
                ................省略..............
        }

這裏就來到了ThreadPerTaskExecutor 類型的executor的用武之地了。

  • Reactor線程的核心工作之前介紹過:輪詢所有註冊其上的Channel中的IO就緒事件處理對應Channel上的IO事件執行異步任務。Netty將這些核心工作封裝在io.netty.channel.nio.NioEventLoop#run方法中。

image.png

  • NioEventLoop#run封裝在異步任務中,提交給executor執行,Reactor線程至此開始工作了就。
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    @Override
    public void execute(Runnable command) {
        //啓動Reactor線程
        threadFactory.newThread(command).start();
    }
}

此時Reactor線程已經啓動,後面的工作全部都由這個Reactor線程來負責執行了。

而用户啓動線程在向Reactor提交完NioServerSocketChannel的註冊任務register0後,就逐步退出調用堆棧,回退到最開始的啓動入口處ChannelFuture f = b.bind(PORT).sync()

此時Reactor中的任務隊列中只有一個任務register0Reactor線程啓動後,會從任務隊列中取出任務執行。

image.png

至此NioServerSocketChannel的註冊工作正式拉開帷幕~~

image.png

1.3.5 register0

       //true if the channel has never been registered, false otherwise 
        private boolean neverRegistered = true;

        private void register0(ChannelPromise promise) {
            try {
                //查看註冊操作是否已經取消,或者對應channel已經關閉
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //執行真正的註冊操作
                doRegister();
                //修改註冊狀態
                neverRegistered = false;
                registered = true;
                //回調pipeline中添加的ChannelInitializer的handlerAdded方法,在這裏初始化channelPipeline
                pipeline.invokeHandlerAddedIfNeeded();
                //設置regFuture為success,觸發operationComplete回調,將bind操作放入Reactor的任務隊列中,等待Reactor線程執行。
                safeSetSuccess(promise);
                //觸發channelRegister事件
                pipeline.fireChannelRegistered();
                //對於服務端ServerSocketChannel來説 只有綁定端口地址成功後 channel的狀態才是active的。
                //此時綁定操作作為異步任務在Reactor的任務隊列中,綁定操作還沒開始,所以這裏的isActive()是false
                if (isActive()) {
                    if (firstRegistration) {
                        //觸發channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                 ............省略.............
            }
        }

register0是驅動整個Channel註冊綁定流程的關鍵方法,下面我們來看下它的核心邏輯:

  • 首先需要檢查Channel的註冊動作是否在Reactor線程外被取消了已經!promise.setUncancellable()。檢查要註冊的Channel是否已經關閉!ensureOpen(promise)。如果Channel已經關閉或者註冊操作已經被取消,那麼就直接返回,停止註冊流程。
  • 調用doRegister()方法,執行真正的註冊操作。最終實現在AbstractChannel的子類AbstractNioChannel中,這個我們一會在介紹,先關注整體流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {
        // NOOP
    }

}
  • ChannelReactor註冊完畢後,調用pipeline.invokeHandlerAddedIfNeeded()方法,觸發回調pipeline中添加的ChannelInitializer的handlerAdded方法,在handlerAdded方法中利用前面提到的ChannelInitializer初始化ChannelPipeline

image.png

初始化ChannelPipeline的時機是當Channel向對應的Reactor註冊成功後,在handlerAdded事件回調中利用ChannelInitializer進行初始化。
  • 設置regFutureSuccess,並回調註冊在regFuture上的ChannelFutureListener#operationComplete方法,在operationComplete回調方法中將綁定操作封裝成異步任務,提交到ReactortaskQueue中。等待Reactor的執行。
還記得這個regFuture在哪裏出現的嗎?它是在哪裏被創建,又是在哪裏添加的ChannelFutureListener呢? 大家還有印象嗎?回憶不起來也沒關係,筆者後面還會提到
  • 通過pipeline.fireChannelRegistered()pipeline中觸發channelRegister事件
pipelinechannelHandlerchannelRegistered方法被回調。
  • 對於Netty服務端NioServerSocketChannel來説, 只有綁定端口地址成功後 channel的狀態才是active的。此時綁定操作regFuture上註冊的ChannelFutureListener#operationComplete回調方法中被作為異步任務提交到了Reactor的任務隊列中,Reactor線程沒開始執行綁定任務。所以這裏的isActive()false
Reactor線程執行完register0方法後,才會去執行綁定任務

下面我們來看下register0方法中這些核心步驟的具體實現:

1.3.6 doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {

    //channel註冊到Selector後獲得的SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............省略....................
            }
        }
    }

}

調用底層JDK NIO Channel方法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object),將NettyNioServerSocketChannel中包裝的JDK NIO ServerSocketChannel註冊到Reactor中的JDK NIO Selector上。

簡單介紹下SelectableChannel#register方法參數的含義:

  • Selector:表示JDK NIO Channel將要向哪個Selector進行註冊。
  • int ops: 表示Channel上感興趣的IO事件,當對應的IO事件就緒時,Selector會返回Channel對應的SelectionKey
SelectionKey可以理解為ChannelSelector上的特殊表示形式, SelectionKey中封裝了Channel感興趣的IO事件集合~~~interestOps,以及IO就緒的事件集合~~readyOps, 同時也封裝了對應的JDK NIO Channel以及註冊的Selector。最後還有一個重要的屬性attachment,可以允許我們在SelectionKey上附加一些自定義的對象。
  • Object attachment:SelectionKey中添加用户自定義的附加對象。

這裏NioServerSocketChannelReactor中的Selector註冊的IO事件0,這個操作的主要目的是先獲取到ChannelSelector中對應的SelectionKey,完成註冊。當綁定操作完成後,在去向SelectionKey添加感興趣的IO事件~~~OP_ACCEPT事件

同時通過SelectableChannel#register方法將Netty自定義的NioServerSocketChannel(這裏的this指針)附着在SelectionKeyattechment屬性上,完成Netty自定義Channel與JDK NIO Channel的關係綁定。這樣在每次對Selector 進行IO就緒事件輪詢時,Netty 都可以從 JDK NIO Selector返回的SelectionKey中獲取到自定義的Channel對象(這裏指的就是NioServerSocketChannel)。

1.3.7 HandlerAdded事件回調中初始化ChannelPipeline

NioServerSocketChannel註冊到Main Reactor上的Selector後,Netty通過調用pipeline.invokeHandlerAddedIfNeeded()開始回調NioServerSocketChannelpipeline裏的ChannelHandler的handlerAdded方法

此時NioServerSocketChannelpipeline結構如下:

image.png

此時pipeline中只有在初始化NioServerSocketChannel時添加的ChannelInitializer

我們來看下ChannelInitializerhandlerAdded回調方法具體作了哪些事情~~

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                //初始化工作完成後,需要將自身從pipeline中移除
                removeState(ctx);
            }
        }
    }

    //ChannelInitializer實例是被所有的Channel共享的,用於初始化ChannelPipeline
    //通過Set集合保存已經初始化的ChannelPipeline,避免重複初始化同一ChannelPipeline
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                     //初始化完畢後,從pipeline中移除自身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    //匿名類實現,這裏指定具體的初始化邏輯
    protected abstract void initChannel(C ch) throws Exception;

    private void removeState(final ChannelHandlerContext ctx) {
        //從initMap防重Set集合中刪除ChannelInitializer
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }
}

ChannelInitializer 中的初始化邏輯比較簡單明瞭:

  • 首先要判斷必須是當前Channel已經完成註冊後,才可以進行pipeline的初始化。ctx.channel().isRegistered()
  • 調用ChannelInitializer 的匿名類指定的initChannel 執行自定義的初始化邏輯。
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
還記得在初始化NioServerSocketChannel時。io.netty.bootstrap.ServerBootstrap#init方法中向pipeline中添加的ChannelInitializer嗎?
  • 當執行完initChannel 方法後,ChannelPipeline的初始化就結束了,此時ChannelInitializer 就沒必要再繼續呆在pipeline中了,所需要將ChannelInitializer pipeline中刪除。pipeline.remove(this)

當初始化完pipeline時,此時pipeline的結構再次發生了變化:

image.png

此時Main Reactor中的任務隊列taskQueue結構變化為:

image.png

添加ServerBootstrapAcceptor的任務是在初始化NioServerSocketChannel的時候向main reactor提交過去的。還記得嗎?

1.3.8 回調regFuture的ChannelFutureListener

在本小節《Netty服務端的啓動》的最開始,我們介紹了服務端啓動的入口函數io.netty.bootstrap.AbstractBootstrap#doBind,在函數的最開頭調用了initAndRegister()方法用來創建並初始化NioServerSocketChannel,之後便會將NioServerSocketChannel註冊到Main Reactor中。

註冊的操作是一個異步的過程,所以在initAndRegister()方法調用後返回一個代表註冊結果的ChannelFuture regFuture

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private ChannelFuture doBind(final SocketAddress localAddress) {
        //異步創建,初始化,註冊ServerSocketChannel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            //如果註冊完成,則進行綁定操作
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            //添加註冊完成 回調函數
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                         ...............省略...............
                          // 註冊完成後,Reactor線程回調這裏
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

之後會向ChannelFuture regFuture添加一個註冊完成後的回調函數~~~~ ChannelFutureListener 。在回調函數operationComplete 中開始發起綁端口地址流程

image.png

那麼這個回調函數在什麼時候?什麼地方發起的呢??

讓我們在回到本小節的主題register0 方法的流程中:

當調用doRegister()方法完成NioServerSocketChannelMain Reactor的註冊後,緊接着會調用pipeline.invokeHandlerAddedIfNeeded()方法中觸發ChannelInitializer#handlerAdded回調中對pipeline進行初始化。

最後在safeSetSuccess方法中,開始回調註冊在regFuture 上的ChannelFutureListener

   protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
           logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
        }
   }

   @Override
    public boolean trySuccess() {
        return trySuccess(null);
    }

    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }

   private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                //回調註冊在promise上的listeners
                notifyListeners();
            }
            return true;
        }
        return false;
    }

safeSetSuccess 的邏輯比較簡單,首先設置regFuture結果為success,並且回調註冊在regFuture上的ChannelFutureListener

需要提醒的是,執行safeSetSuccess 方法,以及後邊回調regFuture上的ChannelFutureListener 這些動作都是由Reactor線程執行的。

關於Netty中的Promise模型後邊我會在寫一篇專門的文章進行分析,這裏大家只需清楚大體的流程即可。不必在意過多的細節。

下面我們把視角切換到regFuture上的ChannelFutureListener 回調中,看看在Channel註冊完成後,Netty又會做哪些事情?

2. doBind0

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

}

這裏Netty又將綁定端口地址的操作封裝成異步任務,提交給Reactor執行。

但是這裏有一個問題,其實此時執行doBind0方法的線程正是Reactor線程,那為什麼不直接在這裏去執行bind操作,而是再次封裝成異步任務提交給Reactor中的taskQueue呢?

反正最終都是由Reactor線程執行,這其中又有什麼分別呢?

經過上小節的介紹我們知道,bind0方法的調用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法在將NioServerSocketChannel註冊到Main Reactor之後,並且NioServerSocketChannelpipeline已經初始化完畢後,通過safeSetSuccess 方法回調過來的。

這個過程全程是由Reactor線程來負責執行的,但是此時register0方法並沒有執行完畢,還需要執行後面的邏輯。

而綁定邏輯需要在註冊邏輯執行完之後執行,所以在doBind0方法中Reactor線程會將綁定操作封裝成異步任務先提交給taskQueue中保存,這樣可以使Reactor線程立馬從safeSetSuccess 中返回,繼續執行剩下的register0方法邏輯。

        private void register0(ChannelPromise promise) {
            try {
                ................省略............

                doRegister();
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                //觸發channelRegister事件
                pipeline.fireChannelRegistered();

                if (isActive()) {
                     ................省略............
                }
            } catch (Throwable t) {
                  ................省略............
            }
        }

Reactor線程執行完register0方法後,就會從taskQueue中取出異步任務執行。

此時Reactor線程中的taskQueue結構如下:

image.png

  • Reactor線程會先取出位於taskQueue隊首的任務執行,這裏是指向NioServerSocketChannelpipeline中添加ServerBootstrapAcceptor的異步任務。

此時NioServerSocketChannelpipeline的結構如下:

image.png

  • Reactor線程執行綁定任務。

3. 綁定端口地址

Channel的操作行為全部定義在ChannelOutboundInvoker接口中

image.png

public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}

bind方法由子類AbstractChannel實現。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

}

調用pipeline.bind(localAddress, promise)pipeline中傳播bind事件,觸發回調pipeline中所有ChannelHandlerbind方法

事件在pipeline中的傳播具有方向性:

  • inbound事件HeadContext開始逐個向後傳播直到TailContext
  • outbound事件則是反向傳播,從TailContext開始反向向前傳播直到HeadContext
inbound事件只能被pipeline中的ChannelInboundHandler響應處理
outbound事件只能被pipeline中的ChannelOutboundHandler響應處理

image.png

然而這裏的bind事件在Netty中被定義為outbound事件,所以它在pipeline中是反向傳播。先從TailContext開始反向傳播直到HeadContext

然而bind的核心邏輯也正是實現在HeadContext中。

3.1 HeadContext

  final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

     @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            //觸發AbstractChannel->bind方法 執行JDK NIO SelectableChannel 執行底層綁定操作
            unsafe.bind(localAddress, promise);
        }

}

HeadContext#bind回調方法中,調用Channel裏的unsafe操作類執行真正的綁定操作。

protected abstract class AbstractUnsafe implements Unsafe {

      @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            .................省略................

            //這時channel還未激活  wasActive = false
            boolean wasActive = isActive();
            try {
                //io.netty.channel.socket.nio.NioServerSocketChannel.doBind
                //調用具體channel實現類
                doBind(localAddress);
            } catch (Throwable t) {
                .................省略................
                return;
            }

            //綁定成功後 channel激活 觸發channelActive事件傳播
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //pipeline中觸發channelActive事件
                        pipeline.fireChannelActive();
                    }
                });
            }
            //回調註冊在promise上的ChannelFutureListener
            safeSetSuccess(promise);
        }

        protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
  • 首先執行子類NioServerSocketChannel具體實現的doBind方法,通過JDK NIO 原生 ServerSocketChannel執行底層的綁定操作。
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        //調用JDK NIO 底層SelectableChannel 執行綁定操作
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
  • 判斷是否為首次綁定,如果是的話將觸發pipeline中的ChannelActive事件封裝成異步任務放入Reactor中的taskQueue中。
  • 執行safeSetSuccess(promise),回調註冊在promise上的ChannelFutureListener

還是同樣的問題,當前執行線程已經是Reactor線程了,那麼為何不直接觸發pipeline中的ChannelActive事件而是又封裝成異步任務呢??

因為如果直接在這裏觸發ChannelActive事件,那麼Reactor線程就會去執行pipeline中的ChannelHandlerchannelActive事件回調

這樣的話就影響了safeSetSuccess(promise)的執行,延遲了註冊在promise上的ChannelFutureListener的回調。

到現在為止,Netty服務端就已經完成了綁定端口地址的操作,NioServerSocketChannel的狀態現在變為Active

最後還有一件重要的事情要做,我們接着來看pipeline中對channelActive事件處理。

3.2 channelActive事件處理

channelActive事件在Netty中定義為inbound事件,所以它在pipeline中的傳播為正向傳播,從HeadContext一直到TailContext為止。

channelActive事件回調中需要觸發向Selector指定需要監聽的IO事件~~OP_ACCEPT事件

這塊的邏輯主要在HeadContext中實現。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline中繼續向後傳播channelActive事件
            ctx.fireChannelActive();
            //如果是autoRead 則自動觸發read事件傳播
            //在read回調函數中 觸發OP_ACCEPT註冊
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //如果是autoRead 則觸發read事件傳播
                channel.read();
            }
        }

        //AbstractChannel
        public Channel read() {
                //觸發read事件
                pipeline.read();
                return this;
        }

       @Override
        public void read(ChannelHandlerContext ctx) {
            //觸發註冊OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }
   }
  • HeadContext中的channelActive回調中觸發pipeline中的read事件
  • read事件再次傳播到HeadContext時,觸發HeadContext#read方法的回調。在read回調中調用channel底層操作類unsafebeginRead方法向selector註冊監聽OP_ACCEPT事件

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {

     @Override
        public final void beginRead() {
            assertEventLoop();
            //channel必須是Active
            if (!isActive()) {
                return;
            }

            try {
                // 觸發在selector上註冊channel感興趣的監聽事件
                doBeginRead();
            } catch (final Exception e) {
               .............省略..............
            }
        }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    //子類負責繼承實現
    protected abstract void doBeginRead() throws Exception;

}
  • 斷言判斷執行該方法的線程必須是Reactor線程
  • 此時NioServerSocketChannel已經完成端口地址的綁定操作,isActive() = true
  • 調用doBeginRead實現向Selector註冊監聽事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel {

    //channel註冊到Selector後獲得的SelectKey
    volatile SelectionKey selectionKey;
    // Channel監聽事件集合
    protected final int readInterestOp;

    @Override
    protected void doBeginRead() throws Exception {
      
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化時 readInterestOp設置的是OP_ACCEPT事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //添加OP_ACCEPT事件到interestOps集合中
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
  • 前邊提到在NioServerSocketChannel在向Main Reactor中的Selector註冊後,會獲得一個SelectionKey。這裏首先要獲取這個SelectionKey
  • SelectionKey中獲取NioServerSocketChannel感興趣的IO事件集合 interestOps ,當時在註冊的時候interestOps設置為0
  • 將在NioServerSocketChannel初始化時設置的readInterestOp = OP_ACCEPT,設置到SelectionKey中的interestOps 集合中。這樣Reactor中的Selector就開始監聽interestOps 集合中包含的IO事件了。
Main Reactor中主要監聽的是OP_ACCEPT事件

流程走到這裏,Netty服務端就真正的啓動起來了,下一步就開始等待接收客户端連接了。大家此刻在來回看這副啓動流程圖,是不是清晰了很多呢?

image.png

此時Netty的Reactor模型結構如下:

image.png


總結

本文我們通過圖解源碼的方式完整地介紹了整個Netty服務端啓動流程,並介紹了在啓動過程中涉及到的ServerBootstrap 相關的屬性以及配置方式。NioServerSocketChannel 的創建初始化過程以及類的繼承結構。

其中重點介紹了NioServerSocketChannel Reactor的註冊過程以及Reactor線程的啓動時機和pipeline的初始化時機。

最後介紹了NioServerSocketChannel綁定端口地址的整個流程。

上述介紹的這些流程全部是異步操作,各種回調繞來繞去的,需要反覆回想下,讀異步代碼就是這樣,需要理清各種回調之間的關係,並且時刻提醒自己當前的執行線程是什麼?

好了,現在Netty服務端已經啓動起來,接着就該接收客户端連接了,我們下篇文章見~~~~

image.png

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.