引言
本文主要介紹ChannelHandler當中的ChannelInboundHandler。
思維導圖
https://www.mubu.com/doc/1lK922R14Bl
LifeCycleTestHandler 案例
首先來看一下案例,LifeCycleTestHandlerTest 利用適配器 ChannelInboundHandlerAdapter 重寫,重寫相關方法。
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception
- public void channelActive(ChannelHandlerContext ctx) throws Exception
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
- public void channelInactive(ChannelHandlerContext ctx) throws Exception
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
/**
* 測試netty channelHandler 的生命週期
*
* 對應的是 ChannelInboundHandlerAdapter
*/
@Slf4j
public class LifeCycleTestHandlerTest extends ChannelInboundHandlerAdapter {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LifeCycleTestHandlerTest());
}
})
.bind(8001);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("邏輯處理器被添加 handlerAdded() ");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("channel 綁定到線程 (NioEventLoop):channelRegistered() ");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("channel 準備就緒 channelActive()");
super.channelActive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("channel 某次數據讀寫完成 channelReadComplete() ");
super.channelReadComplete(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("channel 有數據可讀 channelRead() ");
super.channelRead(ctx, msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("channel 被關閉 channelInactive()");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("channel取消線程 (NioEventLoop) 的綁定:channelUnregistered() ");
super.channelUnregistered(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("邏輯處理器被刪除 handlerRemoved() ");
super.handlerRemoved(ctx);
}
}
如果啓動Client客户端並且進行連接,ChannelHanlder 的回調方法執行順序如下:
handlerAdded()channelRegistered()channelActive()channelRead()channelReadComplete()
Channel連接方法回調
根據上面的執行順序,這裏補充介紹回調方法的含義。
-
邏輯處理器被添加:
handlerAdded()檢測到新連接,調用
ch.pipleLine(new LifeCycleTestHandlerTest())之後的回調,表示當前的Channel成功綁定一個邏輯處理器。 -
channel 綁定到線程 (NioEventLoop):
channelRegistered()表示當前Channel所有邏輯處理器和某個線程模型的線程綁定完成,比如綁定到NIO模型的NioEventLoop。此時會創建線程處理連接的讀寫,具體使用了JDK的線程池實現,通過線程池抓一個線程方式綁定。
-
channel 準備就緒:
channelActive()觸發條件是所有的業務邏輯準備完成,也就是pipeline添加了所有的Handler ,以及綁定好一個NIO的線程之後,相當於是worker全部就位,就等boss一聲令下開工。
-
channel 有數據可讀
channelRead()客户端每次發數據都會以此進行回調。
-
channel 某次數據讀寫完成
channelReadComplete()服務端每次處理以此數據都回調此方法進行回調 。
Channel 關閉方法回調
-
channel 被關閉:
channelInactive()TCP層已經轉為非ESTABLISH模式。
-
channel取消線程 (NioEventLoop) 的綁定:
channelUnregistered()與連接線程的綁定全部取消時候觸發。
-
邏輯處理器被刪除:
handlerRemoved()移除連接對應的所有業務邏輯處理器觸發。
Channel連接和關閉回調流程圖
ChannelHanlder用法舉例
ChannelInitializer
入門程序當中的 .childHandler(new ChannelInitializer<NioSocketChannel>()定義了抽象initChannel方法,抽象方法需要自行實現,通常是服務端啓動流程的邏輯處理器中使用,添加Handler鏈到Pipeline。
handlerAdded() 方法和 channelRegistered()方法都會嘗試調用 initChannel,initChannel被多次調用,利用putIfAbsent防止initChannel被多次調用。
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
思考:為什麼既要防止被多次調用,Netty又要調用多次initChannel?
1. handlerAdded方法已經通過`Channel`進行綁定。
2. 用户自行覆蓋和重寫`ChannelInitializer`的`handlerAdded`,導致`Channel`不觸發綁定
3. `channelRegisted()` 中再觸發一次綁定,並且本身不允許被重寫。
`public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {`
4. 引導實現者只關注重寫handlerAdded之後的邏輯處理
handlerAdded 與 channelInActive
主要用做資源釋放和申請。
channelActive 和 ChannelInActive
應用的場景如下:
- TCP建立和連接抽象
-
實現統計單機的連接數量
- 其中channelActive被調用,對應的每次調用連接+1
- 其中channelInActive被調用,對應的每次調用連接-1
-
實現客户端IP黑白名單的連接過濾
channelRead()
這個方法和粘包和拆包問題相關,服務端根據自定義協議拆包,在這個方法中每次讀取一定數據,都會累加到容器當中。
channelReadComplete
每次向客户端寫入數據都調用writeAndFlush不是很高效, 更好的實現是向客户端寫數據只write,但是在ChannelReadComplete裏面一次 flush ,所以這個方法相當於批量刷新機制,批量刷新追求更高性能。
總結
- 本部分主要是介紹ChannelHandler的各種回調,以及連接建立關閉,執行回調是一個逆向過程。
- 每一種回調都有各自用法,但是部分回調界限比較模糊,更多需要在實踐中區分和使用。
寫到最後
生命週期和回調在Netty中非常直觀,本文更多是對於重要的方法進行羅列。