博客 / 詳情

返回

Netty服務端開發及性能優化 | 京東雲技術團隊

作者:京東物流 王奕龍

Netty是一個異步基於事件驅動高性能網絡通信框架,可以看做是對NIO和BIO的封裝,並提供了簡單易用的API、Handler和工具類等,用以快速開發高性能、高可靠性的網絡服務端和客户端程序。

1. 創建服務端

服務端啓動需要創建 ServerBootstrap 對象,並完成初始化線程模型配置IO模型添加業務處理邏輯(Handler) 。在添加業務處理邏輯時,調用的是 childHandler() 方法添加了一個ChannelInitializer,代碼示例如下

// 負責服務端的啓動
ServerBootstrap serverBootstrap = new ServerBootstrap();

// 以下兩個對象可以看做是兩個線程組
// boss線程組負責監聽端口,接受新的連接
NioEventLoopGroup boss = new NioEventLoopGroup();
// worker線程組負責讀取數據
NioEventLoopGroup worker = new NioEventLoopGroup();

// 配置線程組並指定NIO模型
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        // 定義後續每個 新連接 的讀寫業務邏輯
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        // 添加業務處理邏輯
                        .addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
            }
        });

// 綁定端口號
serverBootstrap.bind(2002);

通過調用 .channel(NioServerSocketChannel.class) 方法指定 Channel 類型為NIO類型,如果要指定為BIO類型,參數改成 OioServerSocketChannel.class 即可。

其中 nioSocketChannel.pipeline() 用來獲取 PipeLine 對象,調用方法 addLast() 添加必要的業務處理邏輯,這裏採用的是責任鏈模式,會將每個Handler作為一個節點進行處理。

1.1 創建客户端

客户端與服務端啓動類似,不同的是,客户端需要創建 Bootstrap 對象來啓動,並指定一個客户端線程組,相同的是都需要完成初始化線程模型配置IO模型添加業務處理邏輯(Handler) , 代碼示例如下

// 負責客户端的啓動
Bootstrap bootstrap = new Bootstrap();
// 客户端的線程模型
NioEventLoopGroup group = new NioEventLoopGroup();

// 指定線程組和NIO模型
bootstrap.group(group).channel(NioSocketChannel.class)
        // handler() 方法封裝業務處理邏輯
        .handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                channel.pipeline()
                    // 添加業務處理邏輯
                    .addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                System.out.println(msg);
                            }
                    });
            }
        });

// 連接服務端IP和端口
bootstrap.connect("127.0.0.1", 2002);

(注意:下文中內容均以服務端代碼示例為準)

2. 編碼和解碼

客户端與服務端進行通信,通信的消息是以二進制字節流的形式通過 Channel 進行傳遞的,所以當我們在客户端封裝好Java業務對象後,需要將其按照協議轉換成字節數組,並且當服務端接受到該二進制字節流時,需要將其根據協議再次解碼成Java業務對象進行邏輯處理,這就是編碼和解碼的過程。Netty 為我們提供了MessageToByteEncoder 用於編碼,ByteToMessageDecoder 用於解碼。

2.1 MessageToByteEncoder

用於將Java對象編碼成字節數組並寫入 ByteBuf,代碼示例如下

public class TcpEncoder extends MessageToByteEncoder<Message> {

    /**
     * 序列化器
     */
    private final Serializer serializer;

    public TcpEncoder(Serializer serializer) {
        this.serializer = serializer;
    }

    /**
     * 編碼的執行邏輯
     *
     * @param message 需要被編碼的消息對象
     * @param byteBuf 將字節數組寫入ByteBuf
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        // 通過自定義的序列化器將對象轉換成字節數組
        byte[] bytes = serializer.serialize(message);
        // 將字節數組寫入 ByteBuf 便完成了對象的編碼流程
        byteBuf.writeBytes(bytes);
    }
}

2.2 ByteToMessageDecoder

它用於將接收到的二進制數據流解碼成Java對象,與上述代碼類似,只不過是將該過程反過來了而已,代碼示例如下

public class TcpDecoder extends ByteToMessageDecoder {
    /**
     * 序列化器
     */
    private final Serializer serializer;

    public TcpDecoder(Serializer serializer) {
        this.serializer = serializer;
    }

    /**
     * 解碼的執行邏輯
     *
     * @param byteBuf 接收到的ByteBuf對象
     * @param list    任何完成解碼的Java對象添加到該List中即可
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 根據協議自定義的解碼邏輯將其解碼成Java對象
        Message message = serializer.deSerialize(byteBuf);
        // 解碼完成後添加到List中即可
        list.add(message);
    }
}

2.3 注意要點

ByteBuf默認情況下使用的是堆外內存,不進行內存釋放會發生內存溢出。不過 ByteToMessageDecoder 和 MessageToByteEncoder 這兩個解碼和編碼Handler 會自動幫我們完成內存釋放的操作,無需再次手動釋放。因為我們實現的 encode() 和 decode() 方法只是這兩個 Handler 源碼中執行的一個環節,最終會在 finally 代碼塊中完成對內存的釋放,具體內容可閲讀 MessageToByteEncoder 中第99行 write() 方法源碼。

2.4 在服務端中添加編碼解碼Handler

serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        // 接收到請求時進行解碼
                        .addLast(new TcpDecoder(serializer))
                        // 發送請求時進行編碼
                        .addLast(new TcpEncoder(serializer));
            }
        });

3. 添加業務處理Handler

在Netty框架中,客户端與服務端的每個連接都對應着一個 Channel,而這個 Channel 的所有處理邏輯都封裝在一個叫作ChannelPipeline 的對象裏。ChannelPipeline 是一個雙向鏈表,它使用的是責任鏈模式,每個鏈表節點都是一個 Handler,能通它能獲取 Channel 相關的上下文信息(ChannelHandlerContext)。
Netty為我們提供了多種讀取 Channel 中數據的 Handler,其中比較常用的是 ChannelInboundHandlerAdapter 和SimpleChannelInboundHandler,下文中我們以讀取心跳消息為例。

3.1 ChannelInboundHandlerAdapter

如下為處理心跳業務邏輯的 Handler,具體執行邏輯參考代碼和註釋即可

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    /**
     * channel中有數據可讀時,會回調該方法
     *
     * @param msg 如果在該Handler前沒有解碼Handler節點處理,該對象類型為ByteBuf;否則為解碼後的Java對象
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        // 處理心跳消息
        processHeartBeatMessage(message);

        // 初始化Ack消息
        Message ackMessage = initialAckMessage();
        // 回寫給客户端
        ctx.channel().writeAndFlush(ackMessage);
    }
}

3.2 SimpleChannelInboundHandler

SimpleChannelInboundHandler 是ChannelInboundHandlerAdapter 的實現類,SimpleChannelInboundHandler 能夠指定泛型,這樣在處理業務邏輯時,便無需再添加上文代碼中對象強轉的邏輯,這部分代碼實現是在 SimpleChannelInboundHandler 的 channelRead() 方法中完成的,它是一個模版方法,我們僅僅需要實現 channelRead0() 方法即可,代碼示例如下

public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * @param msg 注意這裏的對象類型即為 Message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 處理心跳消息
        processHeartBeatMessage(message);

        // 初始化Ack消息
        Message ackMessage = initialAckMessage();
        // 回寫給客户端
        ctx.channel().writeAndFlush(ackMessage);
    }
}

3.3 在服務端中添加心跳處理Handler

serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        // 接收到進行解碼
                        .addLast(new TcpDecoder(serializer))
                        // 心跳業務處理Handler
                        .addLast(new HeartBeatHandler())
                        // 發送請求時進行編碼
                        .addLast(new TcpEncoder(serializer));
            }
        });

4. ChannelHandler的生命週期

在 ChannelInboundHandlerAdapter 可以通過實現不同的方法來完成指定時機的方法回調,具體可參考如下代碼

public class LifeCycleHandler extends ChannelInboundHandlerAdapter {
    /**
     * 當檢測到新連接之後,調用 ch.pipeline().addLast(...); 之後的回調
     * 表示當前channel中成功添加了 Handler
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("邏輯處理器被添加時回調:handlerAdded()");
        super.handlerAdded(ctx);
    }

    /**
     * 表示當前channel的所有邏輯處理已經和某個NIO線程建立了綁定關係
     * 這裏的NIO線程通常指的是 NioEventLoop
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 綁定到線程(NioEventLoop)時回調:channelRegistered()");
        super.channelRegistered(ctx);
    }

    /**
     * 當Channel的所有業務邏輯鏈準備完畢,連接被激活時
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 準備就緒時回調:channelActive()");
        super.channelActive(ctx);
    }

    /**
     * 客户端向服務端發送數據,表示有數據可讀時,就會回調該方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channel 有數據可讀時回調:channelRead()");
        super.channelRead(ctx, msg);
    }

    /**
     * 服務端每完整的讀完一次數據,都會回調該方法
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 某次數據讀完時回調:channelReadComplete()");
        super.channelReadComplete(ctx);
    }

    // ---斷開連接時---

    /**
     * 該客户端與服務端的連接被關閉時回調
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 被關閉時回調:channelInactive()");
        super.channelInactive(ctx);
    }

    /**
     * 對應的NIO線程移除了對這個連接的處理
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 取消線程(NioEventLoop) 的綁定時回調: channelUnregistered()");
        super.channelUnregistered(ctx);
    }

    /**
     * 為該連接添加的所有業務邏輯Handler被移除時
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("邏輯處理器被移除時回調:handlerRemoved()");
        super.handlerRemoved(ctx);
    }
}

5. 解決粘包和半包問題

即使我們發送消息的時候是以 ByteBuf 的形式發送的,但是到了底層操作系統,仍然是以字節流的形式對數據進行發送的,而且服務端也以字節流的形式讀取,因此在服務端對字節流進行拼接時,可能就會造成發送時 ByteBuf 與讀取時的 ByteBuf 不對等的情況,這就是所謂的粘包或半包現象。

以如下情況為例,當客户端頻繁的向服務端發送心跳消息時,讀取到的ByteBuf信息如下,其中一個心跳請求是用紅框圈出的部分

粘包日誌.png

可以發現多個心跳請求"粘"在了一起,那麼我們需要對它進行拆包處理,否則只會讀取第一條心跳請求,之後的請求會全部失效

Netty 為我們提供了基於長度的拆包器LengthFieldBasedFrameDecoder 來進行拆包工作,它能對超過所需數據量的包進行拆分,也能在數據不足的時候等待讀取,直到數據足夠時,構成一個完整的數據包並進行業務處理。

5.1 LengthFieldBasedFrameDecoder

以標準接口文檔中的協議(圖示)為準,代碼示例如下,其中的四個參數比較重要,詳細信息可見註釋描述

標準協議.png

public class SplitHandler extends LengthFieldBasedFrameDecoder {

    /**
     * 在協議中表示數據長度的字段在字節流首尾中的偏移量
     */
    private static final Integer LENGTH_FIELD_OFFSET = 10;

    /**
     * 表示數據長度的字節長度
     */
    private static final Integer LENGTH_FIELD_LENGTH = 4;

    /**
     * 數據長度後邊的頭信息中的字節偏移量
     */
    private static final Integer LENGTH_ADJUSTMENT = 10;

    /**
     * 表示從第一個字節開始需要捨去的字節數,在我們的協議中,不需要進行捨去
     */
    private static final Integer INITIAL_BYTES_TO_STRIP = 0;

    public SplitHandler() {
        super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
    }
}

之後將其添加到Handler中即可,如果遇到其他協議,更改其中參數或查看 LengthFieldBasedFrameDecoder 的JavaDoc中詳細描述。

5.2 在服務端中添加拆包Handler

serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                    // 拆包Handler
                    .addLast(new SplitHandler())
                    // 接收到進行解碼
                    .addLast(new TcpDecoder(serializer))
                    // 心跳業務處理Handler
                    .addLast(new HeartBeatHandler())
                    // 發送請求時進行編碼
                    .addLast(new TcpEncoder(serializer));
            }
        });

6. Netty性能優化

6.1 Handler對單例模式的應用

Netty 在每次有新連接到來的時候,都會調用 ChannelInitializer 的 initChannel() 方法,會將其中相關的 Handler 都創建一次,
如果其中的 Handler 是無狀態且能夠通用的,可以將其改成單例,這樣就能夠在每次連接建立時,避免多次創建相同的對象。

以如下服務端代碼為例,包含如下Handler,可以將編碼解碼、以及業務處理Handler都定義成Spring單例bean的形式注入進來,這樣就能夠完成對象的複用而無需每次建立連接都創建相同的對象了

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日誌Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼Handler
                        .addLast(new TcpDecoder(serializer))
                        // 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(new HeartBeatHandler(), new ChuteStatusHandler())
                        .addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler())
                        .addLast(new ScanReceiveHandler(), new SortResultHandler())
                        // 編碼Handler
                        .addLast(new TcpEncoder(serializer));
            }
        });

改造完成後如下

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日誌Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼Handler
                        .addLast(tcpDecoder)
                        // 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(heartBeatHandler, chuteStatusHandler)
                        .addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler)
                        .addLast(scanReceiveHandler, sortResultHandler)
                        // 編碼Handler
                        .addLast(tcpEncoder);
            }
        });

不過需要注意在每個單例Handler的類上標註 @ChannelHandler.Sharable 註解,否則會拋出如下異常

io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times

另外,SplitHanlder 不能進行單例處理,因為它的內部實現與每個 Channel 都有關,每個 SplitHandler 都需要維持每個Channel 讀到的數據,即它是有狀態的

6.2 縮短責任鏈調用

對服務端來説,每次解碼出來的Java對象在多個業務處理 Handler 中只會經過一個其中 Handler 完成業務處理,那麼我們將所有業務相關的 Handler封裝起來到一個Map中,每次只讓它經過必要的Handler而不是經過整個責任鏈,那麼便可以提高Netty處理請求的性能。

定義如下 ServerHandlers 單例bean,並使用 策略模式 將對應的 Handler 管理起來,每次處理時根據消息類型獲取對應的 Handler 來完成業務邏輯

@ChannelHandler.Sharable
public class ServerHandlers extends SimpleChannelInboundHandler<Message> {

    @Resourse
    private HeartBeatHandler heartBeatHandler;

    /**
     * 策略模式封裝Handler,這樣就能在回調 ServerHandler 的 channelRead0 方法時
     * 找到具體的Handler,而不需要經過責任鏈的每個 Handler 節點,以此來提高性能
     */
    private final Map<Command, SimpleChannelInboundHandler<Message>> map;

    public ServerHandler() {
        map = new HashMap();

        // key: 消息類型枚舉 value: 對應的Handler
        map.put(MessageType.HEART_BEAT, heartBeatHandler);
        // ...
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 調用 channelRead() 方法完成業務邏輯處理
        map.get(msg.getMessageType()).channelRead(ctx, msg);
    }
}

改造完成後,服務端代碼如下,因為我們封裝了平行的業務處理Handler,所以代碼很清爽

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日誌Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼Handler
                        .addLast(tcpDecoder)
                        // serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(serverHandlers)
                        // 編碼Handler
                        .addLast(tcpEncoder);
            }
        });

6.3 合併編碼、解碼Handler

Netty 對編碼解碼提供了統一處理Handler是MessageToMessageCodec,這樣我們就能將編碼和解碼的Handler合併成一個添加接口,代碼示例如下

@ChannelHandler.Sharable
public class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> {

    /**
     * 序列化器
     */
    @Resourse
    private Serializer serializer;

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        // 將字節數組寫入 ByteBuf 
        ByteBuf byteBuf = ctx.alloc().ioBuffer();
        serializer.serialize(byteBuf, msg);

        // 這個編碼也需要添加到List中
        out.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        // 根據協議自定義的解碼邏輯將其解碼成Java對象,並添加到List中
        out.add(serializer.deSerialize(msg));
    }
}

改造完成後,服務端代碼如下,將其放在業務處理Handler前即可,調用完業務Handler邏輯,會執行編碼邏輯

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日誌Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼、編碼Handler
                        .addLast(messageCodecHandler)
                        // serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(serverHandlers);
            }
        });

6.4 減少NIO線程阻塞

對於耗時的業務操作,需要將它們都丟到業務線程池中去處理,因為單個NIO線程會管理很多 Channel ,只要有一個 Channel 中的 Handler 的 channelRead() 方法被業務邏輯阻塞,那麼它就會拖慢綁定在該NIO線程上的其他所有 Channel

為了避免上述情況,可以在包含長時間業務處理邏輯的Handler中創建一個線程池,並將其丟入線程池中進行執行,偽代碼如下

protected void channelRead(ChannelHandlerContext ctx, Object message) {
    threadPool.submit(new Runnable() {
        // 耗時的業務處理邏輯
        doSomethingSependTooMuchTime();
        
        writeAndFlush();
    });
}

6.5 空閒"假死"檢測Handler

如果底層的TCP連接已經斷開,但是另一端服務並沒有捕獲到,在某一端(客户端或服務端)看來會認為這條連接仍然存在,這就是連接"假死"現象。這造成的問題就是,對於服務端來説,每個連接連接都會耗費CPU和內存資源,過多的假死連接會造成性能下降和服務崩潰;對客户端來説,
連接假死會使得發往服務端的請求都會超時,所以需要儘可能避免假死現象的發生。

造成假死的原因可能是公網丟包、客户端或服務端網絡故障等,Netty為我們提供了 IdleStateHandler 來解決超時假死問題,示例代碼如下

public class MyIdleStateHandler extends IdleStateHandler {

    private static final int READER_IDLE_TIME = 15;

    public MyIdleStateHandler() {
        // 讀超時時間、寫超時時間、讀寫超時時間 指定0值不判斷超時
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒內沒有讀到數據,關閉連接");
        ctx.channel().close();
    }
}

其構造方法中有三個參數來分別指定讀、寫和讀寫超時時間,當指定0時不判斷超時,除此之外Netty也有專門用來處理讀和寫超時的Handler,分別為 ReadTimeoutHandlerWriteTimeoutHandler

將其添加到服務端 Handler 的首位即可

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 超時判斷Handler
                        .addLast(new MyIdleStateHandler())
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日誌Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼、編碼Handler
                        .addLast(messageCodecHandler)
                        // serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(serverHandlers);
            }
        });

7. ChannelPipeline

ChannelPipeline 與 Channel 密切相關,它可以看做是一條流水線,數據以字節流的形式進來,經過不同 Handler 的"加工處理",
最終以字節流的形式輸出。ChannelPipeline 在每條新連接建立的時候被創建,是一條雙向鏈表,其中每一個節點都是ChannelHadnlerContext 對象,能夠通過它拿到相關的上下文信息,默認它有頭節點 HeadContext 和尾結點 TailContext

7.1 InboundHandler 和 OutboundHandler

定義在 ChannelPipeline 中的 Handler 是可插拔的,能夠完成動態編織,調用 ctx.pipeline().remove() 方法可移除,調用 ctx.pipeline().addXxx() 方法可進行添加。

InboundHandler 與 OutboundHandler 處理的事件不同,前者處理 Inbound事件,典型的就是讀取數據流並加工處理;後者會對調用 writeAndFlush() 方法的 Outbound事件 進行處理。

此外,兩者的傳播機制也是不同的:

InboundHandler 會從鏈表頭逐個向下調用,頭節點只是簡單的將該事件傳播下去(ctx.fireChannelRead(mug)),執行過程中調用findContextInbound() 方法來尋找 InboundHandler 節點,直到 TailContext 節點執行方法完畢,結束調用。

一般自定義的 ChannelInboundHandler 都繼承自ChannelInboundHandlerAdapter, 如果沒有覆蓋channelXxx() 相關方法,那麼該事件正常會遍歷雙向鏈表一直傳播到尾結點,否則就會在當前節點執行完結束;當然也可以調用 fireXxx() 方法讓事件從當前節點繼續向下傳播。

OutboundHandler 是從鏈表尾向鏈表頭調用,相當於反向遍歷 ChannelPipeline 雙向鏈表,Outbound事件 會先經過TailContext 尾節點,並在執行過程中不斷尋找OutboundHandler 節點加工處理,直到頭節點 HeadContext 調用 Unsafe.write() 方法結束。

7.2 異常傳播

異常的傳播機制和 Inbound事件 的傳播機制類似,在任何節點發生的異常都會向下一個節點傳遞。如果自定義的 Handler 沒有處理異常也沒有實現 exceptionCaught() 方法,最終則會落到 TailContext 節點,控制枱打印異常未處理的警告信息。

通常異常處理,我們會定義一個異常處理器,繼承自ChannelDuplexHandler ,放在自定義鏈表節點的末尾,這樣就能夠一定捕獲和處理異常。

8. Reactor線程模型

8.1 NioEventLoopGroup

創建 new NioEventLoopGroup() 它的默認線程數是當前CPU線程數的2倍,最終會調用到如下源碼

// 這裏計算的線程數量
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

跟進到構造方法的最終實現,會執行如下業務邏輯

NioEventLoopGroup構造方法.jpg

其中在第2步創建 NioEventLoop 時,值得關注的是創建了一個 Selector,以此來實現IO多路複用;另外它還創建了高性能 MPSC(多生產者單消費者)隊列,藉助它來協調任務的異步執行,如此單條線程(NioEventLoop)、Selector和MPSC它們三者是一對一的關係。而每條連接都對應一個 Channel,每個 Channel 都綁定唯一一個 NioEventLoop,因此單個連接的所有操作都是在一個線程中執行,是線程安全的。

第3步驟創建線程選擇器,它的作用是為連接在NioEventLoopGroup 中選擇一個 NioEventLoop,並將該連接與 NioEventLoop 中的 Selector 完成綁定。

在底層有兩種選擇器的實現,分別是PowerOfTowEventExecutorChooser 和GenericEventExecutorChooser,它們的原理都是從線程池裏循環選擇線程,不同的是前者計算循環的索引採用的是位運算而後者採用的是取餘運算

8.2 Reactor線程 select 操作

源碼位置 NioEventLoop 的 run() 方法, select 操作會不斷輪詢是否有IO事件發生,並且在輪詢過程中不斷檢查是否有任務需要執行,保證Netty任務隊列中的任務能夠及時執行,輪詢過程使用一個計數器避開了 JDK 的空輪詢Bug

8.3 處理產生IO事件的Channel

在 Netty 的 Channel 中,有兩大類型的 Channel,一個是 NioServerSocketChannel,由 boss NioEventLoop 處理;另一個是 NioSocketChannel,由worker NioEventLoop 處理,所以

  1. 對於 boss NioEventLoop 來説,輪詢到的是連接事件,後續通過 NioServerSocketChannel 的 Pipeline 將連接交給一個 work NioEventLoop 處理
  2. 對於 work NioEventLoop 來説,輪詢到的是讀寫事件,後續通過 NioSocketChannel 的 Pipeline 將讀取到的數據傳遞給每個ChannelHandler 處理

注意任務的執行都是異步的。

8.4 任務的收集和執行

上文中提到了我們創建了高性能的MPSC隊列,它是用來聚集非Reactor線程創建的任務的,NioEventLoop 會在執行的過程中不斷檢測是否有事件發生,如果有事件發生就處理,處理完事件之後再處理非Reactor線程創建的任務。在檢測是否有事件發生的時候,為了保證異步任務的及時處理,只要有任務要處理,就會停止任務檢測,去處理任務,處理任務時是Reactor單線程執行。

8.5 註冊連接的流程

當 boss Reactor線程檢測到 ACCEPT 事件之後,創建一個 NioSocketChannel,並把用户設置的 ChannelOption(Option參數配置)、ChannelAttr(Channel 參數)、ChannelHandler(ChannelInitializer)封裝到 NioSocketChannel 中。接着,使用線程選擇器在NioEventLoopGroup 中選擇一條 NioEventLoop (線程),把 NioSocketChannel 中包裝的JDK Channel 當做Key,自身(NioSocketChannel)作為 attachment,註冊 NioEventLoop 對應的 Selector上。這樣,後續有讀寫事件發生,就可以直接獲取 attachment 來處理讀寫數據的邏輯。

8.6 如何理解IO多路複用

簡單地説:IO多路複用是指可以在一個線程內處理多個連接的IO事件請求。以Java中的IO多路複用為例,服務端創建 Selector 對象不斷的調用 select() 方法來處理各個連接上的IO事件,之後將這些IO事件交給任務線程異步去執行,這就達到了在一個線程內同時處理多個連接的IO請求事件的目的。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.