歡迎關注公眾號:bin的技術小屋,閲讀公眾號原文
本系列Netty源碼解析文章基於 4.1.56.Final版本
大家第一眼看到這幅流程圖,是不是腦瓜子嗡嗡的呢?
大家先不要驚慌,問題不大,本文筆者的目的就是要讓大家清晰的理解這幅流程圖,從而深刻的理解Netty Reactor的啓動全流程,包括其中涉及到的各種代碼設計實現細節。
在上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》中我們詳細介紹了Netty服務端核心引擎組件主從Reactor組模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的創建過程。最終我們得到了netty Reactor模型的運行骨架如下:
現在Netty服務端程序的骨架是搭建好了,本文我們就基於這個骨架來深入剖析下Netty服務端的啓動過程。
我們繼續回到上篇文章提到的Netty服務端代碼模板中,在創建完主從Reactor線程組:bossGroup,workerGroup後,接下來就開始配置Netty服務端的啓動輔助類ServerBootstrap 了。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.option(ChannelOption.SO_BACKLOG, 100)//設置主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定端口啓動服務,開始監聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
在上篇文章中我們對代碼模板中涉及到ServerBootstrap 的一些配置方法做了簡單的介紹,大家如果忘記的話,可以在返回去回顧一下。
ServerBootstrap類其實沒有什麼特別的邏輯,主要是對Netty啓動過程中需要用到的一些核心信息進行配置管理,比如:
- Netty的核心引擎組件
主從Reactor線程組: bossGroup,workerGroup。通過ServerBootstrap#group方法配置。 - Netty服務端使用到的Channel類型:
NioServerSocketChannel,通過ServerBootstrap#channel方法配置。
以及配置NioServerSocketChannel時用到的SocketOption。SocketOption用於設置底層JDK NIO Socket的一些選項。通過ServerBootstrap#option方法進行配置。
主ReactorGroup中的MainReactor管理的Channel類型為
NioServerSocketChannel,如圖所示主要用來監聽端口,接收客户端連接,為客户端創建初始化NioSocketChannel,然後採用round-robin輪詢的方式從圖中從ReactorGroup中選擇一個SubReactor與該客户端NioSocketChannel進行綁定。從ReactorGroup中的SubReactor管理的Channel類型為
NioSocketChannel,它是netty中定義客户端連接的一個模型,每個連接對應一個。如圖所示SubReactor負責監聽處理綁定在其上的所有NioSocketChannel上的IO事件。
- 保存服務端
NioServerSocketChannel和客户端NioSocketChannel對應pipeline中指定的ChannelHandler。用於後續Channel向Reactor註冊成功之後,初始化Channel裏的pipeline。
不管是服務端用到的NioServerSocketChannel還是客户端用到的NioSocketChannel,每個Channel實例都會有一個Pipeline,Pipeline中有多個ChannelHandler用於編排處理對應Channel上感興趣的IO事件。
ServerBootstrap結構中包含了netty服務端程序啓動的所有配置信息,在我們介紹啓動流程之前,先來看下ServerBootstrap的源碼結構:
ServerBootstrap
ServerBootstrap的繼承結構比較簡單,繼承層次的職責分工也比較明確。
ServerBootstrap主要負責對主從Reactor線程組相關的配置進行管理,其中帶child前綴的配置方法是對從Reactor線程組的相關配置管理。從Reactor線程組中的Sub Reactor負責管理的客户端NioSocketChannel相關配置存儲在ServerBootstrap結構中。
父類AbstractBootstrap則是主要負責對主Reactor線程組相關的配置進行管理,以及主Reactor線程組中的Main Reactor負責處理的服務端ServerSocketChannel相關的配置管理。
1. 配置主從Reactor線程組
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//Main Reactor線程組
volatile EventLoopGroup group;
//Sub Reactor線程組
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父類管理主Reactor線程組
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}
2. 配置服務端ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//用於創建ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
}
在向ServerBootstrap配置服務端ServerSocketChannel的channel 方法中,其實是創建了一個ChannelFactory工廠實例ReflectiveChannelFactory,在Netty服務端啓動的過程中,會通過這個ChannelFactory去創建相應的Channel實例。
我們可以通過這個方法來配置netty的IO模型,下面為ServerSocketChannel在不同IO模型下的實現:
| BIO | NIO | AIO |
|---|---|---|
| OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
EventLoopGroup Reactor線程組在不同IO模型下的實現:
| BIO | NIO | AIO |
|---|---|---|
| ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
我們只需要將IO模型的這些核心接口對應的實現類前綴改為對應IO模型的前綴,就可以輕鬆在Netty中完成對IO模型的切換。
2.1 ReflectiveChannelFactory
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
//NioServerSocketChannelde 構造器
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//反射獲取NioServerSocketChannel的構造器
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
//創建NioServerSocketChannel實例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
從類的簽名我們可以看出,這個工廠類是通過泛型加反射的方式來創建對應的Channel實例。
- 泛型參數
T extends Channel表示的是要通過工廠類創建的Channel類型,這裏我們初始化的是NioServerSocketChannel。 - 在
ReflectiveChannelFactory的構造器中通過反射的方式獲取NioServerSocketChannel的構造器。 - 在
newChannel方法中通過構造器反射創建NioServerSocketChannel實例。
注意這時只是配置階段,NioServerSocketChannel此時並未被創建。它是在啓動的時候才會被創建出來。
3. 為NioServerSocketChannel配置ChannelOption
ServerBootstrap b = new ServerBootstrap();
//設置被MainReactor管理的NioServerSocketChannel的Socket選項
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption配置
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
}
無論是服務端的NioServerSocketChannel還是客户端的NioSocketChannel它們的相關底層Socket選項ChannelOption配置全部存放於一個Map類型的數據結構中。
由於客户端NioSocketChannel是由從Reactor線程組中的Sub Reactor來負責處理,所以涉及到客户端NioSocketChannel所有的方法和配置全部是以child前綴開頭。
ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//客户端SocketChannel對應的ChannelOption配置
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
}
相關的底層Socket選項,netty全部枚舉在ChannelOption類中,筆者這裏就不一一列舉了,在本系列後續相關的文章中,筆者還會為大家詳細的介紹這些參數的作用。
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
..................省略..............
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
}
4. 為服務端NioServerSocketChannel中的Pipeline配置ChannelHandler
//serverSocketChannel中pipeline裏的handler(主要是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
向NioServerSocketChannel中的Pipeline添加ChannelHandler分為兩種方式:
顯式添加:顯式添加的方式是由用户在main線程中通過ServerBootstrap#handler的方式添加。如果需要添加多個ChannelHandler,則可以通過ChannelInitializer向pipeline中進行添加。
關於ChannelInitializer後面筆者會有詳細介紹,這裏大家只需要知道ChannelInitializer是一種特殊的ChannelHandler,用於初始化pipeline。適用於向pipeline中添加多個ChannelHandler的場景。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
}
})
隱式添加:隱式添加主要添加的就是主ReactorGroup的核心組件也就是下圖中的acceptor,Netty中的實現為ServerBootstrapAcceptor,本質上也是一種ChannelHandler,主要負責在客户端連接建立好後,初始化客户端NioSocketChannel,在從Reactor線程組中選取一個Sub Reactor,將客户端NioSocketChannel註冊到Sub Reactor中的selector上。
隱式添加ServerBootstrapAcceptor是由Netty框架在啓動的時候負責添加,用户無需關心。
在本例中,NioServerSocketChannel的PipeLine中只有兩個ChannelHandler,一個由用户在外部顯式添加的LoggingHandler,另一個是由Netty框架隱式添加的ServerBootstrapAcceptor。
其實我們在實際項目使用的過程中,不會向netty服務端NioServerSocketChannel添加額外的ChannelHandler,NioServerSocketChannel只需要專心做好自己最重要的本職工作接收客户端連接就好了。這裏額外添加一個LoggingHandler只是為了向大家展示ServerBootstrap的配置方法。
5. 為客户端NioSocketChannel中的Pipeline配置ChannelHandler
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
//socketChannel中pipeline中的處理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
向客户端NioSocketChannel中的Pipeline裏添加ChannelHandler完全是由用户自己控制顯式添加,添加的數量不受限制。
由於在Netty的IO線程模型中,是由單個Sub Reactor線程負責執行客户端NioSocketChannel中的Pipeline,一個Sub Reactor線程負責處理多個NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就會影響Sub Reactor線程執行其他NioSocketChannel上的Pipeline,從而降低IO處理效率,降低吞吐量。
所以Pipeline中的ChannelHandler不易添加過多,並且不能再ChannelHandler中執行耗時的業務處理任務。
在我們通過ServerBootstrap配置netty服務端啓動信息的時候,無論是向服務端NioServerSocketChannel的pipeline中添加ChannelHandler,還是向客户端NioSocketChannel的pipeline中添加ChannelHandler,當涉及到多個ChannelHandler添加的時候,我們都會用到ChannelInitializer,那麼這個ChannelInitializer究竟是何方聖神,為什麼要這樣做呢?我們接着往下看~~
ChannelInitializer
首先ChannelInitializer它繼承於ChannelHandler,它自己本身就是一個ChannelHandler,所以它可以添加到childHandler中。
其他的父類大家這裏可以不用管,後面文章中筆者會一一為大家詳細介紹。
那為什麼不直接添加ChannelHandler而是選擇用ChannelInitializer呢?
這裏主要有兩點原因:
- 前邊我們提到,客户端
NioSocketChannel是在服務端accept連接後,在服務端NioServerSocketChannel中被創建出來的。但是此時我們正處於配置ServerBootStrap階段,服務端還沒有啓動,更沒有客户端連接上來,此時客户端NioSocketChannel還沒有被創建出來,所以也就沒辦法向客户端NioSocketChannel的pipeline中添加ChannelHandler。 - 客户端
NioSocketChannel中Pipeline裏可以添加任意多個ChannelHandler,但是Netty框架無法預知用户到底需要添加多少個ChannelHandler,所以Netty框架提供了回調函數ChannelInitializer#initChannel,使用户可以自定義ChannelHandler的添加行為。
當客户端NioSocketChannel註冊到對應的Sub Reactor上後,緊接着就會初始化NioSocketChannel中的Pipeline,此時Netty框架會回調ChannelInitializer#initChannel執行用户自定義的添加邏輯。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//當channelRegister事件發生時,調用initChannel初始化pipeline
if (initChannel(ctx)) {
.................省略...............
} else {
.................省略...............
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//此時客户單NioSocketChannel已經創建並初始化好了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.................省略...............
} finally {
.................省略...............
}
return true;
}
return false;
}
protected abstract void initChannel(C ch) throws Exception;
.................省略...............
}
這裏由netty框架回調的ChannelInitializer#initChannel方法正是我們自定義的添加邏輯。
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
到此為止,Netty服務端啓動所需要的必要配置信息,已經全部存入ServerBootStrap啓動輔助類中。
接下來要做的事情就是服務端的啓動了。
// Start the server. 綁定端口啓動服務,開始監聽accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();
Netty服務端的啓動
經過前面的鋪墊終於來到了本文的核心內容----Netty服務端的啓動過程。
如代碼模板中的示例所示,Netty服務端的啓動過程封裝在io.netty.bootstrap.AbstractBootstrap#bind(int)函數中。
接下來我們看一下Netty服務端在啓動過程中究竟幹了哪些事情?
大家看到這副啓動流程圖先不要慌,接下來的內容筆者會帶大家各個擊破它,在文章的最後保證讓大家看懂這副流程圖。
我們先來從netty服務端啓動的入口函數開始我們今天的源碼解析旅程:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
//校驗Netty核心組件是否配置齊全
validate();
//服務端開始啓動,綁定端口地址,接收客户端連接
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//異步創建,初始化,註冊ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor註冊成功後開始綁定端口....,
} else {
//如果此時註冊操作沒有完成,則向regFuture添加operationComplete回調函數,註冊成功後回調。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
........serverSocketChannel向Main Reactor註冊成功後開始綁定端口....,
});
return promise;
}
}
Netty服務端的啓動流程總體如下:
- 創建服務端
NioServerSocketChannel並初始化。 - 將服務端
NioServerSocketChannel註冊到主Reactor線程組中。 - 註冊成功後,開始初始化
NioServerSocketChannel中的pipeline,然後在pipeline中觸發channelRegister事件。 - 隨後由
NioServerSocketChannel綁定端口地址。 - 綁定端口地址成功後,向
NioServerSocketChannel對應的Pipeline中觸發傳播ChannelActive事件,在ChannelActive事件回調中向Main Reactor註冊OP_ACCEPT事件,開始等待客户端連接。服務端啓動完成。
當netty服務端啓動成功之後,最終我們會得到如下結構的陣型,開始枕戈待旦,準備接收客户端的連接,Reactor開始運轉。
接下來,我們就來看下Netty源碼是如何實現以上步驟的~~
1. initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//創建NioServerSocketChannel
//ReflectiveChannelFactory通過泛型,反射,工廠的方式靈活創建不同類型的channel
channel = channelFactory.newChannel();
//初始化NioServerSocketChannel
init(channel);
} catch (Throwable t) {
..............省略.................
}
//向MainReactor註冊ServerSocketChannel
ChannelFuture regFuture = config().group().register(channel);
..............省略.................
return regFuture;
}
從函數命名中我們可以看出,這個函數主要做的事情就是首先創建NioServerSocketChannel ,並對NioServerSocketChannel 進行初始化,最後將NioServerSocketChannel 註冊到Main Reactor中。
1.1 創建NioServerSocketChannel
還記得我們在介紹ServerBootstrap啓動輔助類配置服務端ServerSocketChannel類型的時候提到的工廠類ReflectiveChannelFactory 嗎?
因為當時我們在配置ServerBootstrap啓動輔助類的時候,還沒到啓動階段,而配置階段並不是創建具體ServerSocketChannel的時機。
所以Netty通過工廠模式將要創建的ServerSocketChannel的類型(通過泛型指定)以及 創建的過程(封裝在newChannel函數中)統統先封裝在工廠類ReflectiveChannelFactory中。
ReflectiveChannelFactory通過泛型,反射,工廠的方式靈活創建不同類型的channel
等待創建時機來臨,我們調用保存在ServerBootstrap中的channelFactory直接進行創建。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
下面我們來看下NioServerSocketChannel的構建過程:
1.1.1 NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//SelectorProvider(用於創建Selector和Selectable Channels)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//創建JDK NIO ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
//ServerSocketChannel相關的配置
private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設置用於Channel接收數據用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
- 首先調用
newSocket創建JDK NIO 原生ServerSocketChannel,這裏調用了SelectorProvider#openServerSocketChannel來創建JDK NIO 原生ServerSocketChannel,我們在上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》中詳細的介紹了SelectorProvider相關內容,當時是用SelectorProvider來創建Reactor中的Selector。大家還記得嗎?? - 通過父類構造器設置
NioServerSocketChannel感興趣的IO事件,這裏設置的是SelectionKey.OP_ACCEPT事件。並將JDK NIO 原生ServerSocketChannel封裝起來。 - 創建
Channel的配置類NioServerSocketChannelConfig,在配置類中封裝了對Channel底層的一些配置行為,以及JDK中的ServerSocket。以及創建NioServerSocketChannel接收數據用的Buffer分配器AdaptiveRecvByteBufAllocator。
NioServerSocketChannelConfig沒什麼重要的東西,我們這裏也不必深究,它就是管理NioServerSocketChannel相關的配置,這裏唯一需要大家注意的是這個用於Channel接收數據用的Buffer分配器AdaptiveRecvByteBufAllocator,我們後面在介紹Netty如何接收連接的時候還會提到。
NioServerSocketChannel 的整體構建過程介紹完了,現在我們來按照繼承層次再回過頭來看下NioServerSocketChannel 的層次構建,來看下每一層都創建了什麼,封裝了什麼,這些信息都是Channel的核心信息,所以有必要了解一下。
在NioServerSocketChannel 的創建過程中,我們主要關注繼承結構圖中紅框標註的三個類,其他的我們佔時先不用管。
其中AbstractNioMessageChannel類主要是對NioServerSocketChannel底層讀寫行為的封裝和定義,比如accept接收客户端連接。這個我們後續會介紹到,這裏我們並不展開。
1.1.2 AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
//JDK NIO原生Selectable Channel
private final SelectableChannel ch;
// Channel監聽事件集合 這裏是SelectionKey.OP_ACCEPT事件
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設置Channel為非阻塞 配合IO多路複用模型
ch.configureBlocking(false);
} catch (IOException e) {
.............省略................
}
}
}
- 封裝由
SelectorProvider創建出來的JDK NIO原生ServerSocketChannel。 - 封裝
Channel在創建時指定感興趣的IO事件,對於NioServerSocketChannel來説感興趣的IO事件為OP_ACCEPT事件。 - 設置JDK NIO原生
ServerSocketChannel為非阻塞模式, 配合IO多路複用模型。
1.1.3 AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//channel是由創建層次的,比如ServerSocketChannel 是 SocketChannel的 parent
private final Channel parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
private final ChannelId id;
//unsafe用於封裝對底層socket的相關操作
private final Unsafe unsafe;
//為channel分配獨立的pipeline用於IO事件編排
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用於定義實現對Channel的底層操作
unsafe = newUnsafe();
//為channel分配獨立的pipeline用於IO事件編排
pipeline = newChannelPipeline();
}
}
- Netty中的
Channel創建是有層次的,這裏的parent屬性用來保存上一級的Channel,比如這裏的NioServerSocketChannel是頂級Channel,所以它的parent = null。客户端NioSocketChannel是由NioServerSocketChannel創建的,所以它的parent = NioServerSocketChannel。 - 為
Channel分配全局唯一的ChannelId。ChannelId由機器Id(machineId),進程Id(processId),序列號(sequence),時間戳(timestamp),隨機數(random)構成
private DefaultChannelId() {
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
}
- 創建
NioServerSocketChannel的底層操作類Unsafe。這裏創建的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe。
Unsafe為Channel接口的一個內部接口,用於定義實現對Channel底層的各種操作,Unsafe接口定義的操作行為只能由Netty框架的Reactor線程調用,用户線程禁止調用。
interface Unsafe {
//分配接收數據用的Buffer
RecvByteBufAllocator.Handle recvBufAllocHandle();
//服務端綁定的端口地址
SocketAddress localAddress();
//遠端地址
SocketAddress remoteAddress();
//channel向Reactor註冊
void register(EventLoop eventLoop, ChannelPromise promise);
//服務端綁定端口地址
void bind(SocketAddress localAddress, ChannelPromise promise);
//客户端連接服務端
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//關閉channle
void close(ChannelPromise promise);
//讀數據
void beginRead();
//寫數據
void write(Object msg, ChannelPromise promise);
}
- 為
NioServerSocketChannel分配獨立的pipeline用於IO事件編排。pipeline其實是一個ChannelHandlerContext類型的雙向鏈表。頭結點HeadContext,尾結點TailContext。ChannelHandlerContext中包裝着ChannelHandler。
ChannelHandlerContext 保存 ChannelHandler上下文信息,用於事件傳播。後面筆者會單獨開一篇文章介紹,這裏我們還是聚焦於啓動主線。
這裏只是為了讓大家簡單理解pipeline的一個大致的結構,後面會寫一篇文章專門詳細講解pipeline。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
到了這裏NioServerSocketChannel就創建完畢了,我們來回顧下它到底包含了哪些核心信息。
1.2 初始化NioServerSocketChannel
void init(Channel channel) {
//向NioServerSocketChannelConfig設置ServerSocketChannelOption
setChannelOptions(channel, newOptionsArray(), logger);
//向netty自定義的NioServerSocketChannel設置attributes
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
//獲取從Reactor線程組
final EventLoopGroup currentChildGroup = childGroup;
//獲取用於初始化客户端NioSocketChannel的ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
//獲取用户配置的客户端SocketChannel的channelOption以及attributes
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的邏輯
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中用户指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
//LoggingHandler
pipeline.addLast(handler);
}
//添加用於接收客户端連接的acceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
- 向
NioServerSocketChannelConfig設置ServerSocketChannelOption。 - 向netty自定義的
NioServerSocketChannel設置ChannelAttributes
Netty自定義的SocketChannel類型均繼承AttributeMap接口以及DefaultAttributeMap類,正是它們定義了ChannelAttributes。用於向Channel添加用户自定義的一些信息。
這個ChannelAttributes的用處大有可為,Netty後邊的許多特性都是依靠這個ChannelAttributes來實現的。這裏先賣個關子,大家可以自己先想一下可以用這個ChannelAttributes做哪些事情?
- 獲取從Reactor線程組
childGroup,以及用於初始化客户端NioSocketChannel的ChannelInitializer,ChannelOption,ChannelAttributes,這些信息均是由用户在啓動的時候向ServerBootstrap添加的客户端NioServerChannel配置信息。這裏用這些信息來初始化ServerBootstrapAcceptor。因為後續會在ServerBootstrapAcceptor中接收客户端連接以及創建NioServerChannel。 - 向
NioServerSocketChannel中的pipeline添加用於初始化pipeline的ChannelInitializer。
問題來了,這裏為什麼不乾脆直接將ChannelHandler添加到pipeline中,而是又使用到了ChannelInitializer呢?
其實原因有兩點:
- 為了保證
線程安全地初始化pipeline,所以初始化的動作需要由Reactor線程進行,而當前線程是用户程序的啓動Main線程並不是Reactor線程。這裏不能立即初始化。 -
初始化
Channel中pipeline的動作,需要等到Channel註冊到對應的Reactor中才可以進行初始化,當前只是創建好了NioServerSocketChannel,但並未註冊到Main Reactor上。初始化
NioServerSocketChannel中pipeline的時機是:當NioServerSocketChannel註冊到Main Reactor之後,綁定端口地址之前。
前邊在介紹ServerBootstrap配置childHandler時也用到了ChannelInitializer,還記得嗎??
問題又來了,大家注意下ChannelInitializer#initChannel方法,在該初始化回調方法中,添加LoggingHandler是直接向pipeline中添加,而添加Acceptor為什麼不是直接添加而是封裝成異步任務呢?
這裏先給大家賣個關子,筆者會在後續流程中為大家解答
~
此時NioServerSocketChannel中的pipeline結構如下圖所示:
1.3 向Main Reactor註冊NioServerSocketChannel
從ServerBootstrap獲取主Reactor線程組NioEventLoopGroup,將NioServerSocketChannel註冊到NioEventLoopGroup中。
ChannelFuture regFuture = config().group().register(channel);
下面我們來看下具體的註冊過程:
1.3.1 主Reactor線程組中選取一個Main Reactor進行註冊
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
//獲取綁定策略
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
//採用輪詢round-robin的方式選擇Reactor
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
Netty通過next()方法根據上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》提到的channel到reactor的綁定策略,從ReactorGroup中選取一個Reactor進行註冊綁定。之後Channel生命週期內的所有 IO 事件都由這個 Reactor 負責處理,如 accept、connect、read、write 等 IO 事件。
一個channel只能綁定到一個Reactor上,一個Reactor負責監聽多個channel。
由於這裏是NioServerSocketChannle向Main Reactor進行註冊綁定,所以Main Reactor主要負責處理的IO事件是OP_ACCEPT事件。
1.3.2 向綁定後的Main Reactor進行註冊
向Reactor進行註冊的行為定義在NioEventLoop的父類SingleThreadEventLoop中,印象模糊的同學可以在回看下上篇文章中的NioEventLoop繼承結構小節內容。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//註冊channel到綁定的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe負責channel底層的各種操作
promise.channel().unsafe().register(this, promise);
return promise;
}
}
通過NioServerSocketChannel中的Unsafe類執行底層具體的註冊動作。
protected abstract class AbstractUnsafe implements Unsafe {
/**
* 註冊Channel到綁定的Reactor上
* */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//EventLoop的類型要與Channel的類型一樣 Nio Oio Aio
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//在channel上設置綁定的Reactor
AbstractChannel.this.eventLoop = eventLoop;
/**
* 執行channel註冊的操作必須是Reactor線程來完成
*
* 1: 如果當前執行線程是Reactor線程,則直接執行register0進行註冊
* 2:如果當前執行線程是外部線程,則需要將register0註冊操作 封裝程異步Task 由Reactor線程執行
* */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...............省略...............
}
}
}
}
- 首先檢查
NioServerSocketChannel是否已經完成註冊。如果以完成註冊,則直接設置代表註冊操作結果的ChannelPromise為fail狀態。 - 通過
isCompatible方法驗證Reactor模型EventLoop是否與Channel的類型匹配。NioEventLoop對應於NioServerSocketChannel。
上篇文章我們介紹過 Netty對三種IO模型:Oio,Nio,Aio的支持,用户可以通過改變Netty核心類的前綴輕鬆切換IO模型。isCompatible方法目的就是需要保證Reactor和Channel使用的是同一種IO模型。
- 在
Channel中保存其綁定的Reactor實例。 -
執行
Channel向Reactor註冊的動作必須要確保是在Reactor線程中執行。- 如果當前線程是
Reactor線程則直接執行註冊動作register0 - 如果當前線程不是
Reactor線程,則需要將註冊動作register0封裝成異步任務,存放在Reactor中的taskQueue中,等待Reactor線程執行。
- 如果當前線程是
當前執行線程並不是Reactor線程,而是用户程序的啓動線程Main線程。
1.3.3 Reactor線程的啓動
上篇文章中我們在介紹NioEventLoopGroup的創建過程中提到了一個構造器參數executor,它用於啓動Reactor線程,類型為ThreadPerTaskExecutor 。
當時筆者向大家賣了一個關子~~“Reactor線程是何時啓動的?”
那麼現在就到了為大家揭曉謎底的時候了~~
Reactor線程的啓動是在向Reactor提交第一個異步任務的時候啓動的。
Netty中的主Reactor線程組NioEventLoopGroup中的Main ReactorNioEventLoop是在用户程序Main線程向Main Reactor提交用於註冊NioServerSocketChannel的異步任務時開始啓動。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
接下來我們關注下NioEventLoop的execute方法
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
//當前線程是否為Reactor線程
boolean inEventLoop = inEventLoop();
//addTaskWakesUp = true addTask喚醒Reactor線程執行任務
addTask(task);
if (!inEventLoop) {
//如果當前線程不是Reactor線程,則啓動Reactor線程
//這裏可以看出Reactor線程的啓動是通過 向NioEventLoop添加異步任務時啓動的
startThread();
.....................省略.....................
}
.....................省略.....................
}
}
- 首先將異步任務
task添加到Reactor中的taskQueue中。 - 判斷當前線程是否為
Reactor線程,此時當前執行線程為用户程序啓動線程,所以這裏調用startThread啓動Reactor線程。
1.3.4 startThread
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//定義Reactor線程狀態
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
//Reactor線程狀態 初始為 未啓動狀態
private volatile int state = ST_NOT_STARTED;
//Reactor線程狀態字段state 原子更新器
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
}
Reactor線程初始化狀態為ST_NOT_STARTED,首先CAS更新狀態為ST_STARTEDdoStartThread啓動Reactor線程- 啓動失敗的話,需要將
Reactor線程狀態改回ST_NOT_STARTED
//ThreadPerTaskExecutor 用於啓動Reactor線程
private final Executor executor;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//Reactor線程開始啓動
SingleThreadEventExecutor.this.run();
success = true;
}
................省略..............
}
這裏就來到了ThreadPerTaskExecutor 類型的executor的用武之地了。
Reactor線程的核心工作之前介紹過:輪詢所有註冊其上的Channel中的IO就緒事件,處理對應Channel上的IO事件,執行異步任務。Netty將這些核心工作封裝在io.netty.channel.nio.NioEventLoop#run方法中。
- 將
NioEventLoop#run封裝在異步任務中,提交給executor執行,Reactor線程至此開始工作了就。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
@Override
public void execute(Runnable command) {
//啓動Reactor線程
threadFactory.newThread(command).start();
}
}
此時Reactor線程已經啓動,後面的工作全部都由這個Reactor線程來負責執行了。
而用户啓動線程在向Reactor提交完NioServerSocketChannel的註冊任務register0後,就逐步退出調用堆棧,回退到最開始的啓動入口處ChannelFuture f = b.bind(PORT).sync()。
此時Reactor中的任務隊列中只有一個任務register0,Reactor線程啓動後,會從任務隊列中取出任務執行。
至此NioServerSocketChannel的註冊工作正式拉開帷幕~~
1.3.5 register0
//true if the channel has never been registered, false otherwise
private boolean neverRegistered = true;
private void register0(ChannelPromise promise) {
try {
//查看註冊操作是否已經取消,或者對應channel已經關閉
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//執行真正的註冊操作
doRegister();
//修改註冊狀態
neverRegistered = false;
registered = true;
//回調pipeline中添加的ChannelInitializer的handlerAdded方法,在這裏初始化channelPipeline
pipeline.invokeHandlerAddedIfNeeded();
//設置regFuture為success,觸發operationComplete回調,將bind操作放入Reactor的任務隊列中,等待Reactor線程執行。
safeSetSuccess(promise);
//觸發channelRegister事件
pipeline.fireChannelRegistered();
//對於服務端ServerSocketChannel來説 只有綁定端口地址成功後 channel的狀態才是active的。
//此時綁定操作作為異步任務在Reactor的任務隊列中,綁定操作還沒開始,所以這裏的isActive()是false
if (isActive()) {
if (firstRegistration) {
//觸發channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
............省略.............
}
}
register0是驅動整個Channel註冊綁定流程的關鍵方法,下面我們來看下它的核心邏輯:
- 首先需要檢查
Channel的註冊動作是否在Reactor線程外被取消了已經!promise.setUncancellable()。檢查要註冊的Channel是否已經關閉!ensureOpen(promise)。如果Channel已經關閉或者註冊操作已經被取消,那麼就直接返回,停止註冊流程。 - 調用
doRegister()方法,執行真正的註冊操作。最終實現在AbstractChannel的子類AbstractNioChannel中,這個我們一會在介紹,先關注整體流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
/**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
*
* Sub-classes may override this method
*/
protected void doRegister() throws Exception {
// NOOP
}
}
- 當
Channel向Reactor註冊完畢後,調用pipeline.invokeHandlerAddedIfNeeded()方法,觸發回調pipeline中添加的ChannelInitializer的handlerAdded方法,在handlerAdded方法中利用前面提到的ChannelInitializer初始化ChannelPipeline。
初始化ChannelPipeline的時機是當Channel向對應的Reactor註冊成功後,在handlerAdded事件回調中利用ChannelInitializer進行初始化。
- 設置
regFuture為Success,並回調註冊在regFuture上的ChannelFutureListener#operationComplete方法,在operationComplete回調方法中將綁定操作封裝成異步任務,提交到Reactor的taskQueue中。等待Reactor的執行。
還記得這個regFuture在哪裏出現的嗎?它是在哪裏被創建,又是在哪裏添加的ChannelFutureListener呢? 大家還有印象嗎?回憶不起來也沒關係,筆者後面還會提到
- 通過
pipeline.fireChannelRegistered()在pipeline中觸發channelRegister事件。
pipeline中channelHandler的channelRegistered方法被回調。
- 對於Netty服務端
NioServerSocketChannel來説, 只有綁定端口地址成功後 channel的狀態才是active的。此時綁定操作在regFuture上註冊的ChannelFutureListener#operationComplete回調方法中被作為異步任務提交到了Reactor的任務隊列中,Reactor線程還沒開始執行綁定任務。所以這裏的isActive()是false。
當Reactor線程執行完register0方法後,才會去執行綁定任務。
下面我們來看下register0方法中這些核心步驟的具體實現:
1.3.6 doRegister()
public abstract class AbstractNioChannel extends AbstractChannel {
//channel註冊到Selector後獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略....................
}
}
}
}
調用底層JDK NIO Channel方法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object),將NettyNioServerSocketChannel中包裝的JDK NIO ServerSocketChannel註冊到Reactor中的JDK NIO Selector上。
簡單介紹下SelectableChannel#register方法參數的含義:
Selector:表示JDK NIO Channel將要向哪個Selector進行註冊。int ops:表示Channel上感興趣的IO事件,當對應的IO事件就緒時,Selector會返回Channel對應的SelectionKey。
SelectionKey可以理解為Channel在Selector上的特殊表示形式,SelectionKey中封裝了Channel感興趣的IO事件集合~~~interestOps,以及IO就緒的事件集合~~readyOps, 同時也封裝了對應的JDK NIO Channel以及註冊的Selector。最後還有一個重要的屬性attachment,可以允許我們在SelectionKey上附加一些自定義的對象。
Object attachment:向SelectionKey中添加用户自定義的附加對象。
這裏
NioServerSocketChannel向Reactor中的Selector註冊的IO事件為0,這個操作的主要目的是先獲取到Channel在Selector中對應的SelectionKey,完成註冊。當綁定操作完成後,在去向SelectionKey添加感興趣的IO事件~~~OP_ACCEPT事件。同時通過
SelectableChannel#register方法將Netty自定義的NioServerSocketChannel(這裏的this指針)附着在SelectionKey的attechment屬性上,完成Netty自定義Channel與JDK NIOChannel的關係綁定。這樣在每次對Selector進行IO就緒事件輪詢時,Netty 都可以從JDK NIO Selector返回的SelectionKey中獲取到自定義的Channel對象(這裏指的就是NioServerSocketChannel)。
1.3.7 HandlerAdded事件回調中初始化ChannelPipeline
當NioServerSocketChannel註冊到Main Reactor上的Selector後,Netty通過調用pipeline.invokeHandlerAddedIfNeeded()開始回調NioServerSocketChannel中pipeline裏的ChannelHandler的handlerAdded方法。
此時NioServerSocketChannel的pipeline結構如下:
此時pipeline中只有在初始化NioServerSocketChannel時添加的ChannelInitializer。
我們來看下ChannelInitializer中handlerAdded回調方法具體作了哪些事情~~
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成後,需要將自身從pipeline中移除
removeState(ctx);
}
}
}
//ChannelInitializer實例是被所有的Channel共享的,用於初始化ChannelPipeline
//通過Set集合保存已經初始化的ChannelPipeline,避免重複初始化同一ChannelPipeline
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//初始化完畢後,從pipeline中移除自身
pipeline.remove(this);
}
}
return true;
}
return false;
}
//匿名類實現,這裏指定具體的初始化邏輯
protected abstract void initChannel(C ch) throws Exception;
private void removeState(final ChannelHandlerContext ctx) {
//從initMap防重Set集合中刪除ChannelInitializer
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
}
ChannelInitializer 中的初始化邏輯比較簡單明瞭:
- 首先要判斷必須是當前
Channel已經完成註冊後,才可以進行pipeline的初始化。ctx.channel().isRegistered() - 調用
ChannelInitializer的匿名類指定的initChannel執行自定義的初始化邏輯。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中用户指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
還記得在初始化NioServerSocketChannel時。io.netty.bootstrap.ServerBootstrap#init方法中向pipeline中添加的ChannelInitializer嗎?
- 當執行完
initChannel 方法後,ChannelPipeline的初始化就結束了,此時ChannelInitializer就沒必要再繼續呆在pipeline中了,所需要將ChannelInitializer從pipeline中刪除。pipeline.remove(this)
當初始化完pipeline時,此時pipeline的結構再次發生了變化:
此時Main Reactor中的任務隊列taskQueue結構變化為:
添加ServerBootstrapAcceptor的任務是在初始化NioServerSocketChannel的時候向main reactor提交過去的。還記得嗎?
1.3.8 回調regFuture的ChannelFutureListener
在本小節《Netty服務端的啓動》的最開始,我們介紹了服務端啓動的入口函數io.netty.bootstrap.AbstractBootstrap#doBind,在函數的最開頭調用了initAndRegister()方法用來創建並初始化NioServerSocketChannel,之後便會將NioServerSocketChannel註冊到Main Reactor中。
註冊的操作是一個異步的過程,所以在initAndRegister()方法調用後返回一個代表註冊結果的ChannelFuture regFuture。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) {
//異步創建,初始化,註冊ServerSocketChannel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
//如果註冊完成,則進行綁定操作
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//添加註冊完成 回調函數
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...............省略...............
// 註冊完成後,Reactor線程回調這裏
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}
之後會向ChannelFuture regFuture添加一個註冊完成後的回調函數~~~~ ChannelFutureListener 。在回調函數operationComplete 中開始發起綁端口地址流程。
那麼這個回調函數在什麼時候?什麼地方發起的呢??
讓我們在回到本小節的主題register0 方法的流程中:
當調用doRegister()方法完成NioServerSocketChannel向Main Reactor的註冊後,緊接着會調用pipeline.invokeHandlerAddedIfNeeded()方法中觸發ChannelInitializer#handlerAdded回調中對pipeline進行初始化。
最後在safeSetSuccess方法中,開始回調註冊在regFuture 上的ChannelFutureListener 。
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
@Override
public boolean trySuccess() {
return trySuccess(null);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
//回調註冊在promise上的listeners
notifyListeners();
}
return true;
}
return false;
}
safeSetSuccess 的邏輯比較簡單,首先設置regFuture結果為success,並且回調註冊在regFuture上的ChannelFutureListener 。
需要提醒的是,執行
safeSetSuccess方法,以及後邊回調regFuture上的ChannelFutureListener這些動作都是由Reactor線程執行的。關於Netty中的
Promise模型後邊我會在寫一篇專門的文章進行分析,這裏大家只需清楚大體的流程即可。不必在意過多的細節。
下面我們把視角切換到regFuture上的ChannelFutureListener 回調中,看看在Channel註冊完成後,Netty又會做哪些事情?
2. doBind0
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}
這裏Netty又將綁定端口地址的操作封裝成異步任務,提交給Reactor執行。
但是這裏有一個問題,其實此時執行doBind0方法的線程正是Reactor線程,那為什麼不直接在這裏去執行bind操作,而是再次封裝成異步任務提交給Reactor中的taskQueue呢?
反正最終都是由Reactor線程執行,這其中又有什麼分別呢?
經過上小節的介紹我們知道,bind0方法的調用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法在將NioServerSocketChannel註冊到Main Reactor之後,並且NioServerSocketChannel的pipeline已經初始化完畢後,通過safeSetSuccess 方法回調過來的。
這個過程全程是由Reactor線程來負責執行的,但是此時register0方法並沒有執行完畢,還需要執行後面的邏輯。
而綁定邏輯需要在註冊邏輯執行完之後執行,所以在doBind0方法中Reactor線程會將綁定操作封裝成異步任務先提交給taskQueue中保存,這樣可以使Reactor線程立馬從safeSetSuccess 中返回,繼續執行剩下的register0方法邏輯。
private void register0(ChannelPromise promise) {
try {
................省略............
doRegister();
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//觸發channelRegister事件
pipeline.fireChannelRegistered();
if (isActive()) {
................省略............
}
} catch (Throwable t) {
................省略............
}
}
當Reactor線程執行完register0方法後,就會從taskQueue中取出異步任務執行。
此時Reactor線程中的taskQueue結構如下:
Reactor線程會先取出位於taskQueue隊首的任務執行,這裏是指向NioServerSocketChannel的pipeline中添加ServerBootstrapAcceptor的異步任務。
此時NioServerSocketChannel中pipeline的結構如下:
Reactor線程執行綁定任務。
3. 綁定端口地址
對Channel的操作行為全部定義在ChannelOutboundInvoker接口中。
public interface ChannelOutboundInvoker {
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
*
*/
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}
bind方法由子類AbstractChannel實現。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
}
調用pipeline.bind(localAddress, promise)在pipeline中傳播bind事件,觸發回調pipeline中所有ChannelHandler的bind方法。
事件在pipeline中的傳播具有方向性:
inbound事件從HeadContext開始逐個向後傳播直到TailContext。outbound事件則是反向傳播,從TailContext開始反向向前傳播直到HeadContext。
inbound事件只能被pipeline中的ChannelInboundHandler響應處理
outbound事件只能被pipeline中的ChannelOutboundHandler響應處理
然而這裏的bind事件在Netty中被定義為outbound事件,所以它在pipeline中是反向傳播。先從TailContext開始反向傳播直到HeadContext。
然而bind的核心邏輯也正是實現在HeadContext中。
3.1 HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
//觸發AbstractChannel->bind方法 執行JDK NIO SelectableChannel 執行底層綁定操作
unsafe.bind(localAddress, promise);
}
}
在HeadContext#bind回調方法中,調用Channel裏的unsafe操作類執行真正的綁定操作。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
.................省略................
//這時channel還未激活 wasActive = false
boolean wasActive = isActive();
try {
//io.netty.channel.socket.nio.NioServerSocketChannel.doBind
//調用具體channel實現類
doBind(localAddress);
} catch (Throwable t) {
.................省略................
return;
}
//綁定成功後 channel激活 觸發channelActive事件傳播
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//pipeline中觸發channelActive事件
pipeline.fireChannelActive();
}
});
}
//回調註冊在promise上的ChannelFutureListener
safeSetSuccess(promise);
}
protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
- 首先執行子類
NioServerSocketChannel具體實現的doBind方法,通過JDK NIO 原生 ServerSocketChannel執行底層的綁定操作。
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//調用JDK NIO 底層SelectableChannel 執行綁定操作
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
- 判斷是否為首次綁定,如果是的話將
觸發pipeline中的ChannelActive事件封裝成異步任務放入Reactor中的taskQueue中。 - 執行
safeSetSuccess(promise),回調註冊在promise上的ChannelFutureListener。
還是同樣的問題,當前執行線程已經是Reactor線程了,那麼為何不直接觸發pipeline中的ChannelActive事件而是又封裝成異步任務呢??
因為如果直接在這裏觸發ChannelActive事件,那麼Reactor線程就會去執行pipeline中的ChannelHandler的channelActive事件回調。
這樣的話就影響了safeSetSuccess(promise)的執行,延遲了註冊在promise上的ChannelFutureListener的回調。
到現在為止,Netty服務端就已經完成了綁定端口地址的操作,NioServerSocketChannel的狀態現在變為Active。
最後還有一件重要的事情要做,我們接着來看pipeline中對channelActive事件處理。
3.2 channelActive事件處理
channelActive事件在Netty中定義為inbound事件,所以它在pipeline中的傳播為正向傳播,從HeadContext一直到TailContext為止。
在channelActive事件回調中需要觸發向Selector指定需要監聽的IO事件~~OP_ACCEPT事件。
這塊的邏輯主要在HeadContext中實現。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline中繼續向後傳播channelActive事件
ctx.fireChannelActive();
//如果是autoRead 則自動觸發read事件傳播
//在read回調函數中 觸發OP_ACCEPT註冊
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//如果是autoRead 則觸發read事件傳播
channel.read();
}
}
//AbstractChannel
public Channel read() {
//觸發read事件
pipeline.read();
return this;
}
@Override
public void read(ChannelHandlerContext ctx) {
//觸發註冊OP_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
}
- 在
HeadContext中的channelActive回調中觸發pipeline中的read事件。 - 當
read事件再次傳播到HeadContext時,觸發HeadContext#read方法的回調。在read回調中調用channel底層操作類unsafe的beginRead方法向selector註冊監聽OP_ACCEPT事件。
3.3 beginRead
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void beginRead() {
assertEventLoop();
//channel必須是Active
if (!isActive()) {
return;
}
try {
// 觸發在selector上註冊channel感興趣的監聽事件
doBeginRead();
} catch (final Exception e) {
.............省略..............
}
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//子類負責繼承實現
protected abstract void doBeginRead() throws Exception;
}
- 斷言判斷執行該方法的線程必須是
Reactor線程。 - 此時
NioServerSocketChannel已經完成端口地址的綁定操作,isActive() = true - 調用
doBeginRead實現向Selector註冊監聽事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel {
//channel註冊到Selector後獲得的SelectKey
volatile SelectionKey selectionKey;
// Channel監聽事件集合
protected final int readInterestOp;
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化時 readInterestOp設置的是OP_ACCEPT事件
* */
if ((interestOps & readInterestOp) == 0) {
//添加OP_ACCEPT事件到interestOps集合中
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
- 前邊提到在
NioServerSocketChannel在向Main Reactor中的Selector註冊後,會獲得一個SelectionKey。這裏首先要獲取這個SelectionKey。 - 從
SelectionKey中獲取NioServerSocketChannel感興趣的IO事件集合 interestOps,當時在註冊的時候interestOps設置為0。 - 將在
NioServerSocketChannel初始化時設置的readInterestOp = OP_ACCEPT,設置到SelectionKey中的interestOps集合中。這樣Reactor中的Selector就開始監聽interestOps集合中包含的IO事件了。
Main Reactor中主要監聽的是OP_ACCEPT事件。
流程走到這裏,Netty服務端就真正的啓動起來了,下一步就開始等待接收客户端連接了。大家此刻在來回看這副啓動流程圖,是不是清晰了很多呢?
此時Netty的Reactor模型結構如下:
總結
本文我們通過圖解源碼的方式完整地介紹了整個Netty服務端啓動流程,並介紹了在啓動過程中涉及到的ServerBootstrap 相關的屬性以及配置方式。NioServerSocketChannel 的創建初始化過程以及類的繼承結構。
其中重點介紹了NioServerSocketChannel 向Reactor的註冊過程以及Reactor線程的啓動時機和pipeline的初始化時機。
最後介紹了NioServerSocketChannel綁定端口地址的整個流程。
上述介紹的這些流程全部是異步操作,各種回調繞來繞去的,需要反覆回想下,讀異步代碼就是這樣,需要理清各種回調之間的關係,並且時刻提醒自己當前的執行線程是什麼?
好了,現在Netty服務端已經啓動起來,接着就該接收客户端連接了,我們下篇文章見~~~~