动态

详情 返回 返回

Netty與網絡編程 - 动态 详情

要了解Netty,必須先了解網絡編程

1 網絡編程

1.1 網絡IO模型

1.1.1 網絡三種I/O模型分類:

  • BIO:(同步 阻塞)jdk1.4以前 java.io包
  • NIO:(同步 非阻塞)jdk1.4 java.nio包
  • AIO:(異步 非阻塞)jdk1.7 java.nio包

1.1.2 BIO、NIO、AIO處理模式

  • 1)BIO:一個連接一個線程,客户端有連接請求時服務器端就需要啓動一個線程進行處理,線程開銷大。
  • 2)NIO:一個請求一個線程,客户端發送的連接請求會註冊到多路複用器上,多路複用器輪詢到該連接有I/O請求時才啓動一個線程進行處理;
  • 3)AIO:一個有效請求一個線程,客户端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理。

通俗地概括一下就是:

  • 1)BIO是面向流的,NIO是面向緩衝區的;
  • 2)BIO的各種流是阻塞的,而NIO是非阻塞的;
  • 3)BIO的Stream是單向的,而NIO的channel是雙向的。

1.1.3 阻塞與非阻塞

Java IO的各種流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據完全寫入。該線程在此期間不能再幹任何事情了。

Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,但是它僅能得到目前可用的數據,如果目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,所以直至數據變的可以讀取之前,該線程可以繼續做其他的事情。非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。線程通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的線程現在可以管理多個輸入和輸出通道(channel)。

1.1.4 同步與異步

同步:數據就緒後需要自己去讀是同步

異步:數據就緒直接讀好再回調給程序是異步

1.2 NIO

1.2.1 NIO三大核心組件

NIO有三大核心組件:Selector選擇器、Channel管道、buffer緩衝區。

Selector

Selector的英文含義是“選擇器”,也可以稱為為“輪詢代理器”、“事件訂閲器”、“channel容器管理機”都行。

Java NIO的選擇器允許一個單獨的線程來監視多個輸入通道,你可以註冊多個通道使用一個選擇器(Selectors),然後使用一個單獨的線程來操作這個選擇器,進而“選擇”通道:這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的線程很容易來管理多個通道。

應用程序將向Selector對象註冊需要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。Selector中也會維護一個“已經註冊的Channel”的容器。

Channel

通道,被建立的一個應用程序和操作系統交互事件、傳遞內容的渠道(注意是連接到操作系統)。那麼既然是和操作系統進行內容的傳遞,那麼説明應用程序可以通過通道讀取數據,也可以通過通道向操作系統寫數據,而且可以同時進行讀寫。

  • 所有被Selector(選擇器)註冊的通道,只能是繼承了SelectableChannel類的子類。
  • ServerSocketChannel:應用服務器程序的監聽通道。只有通過這個通道,應用程序才能向操作系統註冊支持“多路複用IO”的端口監聽。同時支持UDP協議和TCP協議。
  • ScoketChannel:TCP Socket套接字的監聽通道,一個Socket套接字對應了一個客户端IP:端口到服務器IP:端口的通信連接。

通道中的數據總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入。

buffer緩衝區

網絡通訊中負責數據讀寫的區域

1.3 NIO之Reactor模式

1.3.1 單線程Reator模式

  • ①服務器端的Reactor是一個線程對象,該線程會啓動事件循環,並使用Selector(選擇器)來實現IO的多路複用。註冊一個Acceptor事件處理器到Reactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣Reactor會監聽客户端向服務器端發起的連接請求事件(ACCEPT事件)。
  • ②客户端向服務器端發起一個連接請求,Reactor監聽到了該ACCEPT事件的發生並將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客户端對應的連接(SocketChannel),然後將該連接所關注的READ事件以及對應的READ事件處理器註冊到Reactor中,這樣一來Reactor就會監聽該連接的READ事件了。
  • ③當Reactor監聽到有讀或者寫事件發生時,將相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法讀取數據,此時read()操作可以直接讀取到數據,而不會堵塞與等待可讀的數據到來。
  • ④每當處理完所有就緒的感興趣的I/O事件後,Reactor線程會再次執行select()阻塞等待新的事件就緒並將其分派給對應處理器進行處理。

注意,Reactor的單線程模式的單線程主要是針對於I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一個線程上完成的。

但在目前的單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業務操作也在該線程上進行處理了,這可能會大大延遲I/O請求的響應。所以我們應該將非I/O的業務邏輯操作從Reactor線程上卸載,以此來加速Reactor線程對I/O請求的響應。

1.3.2 多線程Reactor模型

與單線程Reactor模式不同的是,添加了一個工作者線程池,並將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至於因為一些耗時的業務邏輯而延遲對後面I/O請求的處理。

使用線程池的優勢:

  • ①通過重用現有的線程而不是創建新線程,可以在處理多個請求時分攤在線程創建和銷燬過程產生的巨大開銷。
  • ②另一個額外的好處是,當請求到達時,工作線程通常已經存在,因此不會由於等待創建線程而延遲任務的執行,從而提高了響應性。
  • ③通過適當調整線程池的大小,可以創建足夠多的線程以便使處理器保持忙碌狀態。同時還可以防止過多線程相互競爭資源而使應用程序耗盡內存或失敗。

改進的版本中,所以的I/O操作依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操作。

對於一些小容量應用場景,可以使用單線程模型。但是對於高負載、大併發或大數據量的應用場景卻不合適,主要原因如下:

  • ①一個NIO線程同時處理成百上千的鏈路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送;
  • ②當NIO線程負載過重之後,處理速度將變慢,這會導致大量客户端連接超時,超時之後往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸;

1.3.3 主從多線程Reactor模式

Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發的事件循環邏輯。

mainReactor可以只有一個,但subReactor一般會有多個。mainReactor線程主要負責接收客户端的連接請求,然後將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客户端的通信。

流程:

①註冊一個Acceptor事件處理器到mainReactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣mainReactor會監聽客户端向服務器端發起的連接請求事件(ACCEPT事件)。啓動mainReactor的事件循環。

②客户端向服務器端發起一個連接請求,mainReactor監聽到了該ACCEPT事件並將該ACCEPT事件派發給Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客户端對應的連接(SocketChannel),然後將這個SocketChannel傳遞給subReactor線程池。

③ subReactor線程池分配一個subReactor線程給這個SocketChannel,即,將SocketChannel關注的READ事件以及對應的READ事件處理器註冊到subReactor線程中。當然你也註冊WRITE事件以及WRITE事件處理器到subReactor線程中以完成I/O寫操作。Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發的循環邏輯。

④當有I/O事件就緒時,相關的subReactor就將事件派發給響應的處理器處理。注意,這裏subReactor線程只負責完成I/O的read()操作,在讀取到數據後將業務邏輯的處理放入到線程池中完成,若完成業務邏輯後需要返回數據給客户端,則相關的I/O的write操作還是會被提交回subReactor線程來完成。

注意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依舊還是在Reactor線程(mainReactor線程或 subReactor線程)中完成的。Thread Pool(線程池)僅用來處理非I/O操作的邏輯。

多Reactor線程模式將“接受客户端的連接請求”和“與該客户端的通信”分在了兩個Reactor線程來完成。mainReactor完成接收客户端連接請求的操作,它不負責與客户端的通信,而是將建立好的連接轉交給subReactor線程來完成與客户端的通信,這樣一來就不會因為read()數據量太大而導致後面的客户端連接請求得不到即時處理的情況。並且多Reactor線程模式在海量的客户端併發請求的情況下,還可以通過實現subReactor線程池來將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量。

2、Netty

Java NIO是一個高性能的IO模式,但是JDK的API很複雜,一直以來很不方便使用。Netty對Java的NIO進行了封裝。

Netty是一個高性能的,事件驅動的,異步非阻塞的IO,java開源框架。Netty對Java的NIO進行了封裝,大大降低了java NIO使用難度,同時解決了很多Java NIO的bug,可以用於快速開發可維護的高性能協議服務器和客户端。

2.1 Netty介紹

Netty的特點

  • 1)高併發:基於 NIO(Nonblocking IO,非阻塞IO)開發,對比於 BIO(Blocking I/O,阻塞IO),他的併發性能得到了很大提高;
  • 2)傳輸快:傳輸依賴於零拷貝特性,儘量減少不必要的內存拷貝,實現了更高效率的傳輸;
  • 3)封裝好:封裝了 NIO 操作的很多細節,提供了易於使用調用接口。

Netty的優勢

  • 1)使用簡單:封裝了 NIO 的很多細節,使用更簡單;
  • 2)功能強大:預置了多種編解碼功能,支持多種主流協議;
  • 3)擴展性強:可以通過 ChannelHandler 對通信框架進行靈活地擴展;
  • 4)性能優異:通過與其他業界主流的 NIO 框架對比,Netty 的綜合性能最優;
  • 5)運行穩定:Netty 修復了已經發現的所有 NIO 的 bug,讓開發人員可以專注於業務本身;
  • 6)社區活躍:Netty 是活躍的開源項目,版本迭代週期短,bug 修復速度快。

Netty高性能表現在哪些方面

  • 1)IO 線程模型:同步非阻塞,用最少的資源做更多的事;
  • 2)內存零拷貝:儘量減少不必要的內存拷貝,實現了更高效率的傳輸;
  • 3)內存池設計:申請的內存可以重用,主要指直接內存。內部實現是用一顆二叉查找樹管理內存分配情況;
  • 4)鎖優化:通過對鎖做了很多優化,解決了很多性能問題;
  • 5)高性能序列化協議:支持 protobuf 等高性能序列化協議。

2.3 使用Netty實現一個簡單的客户服務端通信案例

功能:實現一個客户端和一個服務端,客户端向服務端發生一個"Hello Netty"的消息,服務端收到消息打印出來,並將消息返回給客户端,客户端收到返回的消息也打印出來。

我們只需要Handler中寫消息處理的邏輯就可以,實現起來比Java NIO容易多了。

EchoClient.java

/**
 * 類説明:Netty實現的客户端
 */
public class EchoClient {

    private final int port;
    private final String host;

    public EchoClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        /*線程組*/
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            /*客户端啓動必備*/
            Bootstrap b = new Bootstrap();
            b.group(group)/*把線程組傳入*/
                    /*指定使用NIO進行網絡傳輸*/
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new EchoClientHandle());
            /*連接到遠程節點,阻塞直到連接完成*/
            ChannelFuture f = b.connect().sync();
            /*阻塞程序,直到Channel發生了關閉*/
            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient(9999,"127.0.0.1").start();
    }
}

EchoClientHandler.java

public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {

    /*客户端讀到數據以後,就會執行*/
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {
        System.out.println("client acccept:"+msg.toString(CharsetUtil.UTF_8));
    }

    /*連接建立以後*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(
                "Hello Netty",CharsetUtil.UTF_8));
        //ctx.fireChannelActive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();

        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }
}

EchoServer.java

/**
 * 類説明:Netty實現的服務端
 */
public class EchoServer  {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 9999;
        EchoServer echoServer = new EchoServer(port);
        System.out.println("服務器即將啓動");
        echoServer.start();
        System.out.println("服務器關閉");
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        /*線程組*/
       // EventLoopGroup group = new NioEventLoopGroup();
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            /*服務端啓動必須*/
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)/*將線程組傳入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO進行網絡傳輸*/
                    .localAddress(new InetSocketAddress(port))/*指定服務器監聽端口*/
                    /*服務端每接收到一個連接請求,就會新啓一個socket通信,也就是channel,
                    所以下面這段代碼的作用就是為這個子channel增加handle*/
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /*添加到該子channel的pipeline的尾部*/
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind().sync();/*異步綁定到服務器,sync()會阻塞直到完成*/
            f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/

        } finally {
            group.shutdownGracefully().sync();/*優雅關閉線程組*/
        }

    }
}

EchoServerHandler.java

/**
 * 類説明:業務處理
 */
@ChannelHandler.Sharable
/*不加這個註解那麼在增加到childHandler時就必須new出來*/
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /*客户端讀到數據以後,就會執行*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        System.out.println("Server accept:"+in.toString(CharsetUtil.UTF_8));
        ctx.write(in);

    }

    /*** 服務端讀取完成網絡數據後的處理*/
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /*** 發生異常後的處理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Add a new 评论

Some HTML is okay.