博客 / 詳情

返回

《跟閃電俠學Netty》閲讀筆記 - ChannelHandler 生命週期

引言

本文主要介紹ChannelHandler當中的ChannelInboundHandler。

思維導圖

https://www.mubu.com/doc/1lK922R14Bl

《跟閃電俠學Netty》- ChannelHandler 生命週期.png

LifeCycleTestHandler 案例

首先來看一下案例,LifeCycleTestHandlerTest 利用適配器 ChannelInboundHandlerAdapter 重寫,重寫相關方法。

  • public void handlerAdded(ChannelHandlerContext ctx) throws Exception
  • public void channelRegistered(ChannelHandlerContext ctx) throws Exception
  • public void channelActive(ChannelHandlerContext ctx) throws Exception
  • public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
  • public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
  • public void channelInactive(ChannelHandlerContext ctx) throws Exception
  • public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
  • public void handlerRemoved(ChannelHandlerContext ctx) throws Exception

/**
 * 測試netty channelHandler 的生命週期
 *
 * 對應的是 ChannelInboundHandlerAdapter
 */
@Slf4j
public class LifeCycleTestHandlerTest extends ChannelInboundHandlerAdapter {

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        serverBootstrap
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new LifeCycleTestHandlerTest());

                    }

                })
                .bind(8001);

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("邏輯處理器被添加 handlerAdded() ");
        super.handlerAdded(ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channel 綁定到線程 (NioEventLoop):channelRegistered() ");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channel 準備就緒 channelActive()");
        super.channelActive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channel 某次數據讀寫完成 channelReadComplete() ");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("channel 有數據可讀 channelRead() ");
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channel 被關閉 channelInactive()");
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channel取消線程 (NioEventLoop) 的綁定:channelUnregistered() ");
        super.channelUnregistered(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("邏輯處理器被刪除 handlerRemoved() ");
        super.handlerRemoved(ctx);
    }
}

如果啓動Client客户端並且進行連接,ChannelHanlder 的回調方法執行順序如下:

  1. handlerAdded()
  2. channelRegistered()
  3. channelActive()
  4. channelRead()
  5. channelReadComplete()

Channel連接方法回調

根據上面的執行順序,這裏補充介紹回調方法的含義。

  1. 邏輯處理器被添加:handlerAdded()

    檢測到新連接,調用ch.pipleLine(new LifeCycleTestHandlerTest()) 之後的回調,表示當前的Channel成功綁定一個邏輯處理器

  2. channel 綁定到線程 (NioEventLoop):channelRegistered()

    表示當前Channel所有邏輯處理器和某個線程模型的線程綁定完成,比如綁定到NIO模型的NioEventLoop。此時會創建線程處理連接的讀寫,具體使用了JDK的線程池實現,通過線程池抓一個線程方式綁定。

  3. channel 準備就緒: channelActive()

    觸發條件是所有的業務邏輯準備完成,也就是pipeline添加了所有的Handler ,以及綁定好一個NIO的線程之後,相當於是worker全部就位,就等boss一聲令下開工。

  4. channel 有數據可讀 channelRead()

    客户端每次發數據都會以此進行回調。

  5. channel 某次數據讀寫完成 channelReadComplete()

    服務端每次處理以此數據都回調此方法進行回調 。

Channel 關閉方法回調

  1. channel 被關閉:channelInactive()

    TCP層已經轉為非ESTABLISH模式。

  2. channel取消線程 (NioEventLoop) 的綁定:channelUnregistered()

    與連接線程的綁定全部取消時候觸發。

  1. 邏輯處理器被刪除:handlerRemoved()

    移除連接對應的所有業務邏輯處理器觸發。

Channel連接和關閉回調流程圖

Channel連接和關閉回調流程圖

ChannelHanlder用法舉例

ChannelInitializer

入門程序當中的 .childHandler(new ChannelInitializer<NioSocketChannel>()定義了抽象initChannel方法,抽象方法需要自行實現,通常是服務端啓動流程的邏輯處理器中使用,添加Handler鏈到Pipeline。

handlerAdded() 方法和 channelRegistered()方法都會嘗試調用 initChannel,initChannel被多次調用,利用putIfAbsent防止initChannel被多次調用。

if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.  
思考:為什麼既要防止被多次調用,Netty又要調用多次initChannel
1. handlerAdded方法已經通過`Channel`進行綁定。
2. 用户自行覆蓋和重寫`ChannelInitializer`的`handlerAdded`,導致`Channel`不觸發綁定  
3. `channelRegisted()` 中再觸發一次綁定,並且本身不允許被重寫。  
`public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {`  
4. 引導實現者只關注重寫handlerAdded之後的邏輯處理  
                

handlerAdded 與 channelInActive

主要用做資源釋放和申請。

channelActive 和 ChannelInActive

應用的場景如下:

  1. TCP建立和連接抽象
  2. 實現統計單機的連接數量

    • 其中channelActive被調用,對應的每次調用連接+1
    • 其中channelInActive被調用,對應的每次調用連接-1
  3. 實現客户端IP黑白名單的連接過濾

channelRead()

這個方法和粘包和拆包問題相關,服務端根據自定義協議拆包,在這個方法中每次讀取一定數據,都會累加到容器當中。

channelReadComplete

每次向客户端寫入數據都調用writeAndFlush不是很高效, 更好的實現是向客户端寫數據只write,但是在ChannelReadComplete裏面一次 flush ,所以這個方法相當於批量刷新機制,批量刷新追求更高性能。

總結

  1. 本部分主要是介紹ChannelHandler的各種回調,以及連接建立關閉,執行回調是一個逆向過程。
  2. 每一種回調都有各自用法,但是部分回調界限比較模糊,更多需要在實踐中區分和使用。

寫到最後

生命週期和回調在Netty中非常直觀,本文更多是對於重要的方法進行羅列。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.