Stories

Detail Return Return

【Java】BIO源碼分析和改造(GraalVM JDK 11.0.19) - Stories Detail

引言

本文介紹網絡IO編程的入門部分,Java 的傳統BIO Socket編程源碼分析,瞭解如何將BIO阻塞行為accept()read() 改造為非阻塞行為,並且將結合Linux文檔介紹其中的機制,文檔中描述瞭如何處理Socketaccept,對比Java的Socket實現代碼,基本可以發現和Linux行為基本一致。

廢話不多説,我們直接開始。

draw.io 文件

本文涉及的個人源碼分析繪圖均由 draw.io 繪製,源文件如下:

鏈接:https://pan.baidu.com/s/1FHAYt4AxWh0Dd4qi2JKZLQ?pwd=qsmg
提取碼:qsmg

image.png

什麼是Socket?

Socket起源於Unix的一種通信機制,中文通常叫他“套接字”,代表了網絡IP和端口,可以看作是通信過程的一個“句柄”。

Socket 也可以理解為網絡編程當中的API,編程語言提供了對應的API實現方式,電腦上的網絡應用程序也是通過“套接字”完成網絡請求接受與應答。

總而言之:Socket是應用層與TCP/IP協議族通信的中間軟件抽象層

Socket是應用層與TCP/IP協議族通信的中間軟件抽象層

圖片來源:socket圖解 · Go語言中文文檔 (topgoer.com)

阻塞式IO模型

《UNIX Network Programming》 一書當中,用UDP傳輸的案例模擬了阻塞式的IO模型,這個模型的概念和Java BIO的阻塞模型類似。

下面函數中應用進程在調用 recvfrom 之後就開始系統調用並且進行阻塞,等待內核把數據準備並且複製完成之後才得到結果,或者等待過程中發生錯誤返回。

阻塞式IO模型

從圖片可以看到,在內核工作的整個過程中應用進程無法做其他任何操作。

BIO 通信模型

我們把上面的阻塞IO模型轉為IO通信模型,結果如下:

BIO 通信模型

BIO對於每一個客户端進行阻塞等待接收連接,同一個時間只能處理一個Socket請求,並且在構建完成之後通常會分配一個Thread線程為其進行服務。

BIO 阻塞案例代碼

BioClientSocket

/**  
 *  BioClientSocket 客户端 Socket實現  
 * @author Xander  
 * @version v1.0.0  
 * @Package : com.zxd.interview.niosource.bio  
 * @Description : BioClientSocket 客户端 Socket實現  
 * @Create on : 2023/7/5 09:52  
 **/@Slf4j  
public class BioClientSocket {  
  
  
    public void initBIOClient(String host, int port) {  
        BufferedReader reader = null;  
        BufferedWriter writer = null;  
        Socket socket = null;  
        String inputContent;  
        int count = 0;  
        try {  
            reader = new BufferedReader(new InputStreamReader(System.in));  
            socket = new Socket(host, port);  
            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));  
            log.info("clientSocket started: " + stringNowTime());  
            while (((inputContent = reader.readLine()) != null) && count < 2) {  
                inputContent = stringNowTime() + ": 第" + count + "條消息: " + inputContent + "\n";  
                writer.write(inputContent);//將消息發送給服務端  
                writer.flush();  
                count++;  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                socket.close();  
                reader.close();  
                writer.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
    public String stringNowTime() {  
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
        return format.format(new Date());  
    }  
  
    public static void main(String[] args) {  
        BioClientSocket client = new BioClientSocket();  
        client.initBIOClient("127.0.0.1", 8888);  
    }/**  
     運行結果:  
     clientSocket started: 2023-07-05 10:26:05  
     7978987     797898     tyuytu     */}

BioServerSocket

  
/**  
 * ServerSocket 實現  
 * @author Xander  
 * @version v1.0.0  
 * @Package : com.zxd.interview.niosource.bio  
 * @Description : ServerSocket 實現  
 * @Create on : 2023/7/5 09:48  
 **/@Slf4j  
public class BioServerSocket {  
  
    public void initBIOServer(int port) {  
        ServerSocket serverSocket = null;//服務端Socket  
        Socket socket = null;//客户端socket  
        BufferedReader reader = null;  
        String inputContent;  
        int count = 0;  
        try {  
            serverSocket = new ServerSocket(port);  
           log.info(stringNowTime() + ": serverSocket started");  
            while (true) {  
                socket = serverSocket.accept();  
               log.info(stringNowTime() + ": id為" + socket.hashCode() + "的Clientsocket connected");  
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
                while ((inputContent = reader.readLine()) != null) {  
                   log.info("收到id為" + socket.hashCode() + "  " + inputContent);  
                    count++;  
                }  
               log.info("id為" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "讀取結束");  
            }  
        } catch (IOException e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if(Objects.nonNull(reader)){  
  
                    reader.close();  
                }  
                if(Objects.nonNull(socket)){  
  
                    socket.close();  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
  
    }/**  
     運行結果:  
     10:25:57.731 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-05 10:25:57: serverSocket started  
     10:26:08.442 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-05 10:26:08: id為161960012的Clientsocket connected  
     10:26:29.356 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 收到id為161960012  2023-07-05 10:26:26: 第0條消息: 7978987  
     10:26:34.409 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 收到id為161960012  2023-07-05 10:26:34: 第1條消息: 797898  
     10:26:38.298 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - id為161960012的Clientsocket 2023-07-05 10:26:38讀取結束  
     */  
  
    public String stringNowTime() {  
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
        return format.format(new Date());  
    }  
  
    public static void main(String[] args) {  
        BioServerSocket server = new BioServerSocket();  
        server.initBIOServer(8888);  
  
    }  
}

BIO 阻塞模型中,需要關注的代碼主要是這幾個:

  • serverSocket = new ServerSocket(port);
  • socket = serverSocket.accept();
  • socket = new Socket(host, port);

從代碼中可以看出,客户端在獲取Socket建立連接後,通過系統輸入輸出流完成讀寫IO操作,服務端則通過系統緩衝流Buffer來提高讀寫效率。

ServerSocket 中 bind 解讀

在具體的解讀之前,先看下整個調用的大致流程圖。

ServerSocket 中 bind 解讀

由於是ServerSocket服務端先啓動,這裏先對bind操作進行解讀,bind操作是在本機的某個端口和IP地址上進行listen監聽。

在bind成功之後,服務端進入accept阻塞等待,此時客户端Socket請求此地址將會進行Socket連接綁定。

我們從ServerSocket的初始化代碼作為入口進行介紹。

//java.net.ServerSocket#ServerSocket(int)
public ServerSocket(int port) throws IOException {
    this(port, 50, null);
}


public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
    setImpl();
    // 檢查端口是否越界
    // 0xFFFF = 15 * 16^3 + 15 * 16^2 + 15 * 16^1 + 15 * 16^0 = **65535**
    if (port < 0 || port > 0xFFFF)
        throw new IllegalArgumentException(
                    "Port value out of range: " + port);
    if (backlog < 1)
        backlog = 50;
    try {
        // 核心部分
        bind(new InetSocketAddress(bindAddr, port), backlog);
    } catch(SecurityException e) {
        close();
        throw e;
    } catch(IOException e) {
        close();
        throw e;
    }
}

setImpl();這個方法我們先暫時放到一邊,我們簡單掃一下其他代碼。

在上面的案例代碼當中,我們傳入的ipport都處在合法的範圍內,Socket規定的端口範圍是0 - 65525,超過這個範圍不允許進行bind

上面代碼的核心邏輯是bind(xxx)這一段操作。

bind(new InetSocketAddress(bindAddr, port), backlog);

new InetSocketAddress(bindAddr, port)

bind方法調用之前,ServerSocket會先構建 InetSocketAddress 對象。InetSocketAddress對象構建實際為InetSocketAddressHolder包裝類。包裝類的作用是可以防止IPPort等敏感字段的外部篡改。

此外從代碼可以看到,構建對象會對於IPPort進行二次檢查,如果IP地址不存在,會給一個默認值(通常是 0.0.0.0 )。

Creates a socket address from an IP address and a port number.
A valid port value is between 0 and 65535. A port number of zero will let the system pick up an ephemeral port in a bind operation.|
根據IP地址和端口號創建Socket地址。有效的端口值介於0和65535之間。端口號為0時,系統將在綁定操作中使用短暫端口。

InetSocketAddressHolder 對象構建完成之後,接着就進入到核心的bind(SocketAddress endpoint, int backlog)內部代碼。

bind(SocketAddress endpoint, int backlog)

java.net.ServerSocket#bind(java.net.SocketAddress, int)

/**
Binds the ServerSocket to a specific address (IP address and port number).
將ServerSocket綁定到一個特定的地址(IP地址和端口號)。

If the address is null, then the system will pick up an ephemeral port and a valid local address to bind the socket.
如果地址為空,那麼系統會選取一個短暫的端口和一個有效的本地地址來綁定套接字。
*/
public void bind(SocketAddress endpoint, int backlog) throws IOException {
        // Socket是否已經被關閉
        if (isClosed())
            throw new SocketException("Socket is closed");
        // 判斷是否已經綁定
        if (!oldImpl && isBound())
            throw new SocketException("Already bound");
        if (endpoint == null)
            // 如果地址為空,給一個默認地址
            endpoint = new InetSocketAddress(0);
        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");
        // 類型強轉為 InetSocketAddress
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        // 如果地址已經被佔用了
        if (epoint.isUnresolved())
            throw new SocketException("Unresolved address");
        if (backlog < 1)
          backlog = 50;
        try {
            SecurityManager security = System.getSecurityManager();
            if (security != null)
                // 端口進行安全檢查
                security.checkListen(epoint.getPort());
            getImpl().bind(epoint.getAddress(), epoint.getPort());
            getImpl().listen(backlog);
            bound = true;
        } catch(SecurityException e) {
            bound = false;
            throw e;
        } catch(IOException e) {
            bound = false;
            throw e;
        }
    }

bind 方法是將 ServerSocket 綁定到一個特定的地址(IP地址和端口號), 如果地址為空,那麼系統會選取一個臨時端口和有效的本地地址來綁定 ServerSocket。

跳過不需要關注的校驗代碼,在·try 中有三行比較重要的代碼。

getImpl().bind(epoint.getAddress(), epoint.getPort());  
getImpl().listen(backlog);  
bound = true;

這裏的代碼初步理解是獲取一個impl對象,綁定地址和端口,調用listen方法傳遞backlog

backlog這個值的作用可以看下面的地址,這裏整理文章內容大致理解:

  • Linux Network Programming, Part 1 (linuxjournal.com)
  • 詳解socket中的backlog 參數 - 知乎 (zhihu.com)

backlog主要是和Socket有關。在Socket編程中listen函數的第二個參數為backlog,用於服務器編程。

listen(sock, backlog);

TCP 握手

在TCP 三次握手當中,LISTEN 狀態的服務端 Socket 收到 SYN,會建立一個 SYN_REVD 的連接,SYN_REVD 是一個半連接狀態,只有在收到客户端的ACK之後才會進入ESTABLISHED,也就是説三次握手的過程必然會經歷SYN_REVDESTABLISHED兩個狀態。

針對這兩個狀態,不同的操作系統有不同實現,在 FressBSD 中 backlog 就是描述狀態為 SYN_REVD 和 ESTABLISHED 的所有連接最大數量

在 Linux 系統當中,使用兩個隊列 syn queueaccept queue,這兩個隊列分別存儲狀態為SYN_REVD和狀態為ESTABLISHED的連接,Llinux2.2及以後,backlog表示accept queue的大小,而syn queue大小由 /proc/sys/net/ipv4/tcp_max_syn_backlog配置。

可以看到backlog的值直接影響了建立連接的效率。上面代碼中backlog=50,可以認為 accept queue 的容量為 50。

listen方法執行完成之後,此時將設置bound = true,代碼執行到此處説明Socket綁定成功了。

現在我們回過頭看getImpl().bind(epoint.getAddress(), epoint.getPort()); 這塊代碼工作。

setImpl()

介紹getImpl()的前提是我們要知道如何set的,具體代碼位於構造方法中一行不起眼的setImpl()操作。

java.net.ServerSocket#setImpl

private void setImpl() {
        if (factory != null) {
            impl = factory.createSocketImpl();
            checkOldImpl();
        } else {
            // No need to do a checkOldImpl() here, we know it's an up to date
            // SocketImpl!
            impl = new SocksSocketImpl();
        }
        if (impl != null)
            impl.setServerSocket(this);
    }

注意,在第一次初始化的時候,SocketImplFactory是沒有被初始化過的,所以走的是else分支,具體工作是為內部的成員變量 SocketImpl進行初始化。

  
/**  
 * The implementation of this Socket. */
private SocketImpl impl;

SocksSocketImpl 初始化之後,將會設置它的成員變量ServerSocketthis引用

if (impl != null)  
    impl.setServerSocket(this);

這裏的處理工作很簡單,分別是初始化 SocksSocketImpl ,把當前對象實例的this引用傳遞給這個初始化的 SocksSocketImpl 的成員變量(這時候自身的引用逸出了)。

瞭解setImpl之後,下面這裏我們再看看 getImpl() 幹了啥。

getImpl()

java.net.ServerSocket#getImpl

代碼內容也比較簡單,首先檢查SocketImpl是否創建,第一次連接這裏為false,此時會進入createImpl()方法。

/**
Get the SocketImpl attached to this socket, creating it if necessary.
獲取連接到此套接字的SocketImpl,如果有必要,可以創建它。
*/
SocketImpl getImpl() throws SocketException {  
    if (!created)  
        createImpl();  
    return impl;  
}

createImpl()當中,通常 SocketImpl 已經在構造器初始化完成,這裏直接更新 created 狀態即可。

void createImpl() throws SocketException {  
    if (impl == null)  
        setImpl();  
    try {  
        impl.create(true);  
        created = true;  
    } catch (IOException e) {  
        throw new SocketException(e.getMessage());  
    }  
}

setImpl()getImpl()方法配合,可以確定 SocketImpl 在使用的時候一定是被初始化完成的。

SocketImpl.bind(epoint.getAddress(), epoint.getPort())

下面再來看看它是如何進行下面兩項關鍵操作的:

getImpl().bind(epoint.getAddress(), epoint.getPort());  
getImpl().listen(backlog);

在之前的初始化代碼中,InetAddress對象初始化設置了IPPort等參數,現在委託 SocketImpl執行具體bind操作。

java.net.AbstractPlainSocketImpl#bind

bind方法是同步的,一開始需要先獲取到fdLock鎖,然後判斷是否滿足Socket綁定條件,如果滿足則利用鈎子(NetHooks) 對象進行前置TCP綁定。

注意,個人發現 NetHooks.beforeTcpBind(fd, address, lport); 方法發現在JDK11之中是一個空方法,而JDK8當中會有一段provider.implBeforeTcpBind(fdObj, address, port);的調用。
protected synchronized void bind(InetAddress address, int lport)  
    throws IOException  
{  
    // 獲取 fdLock 鎖
   synchronized (fdLock) {  
        if (!closePending && (socket == null || !socket.isBound())) {  
            NetHooks.beforeTcpBind(fd, address, lport);  
        }  
    }  
    // 是否鏈接本地地址
    if (address.isLinkLocalAddress()) {  
        address = IPAddressUtil.toScopedAddress(address);  
    }  
    // 關鍵
    socketBind(address, lport);  
    // 服務端和客户端的Socket走不同的 if 判斷
    if (socket != null)  
        socket.setBound();  
    if (serverSocket != null)  
        serverSocket.setBound();  
}

加鎖部分和核心邏輯無太多幹系,我們跳過細枝末節,看socketBind(address, lport); 這部分代碼。

fdLock 鎖作用:註釋説明它用於在增加/減少fdUseCount時鎖定。
/* lock when increment/decrementing fdUseCount */  
// 在增加/減少fdUseCount時鎖定
protected final Object fdLock = new Object();

PlainSocketImpl#socketBind(InetAddress address, int port)

@Override  
void socketBind(InetAddress address, int port) throws IOException {  
    int nativefd = checkAndReturnNativeFD();  
  
    if (address == null)  
        throw new NullPointerException("inet address argument is null.");  

    // 目前IPv4地址已經分配完畢,所以優先用 IPV6的,並且不支持 IPV4 
    if (preferIPv4Stack && !(address instanceof Inet4Address))  
        throw new SocketException("Protocol family not supported");  

    // 核心操作
    bind0(nativefd, address, port, useExclusiveBind);  
    // 如果是之前 InetAddress 為空默認初始化的端口為0,則重新隨機分配一個端口
    if (port == 0) {  
        localport = localPort0(nativefd);  
    } else {  
        localport = port;  
    }  
  
    this.address = address;  
}

socketBind(address, lport); 方法調用,最後綁定操作為JVM的底層C++操作bind0

bind0屬於比較底層的代碼,這裏我們就不繼續探究了,如果讀者好奇,可以閲讀 HotSpot 的開源實現代碼。

static native void bind0(int fd, InetAddress localAddress, int localport,  
                         boolean exclBind)  
    throws IOException;

從整體上看,上面這一整個bind操作都是同步完成的,主要邏輯是先做一系列檢查,之後調用底層的JVM方法完成Socket綁定。

畫圖小結

筆者通過個人理解畫了一幅圖,主要描述了 bind 操作大致的邏輯,可以看到很多地方都和JVM的底層C++代碼打交道。

有必要説明一下,BIO畢竟是 Java1.0 出來的玩意,看源碼我們要抓大放小,後續的JDK提案中,有人提出要收拾這個老古董=-=。

ServerSocket的bind

從圖中也可以看出,要完成Socket連接構建,必須要獲得文件描述符。

ServerSocket中accept解讀

ServerSocketaccpet是如何阻塞獲取連接的?

accept方法的作用是詢問操作系統是否有收到新的Socket套接字信息,操作過程在操作系統底層調用實現上都是 同步的。

操作系統從Socket中沒有Socket連接進來怎麼辦?根據Linux的accept文檔描述,以及Java註釋的JavaDoc文檔描述,都明確説明此時會在底層操作系統阻塞

java.net.ServerSocket#accept

我們從代碼層面看看 accept 方法幹了啥。

/**
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
監聽並接受與此套接字的連接。該方法會阻塞,直到有一個連接被建立。

A new Socket s is created and, if there is a security manager, the security manager's checkAccept method is called with s.getInetAddress().getHostAddress() and s.getPort() as its arguments to ensure the operation is allowed. This could result in a SecurityException.
一個新的Socket s被創建,如果有一個安全管理器,安全管理器的checkAccept方法被調用,參數是s.getInetAddress().getHostAddress()和s.getPort(),以確保該操作被允許。這可能會導致一個SecurityException。
*/
public Socket accept() throws IOException {  
    if (isClosed())  
        throw new SocketException("Socket is closed");  
    if (!isBound())  
        throw new SocketException("Socket is not bound yet");  
    Socket s = new Socket((SocketImpl) null);  
    implAccept(s);  
    return s;  
}

Java Doc 説明了accept()會進行阻塞,這裏疑問比較大的點可能是Socket s = new Socket((SocketImpl) null); ,這行代碼為什麼又要新建一個Socket?帶着疑問我們繼續看implAccept(s); 方法。

java.net.ServerSocket#implAccept

/**
Subclasses of ServerSocket use this method to override accept() to return their own subclass of socket. So a FooServerSocket will typically hand this method an empty FooSocket. On return from implAccept the FooSocket will be connected to a client.

ServerSocket的子類使用這個方法來覆蓋accept()(的行為),以返回他們自己的socket子類。比如一個FooServerSocket通常會將一個空的FooSocket交給這個方法。從 implAccept 返回時,FooSocket 將被連接到一個客户端。
*/
protected final void implAccept(Socket s) throws IOException {  
    SocketImpl si = null;  
    try {  
        // 判斷新對象 Socketimpl 是否設置
        if (s.impl == null)  
          s.setImpl();  
        else {  
            s.impl.reset();  
        }  

        // si 指向 Socket 對象的 impl 
        si = s.impl; 
        // Socket 對象的 impl 引用 暫時置空
        s.impl = null;  
        // impl 地址和文件描述符初始化
        si.address = new InetAddress();  
        si.fd = new FileDescriptor();  
        // getImpl() 獲取的是 ServerSocket 的 impl,注意不是 Socket的
// 4. 調用 ServerSocket 持有的 SocksSocketImpl 對象完成底層操作系統的 accept 操作
        getImpl().accept(si);  

        // raw fd has been set 
        // 原始fd已被設置  
        SocketCleanable.register(si.fd);    
        // 安全檢查
        SecurityManager security = System.getSecurityManager();  
        if (security != null) {  
            security.checkAccept(si.getInetAddress().getHostAddress(),  
                                 si.getPort());  
        }  
    } catch (IOException e) {  
        // 如果出現底層IO異常,則s.impl = si;把之前臨時置空的引用給重置回來
        if (si != null)  
            si.reset();  
        s.impl = si;  
        throw e;  
    } catch (SecurityException e) {  
        if (si != null)  
            si.reset();  
        s.impl = si;  
        throw e;  
    }  
    // 把之前臨時置空的引用給重置回來
    s.impl = si;  
    s.postAccept();  
}

代碼首先進入一個if判斷,檢查 new Socket 新對象的Socketimpl是否設置,如果為空則就設置,如果不為空,則reset() 重置。

毫無疑問,這裏是剛剛初始化的Socket,此時Socket.Socketimpl 肯定是沒有設置的。

if (s.impl == null)  
  s.setImpl();  
else {  
    s.impl.reset();  
}  

首次進入代碼通常就是走if分支。Socket.setImpl 這個方法和ServerSocketsetImpl非常像,new Socket 新對象會為自己的 SocketImpl 成員對象進行初始化。

SocketImpl

至此,我們畫圖理解代碼操作邏輯:

accept 操作分析

接下來是一些有點”繞“的操作,建議讀者邊調試邊跟着圖示理解:

// 1. si 指向 Socket 對象的 impl 
si = s.impl; 
// 2. Socket 對象的 impl引用 暫時置空
s.impl = null;  
// 3. impl 地址和文件描述符初始化
si.address = new InetAddress();  
si.fd = new FileDescriptor();
// getImpl() 獲取的是 ServerSocket 的 impl,注意不是 Socket的
// 4. 調用 ServerSocket 持有的 SocksSocketImpl 對象完成底層操作系統的 accept 操作
getImpl().accept(si);  


// ....
// 假設此時 accept 獲取到連接
s.impl = si; 
這裏吐槽下老外這種變量命名給規則,啥sis,a,b,c,d 的,不畫圖很容易繞進去。

ServerSocket 中 accepet 解讀

格外強調下, getImpl()impl對象si.impl 對象並不是同一個,這些代碼內容非常像但是屬於不同的類,切記不要混淆。

實例對象對比

實例對象對比

代碼最後有必定會執行的 s.impl = si; 操作(因為之前暫時把引用“脱鈎”了),如果是異常的si還會進行額外的reset重置。

s.impl = si; 

這裏回答一下之前遺留的問題,Socket s = new Socket((SocketImpl) null); 這行代碼為什麼又要新建一個Socket?

我們觀察上面繪製的操作圖,s.impl = null; 的執行,此時Socket對象和這個SocketImpl暫時”失去關聯“,這個時候確保哪怕new Socket對象綁定失敗,此時對於SocketImpl來説根本是無感知的。

換句話説,如果失敗了Socket會完全重置,好像什麼都沒有發送過,而如果成功了,此時把引用“接回去”,必然得到的可用的Socket

這裏給一個不恰當的比喻,當年諸葛亮草船借箭,如果有碰到沒有借箭的船,極端一點是不是就可以直接”燒了“不要了,而如果“接”到箭自然需要回港“卸貨‘”,對於吳國來説,它們只看到“成功”借到箭的船隻。

執行getImpl().accept(si);方法之後,我們在AbstractPlainSocketImpl找到accept方法。我

java.net.AbstractPlainSocketImpl#accept

/**
Accepts connections.
接受連接
*/
protected void accept(SocketImpl s) throws IOException {  
    acquireFD();  
    try {  
        socketAccept(s);  
    } finally {  
        releaseFD();  
    }  
}

accept調用acquireFD();獲取並且植入文件描述符號,加鎖獲取之後會把fdUseCount 的計數器值+1,表示有一個新增的Socket連接。

加鎖保證 fdUseCount 計數是線程安全的
// "Acquires" and returns the FileDescriptor for this impl
// - "獲取 "並返回該植入物的文件描述符。
FileDescriptor acquireFD() {  
    synchronized (fdLock) {  
        fdUseCount++;  
        return fd;  
    }  
}

java.net.PlainSocketImpl#socketAccept

不同的操作系統實現不同,這裏僅以個人看到的JDK11版本源碼為例。

@Override
void socketAccept(SocketImpl s) throws IOException {
    int nativefd = checkAndReturnNativeFD();

    if (s == null)
        throw new NullPointerException("socket is null");

    int newfd = -1;
    InetSocketAddress[] isaa = new InetSocketAddress[1];
    if (timeout <= 0) {
        newfd = accept0(nativefd, isaa);
    } else {
        configureBlocking(nativefd, false);
        try {
            waitForNewConnection(nativefd, timeout);
            newfd = accept0(nativefd, isaa);
            if (newfd != -1) {
                configureBlocking(newfd, true);
            }
        } finally {
            configureBlocking(nativefd, true);
        }
    }
    /* Update (SocketImpl)s' fd */
    fdAccess.set(s.fd, newfd);
    /* Update socketImpls remote port, address and localport */
    InetSocketAddress isa = isaa[0];
    s.port = isa.getPort();
    s.address = isa.getAddress();
    s.localport = localport;
    if (preferIPv4Stack && !(s.address instanceof Inet4Address))
        throw new SocketException("Protocol family not supported");
}

我們只關心下面這部分代碼,方法中首先判斷 timeout 是否小於等於0(如果沒有設置,那麼默認就是 0),如果是則走accept0(nativefd, isaa)方法。

前面反覆提到的,accept操作核心實現這是下面的 native accept0 方法,具體操作是:

在操作系統層面檢查bind的端口上是否有客户端數據接入,如果沒有則一直阻塞等待

if (timeout <= 0) {
    newfd = accept0(nativefd, isaa);
} else {
    configureBlocking(nativefd, false);
    try {
        waitForNewConnection(nativefd, timeout);
        newfd = accept0(nativefd, isaa);  
        if (newfd != -1) {
            configureBlocking(newfd, true);
        }
    } finally {
        configureBlocking(nativefd, true);
    }
} // <4>
static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;

因為操作系統層面的阻塞需要影響到應用程序級別阻塞?顯然accept0(nativefd, isaa)的操作系統層面阻塞是無 法避免的。

仔細觀察代碼,上面的代碼分支提供了另外一種選擇, timeout 的值設置大於0的值,此時程序會在等到我們設置的時間後返回,並且只會阻塞設置的這個時間量的值(單位毫秒)。

注意,這裏的 newfd 如果是 -1,表示底層沒有任何數據返回,在Linux的文檔中也有對應的介紹。

java.net.ServerSocket#setSoTimeout

既然不阻塞的關鍵參數是timeout , 接下來我們看下 timeout 值要如何設置。

/**
Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

啓用/禁用SO_TIMEOUT,指定超時時間,單位為毫秒。在這個選項被設置為非零超時的情況下,對這個ServerSocket的accept()的調用將只阻塞這個時間量。如果超時過後,會引發java.net.SocketTimeoutException,儘管ServerSocket仍然有效。該選項必須在進入阻塞操作之前啓用才能生效。超時必須大於0。超時為0會被解釋為無限期超時。

*/
public synchronized void setSoTimeout(int timeout) throws SocketException {  
    if (isClosed())  
        throw new SocketException("Socket is closed");  
    getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);  
}

簡單明瞭,java.net.SocketOptions#setOption 方法最終調用的是java.net.AbstractPlainSocketImpl#setOption()

java.net.AbstractPlainSocketImpl#setOption

public void setOption(int opt, Object val) throws SocketException {  
    if (isClosedOrPending()) {  
        throw new SocketException("Socket Closed");  
    }  
    boolean on = true;  
    switch (opt) {  
    case SO_LINGER:  
        //..
    case SO_TIMEOUT:  
        if (val == null || (!(val instanceof Integer)))
                throw new SocketException("Bad parameter for SO_TIMEOUT");
            int tmp = ((Integer) val).intValue();
            if (tmp < 0)
                throw new IllegalArgumentException("timeout < 0");
            timeout = tmp;
            break;
    case TCP_NODELAY:  
        //....
    case SO_RCVBUF:  
        //....
    case SO_KEEPALIVE:  
        //....
    }  
    socketSetOption(opt, on, val);  
}

為了方便閲讀,這裏把其他的代碼都刪除了,只保留傳參部分。

可以看到,這裏僅僅是將setOption裏面傳入的timeout值,設置到了AbstractPlainSocketImpl的全局變量timeout

畫圖小結

個人認為整個accept() 操作比較”噁心“(個人觀點)的是幾個引用的賦值變化上面,暫時”解綁“的目的是在進行底層Socket連接的時候,如果Socket出現異常也沒有影響,此時Socket持有的引用也是null,可以無阻礙的重新進行下一次Socket連接。

換句話説,整個Socket要麼對接成功,要麼就是重置回沒對接之前的狀態可以進行下一次嘗試,保證ServerSocket會收到一個沒有任何異常的Socket連接

最後再看一眼圖:

accept 操作總結

改造並實現accept的非阻塞實現

在進行案例程序的改造之前,必須要先理解同步、異步、阻塞、非阻塞這幾個概念。

這個概念在之前的筆記中 [[《跟閃電俠學Netty》閲讀筆記 - 開篇入門Netty]] 【洗衣機案例理解阻塞非阻塞,同步異步概念】這一部分提到過,[[【Java】《2小時搞定多線程》個人筆記]] 中又一次對於這幾個概念做了個區分。

區分同步異步的關鍵點是被調用方的行為,沒有得到結果之前,服務端不返回任何結果,那麼操作就是同步的。

如果沒有得到結果之前,服務器可以返回結果,比如給一個句柄,通過這個句柄可以在未來某個時間點之後獲得結果,那麼操作就是異步的。

這個句柄可以對應Java 併發編程的 Future 的概念

再舉個例子,比如説前面的accept0應用程序調用操作系統,在Linux中就是訪問系統內核,此時這一整塊邏輯處理是選擇”永久等待一個客户端連接“,符合 沒有得到結果之前,服務端不返回任何結果 這種情況,所以它是同步的。

區分阻塞和非阻塞的關鍵點則是 對於調用者而言的服務端狀態*,比如我們站在線程狀態的角度,阻塞對應 Blocking,非阻塞此時應該對應Running正常執行。再比如站在線程發出請求之後請求方的角度,阻塞和非阻塞分別對應waitingNo waiting

理解同步異步阻塞和非阻塞之後,下面來嘗試改造相關代碼accept的阻塞問題,實現方式很簡單,那就是設置 timeout , 然後在異常處理上continue重試。

/**  
 * accept 超時時間設置  
 */  
private static final int SO_TIMEOUT = 2000;

/***
 * NIO 改寫
 * @description NIO 改寫
 * @param port
 * @return void
 * @author xander
 * @date 2023/7/12 10:35
 */
public void initNioServer(int port) {
    ServerSocket serverSocket = null;//服務端Socket
    Socket socket = null;//客户端socket
    BufferedReader reader = null;
    String inputContent;
    int count = 0;
    try {
        serverSocket = new ServerSocket(port);
        // 1. 需要設置超時時間,會等待設置的時間之後再進行返回
        serverSocket.setSoTimeout(SO_TIMEOUT);
        log.info(stringNowTime() + ": serverSocket started");
        while (true) {
            // 2. 如果超時沒有獲取,這裏會拋出異常,這裏的處理策略是不處理異常
            try {
                socket = serverSocket.accept();
            } catch (SocketTimeoutException e) {
                //運行到這裏表示本次accept是沒有收到任何數據的,服務端的主線程在這裏可以做一些其他事情
                log.info("now time is: " + stringNowTime());
                continue;
            }
            log.info(stringNowTime() + ": id為" + socket.hashCode() + "的Clientsocket connected");
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            while ((inputContent = reader.readLine()) != null) {
                log.info("收到id為" + socket.hashCode() + "  " + inputContent);
                count++;
            }
            log.info("id為" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "讀取結束");
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if(Objects.nonNull(reader)){

                reader.close();
            }
            if(Objects.nonNull(socket)){

                socket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}/**運行結果:
     10:40:49.272 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-12 10:40:49: serverSocket started
     10:40:52.826 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:52
     10:40:54.830 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:54
     10:40:56.837 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:56
     10:40:58.840 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:58
     10:41:00.849 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:41:00
     10:41:02.852 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:41:02
     */

設置了timeout之後,accept 方法每次都會在間隔指定時間之後被喚醒一次,如果沒有收到連接就會拋出異常,我們的處理方式是吞掉異常並且重新accept,這樣就實現了類似非阻塞的效果。

小結

Socket 當中 getInputStream() 的方法解析以及後續的read操作結構圖如下。

Socket.getInputStream()

Socket 中的 getInputStream() 方法解析

實現了非阻塞的accept之後,再來看下另一個會產生阻塞的方法,那就是Socket.getInputStream,這個方法在Socket連接,服務端在read() 讀取數據的時候會進行調用。

java.net.Socket#getInputStream

/**
返回該socket的輸入流。
如果該套接字有一個關聯的通道,那麼生成的輸入流會將其所有操作委託給該通道。如果通道處於非阻塞模式,那麼輸入流的讀操作將拋出java.nio.channel.IllegalBlockingModeException。
在異常情況下,底層連接可能會被遠程主機或網絡軟件中斷(例如在TCP連接中的連接重置)。當網絡軟件檢測到連接斷開時,返回的輸入流會出現以下情況:

*/
  public InputStream getInputStream() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isConnected())
            throw new SocketException("Socket is not connected");
        if (isInputShutdown())
            throw new SocketException("Socket input is shutdown");
        InputStream is = null;
        try {
            is = AccessController.doPrivileged(
                new PrivilegedExceptionAction<>() {
                    public InputStream run() throws IOException {
                        return impl.getInputStream();
                    }
                });
        } catch (java.security.PrivilegedActionException e) {
            throw (IOException) e.getException();
        }
        return is;
    }

上面通過AccessController進行授權,run方法中調用java.net.AbstractPlainSocketImpl#getInputStream方法。

protected synchronized InputStream getInputStream() throws IOException {  
    synchronized (fdLock) {  
        if (isClosedOrPending())  
            throw new IOException("Socket Closed");  
        if (shut_rd)  
            throw new IOException("Socket input is shutdown");  
        if (socketInputStream == null)  
            socketInputStream = new SocketInputStream(this);  
    }  
    return socketInputStream;  
}

可以看到,代碼中創建了 SocketInputStream 對象,並且會將當前AbstractPlainSocketImpl對象傳進去(這個對象實際就是 SocksSocketImpl )。

read讀數據的時候,則會調用如下方法:

public int read(byte b[], int off, int length) throws IOException {  
    return read(b, off, length, impl.getTimeout());  
}
int read(byte b[], int off, int length, int timeout) throws IOException {
    int n;

    // EOF already encountered
    if (eof) {
        return -1;
    }

    // connection reset
    if (impl.isConnectionReset()) {
        throw new SocketException("Connection reset");
    }

    // bounds check
    if (length <= 0 || off < 0 || length > b.length - off) {
        if (length == 0) {
            return 0;
        }
        throw new ArrayIndexOutOfBoundsException("length == " + length
                + " off == " + off + " buffer length == " + b.length);
    }

    // acquire file descriptor and do the read
    // 獲取文件描述符並進行讀取
    FileDescriptor fd = impl.acquireFD();
    try {
        n = socketRead(fd, b, off, length, timeout);
        if (n > 0) {
            return n;
        }
    } catch (ConnectionResetException rstExc) {
        impl.setConnectionReset();
    } finally {
        impl.releaseFD();
    }

    /*
     * If we get here we are at EOF, the socket has been closed,
     * or the connection has been reset.
     */
    if (impl.isClosedOrPending()) {
        throw new SocketException("Socket closed");
    }
    if (impl.isConnectionReset()) {
        throw new SocketException("Connection reset");
    }
    eof = true;
    return -1;
}

重點關注下面這一行代碼,這裏在讀取的時候同樣傳遞了 timeout 參數:

n = socketRead(fd, b, off, length, timeout);

socketRead 方法會調用 nativesocketRead0 方法,timeout 代表了讀取的超時時間。

private native int socketRead0(FileDescriptor fd,  
                               byte b[], int off, int len,  
                               int timeout)  
    throws IOException;

timeout 參數源於前面的new SocketInputStream(this)(也就是 AbstractPlainSocketImpl 對象)中的this引用impl.getTimeout(),這個參數的作用是指定read的超時時間,超時之後沒有結果拋出異常。

serverSocket.setSoTimeout(SO_TIMEOUT);

瞭解read方法中timeout的作用之後,我們便可以着手改造代碼了,具體的改造部分個人放到後文單獨的 titile 進行説明,方便後續回顧。

此外,這裏經過仔細考慮,判斷這部分代碼讀者很有可能會存在理解誤區,誤以為此處的 AbstractPlainSocketImpl 屬於 ServerSocket,實際上它屬於 Socket,也就是説我們設置的 timeout 是設置到 SocketAbstractPlainSocketImpl

最為簡單的證明方法是先在 java.net.Socket#setImpl 中打上斷點,在啓動BIO的服務端之後,立即啓動客户端,具體的Debug斷點如下:

Socket 的 setImpl

通過單步調試,我們在BioServerSocket 中看到兩個對象是不一樣的。

BioServerSocket

對象對比

為什麼不一樣呢?這裏需要回顧前面的【ServerSocket中accept 解讀】這一部分的操作。這裏把重要操作標記了一下:

ServerSocket中accept解讀可能的理解誤區1

這裏複習之前提到的內容,在accept(); 中為了確保Socket連接是正確並且可用的,每次都會new Socket(),而這裏的SocksSocketImpl 是屬於 Socket 的成員變量。

在進行Socket套接字連接之前會先判斷是否初始化,如果初始化沒有就先進行初始化(具體可以看紅框框的位置)。

如果還是理解不了,那麼只能再次寄出另一張殺手鐗圖了:

ServerSocket中accept解讀可能的理解誤2

實現 Socket 中的 read 方法非阻塞

AbstractPlainSocketImpl實現socketRead方法非阻塞,具體做法其實就是使用 AbstractPlainSocketImpl 傳入了 timeout 參數,實現 SocketInputStream 非阻塞read

表面上看上去 read 方法是非阻塞的,實際上這裏存在一個明顯的 誤區,那就是在socket = serverSocket.accept();這一段代碼中,服務端構建出 Socket 連接之後,客户端和服務端交互是通過獨立的Socket對象完成IO讀寫的。

然而在第一次改造過後,實際上還有兩點不易察覺的問題:

(1)服務端read的非阻塞輪詢效率非常低,基本上是“一核繁忙、多核圍觀”的情況。

(2)第一次改造設置的是設定的是ServerSocket級別SocksSocketImpl的timeout。每個新的客户端進來都是新的Socket連接,每個Socket又有各自的 SocksSocketImpl,這裏客户端連接所產生新的Sockettimeout是沒有做設置的,換句話説,服務端針對每個Socket的read依然是完全阻塞。

前文提到,在BIO非阻塞同步模型中,我們雖然沒法解決 系統底層"同步" 問題,但是我們可以讓“非阻塞”這一塊更為優化合理和更為高效。

第一個問題的解決策略是啓動多線程以非阻塞read()方式輪詢,這樣做的另一點好處是,某個Socket讀寫壓力大並不會影響CPU 切到其他線程的正常工作。

解決第二點問題,我們需要為每個新的Socket設置 timeout

解決上面兩個問題,真正BIO非阻塞實現才算是真正成立,下面我們來看下第二版代碼優化:

 /**
     * 1. NIO 改寫,accept 非阻塞
     * 2. 實現 read() 同樣非阻塞
     *
     * @param port
     * @return void
     * @description
     * @author xander
     * @date 2023/7/12 16:38
     */
    public void initNioAndNioReadServer(int port) {
        ServerSocket serverSocket = null;//服務端Socket
        Socket socket = null;//客户端socket
        BufferedReader reader = null;
        ExecutorService threadPool = Executors.newCachedThreadPool();
        String inputContent;
        int count = 0;
        try {
            serverSocket = new ServerSocket(port);
            // 1. 需要設置超時時間,會等待設置的時間之後再進行返回
            serverSocket.setSoTimeout(SO_TIMEOUT);
            log.info(stringNowTime() + ": serverSocket started");
            while (true) {
                // 2. 如果超時沒有獲取,這裏會拋出異常,這裏的處理策略是不處理異常
                try {
                    socket = serverSocket.accept();
                } catch (SocketTimeoutException e) {
                    //運行到這裏表示本次accept是沒有收到任何數據的,服務端的主線程在這裏可以做一些其他事情
                    log.info("now time is: " + stringNowTime());
                    continue;
                }
                // 3. 拿到Socket 之後,應該使用線程池新開線程方式處理客户端連接,提高CPU利用率。
                Thread thread = new Thread(new ClientSocketThread(socket));
                threadPool.execute(thread);
//                log.info(stringNowTime() + ": id為" + socket.hashCode() + "的Clientsocket connected");
//                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//
//                while ((inputContent = reader.readLine()) != null) {
//                    log.info("收到 id為" + socket.hashCode() + "  " + inputContent);
//                    count++;
//                }
//                log.info("id為" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "讀取結束");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (Objects.nonNull(reader)) {
                    reader.close();
                }
                if (Objects.nonNull(socket)) {

                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 改寫 客户端 Socket 連接為單獨線程處理
     */
    class ClientSocketThread implements Runnable {

        private static final int SO_TIMEOUT = 2000;
        
        private static final int SLEEP_TIME = 1000;

        public final Socket socket;

        public ClientSocketThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            BufferedReader reader = null;
            String inputContent;
            int count = 0;
            try {
                socket.setSoTimeout(SO_TIMEOUT);
            } catch (SocketException e1) {
                e1.printStackTrace();
            }
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                while (true) {
                    try {
                        while ((inputContent = reader.readLine()) != null) {
                            log.info("收到id為" + socket.hashCode() + "  " + inputContent);
                            count++;
                        }
                    } catch (Exception e) {
                        //執行到這裏表示read方法沒有獲取到任何數據,線程可以執行一些其他的操作
                        log.info("Not read data: " + stringNowTime());
                        continue;
                    }
                    //執行到這裏表示讀取到了數據,我們可以在這裏進行回覆客户端的工作
                    log.info("id為" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "讀取結束");
                    Thread.sleep(SLEEP_TIME);
                }
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (Objects.nonNull(reader)) {
                        reader.close();
                    }
                    if (Objects.nonNull(socket)) {
                        socket.close();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

    }

經過上面的改造,我們基本把 BIO 同步阻塞的工作方式更新為 同步非阻塞的工作方式,核心是對於 read()以及服務端接收新連接的accept()設置timeout參數。

在外部處理上,通過while(true) 加上“吞異常”方式,結合Thread.sleep()的套路實現“非阻塞”定期accept

當然,我們也可以看到,通過線程池每次都構建新線程的方式,在連接比較少的時候是比較高效的,但是一旦連接暴增,理論上JVM雖然可以構建非常多線程,實際上CPU肯定是吃不消,多線程“空輪詢”判斷的方式也十分浪費CPU資源,多線程切換起來更是雪上加霜。

基於BIO的種種弊端,Sun 在JDK1.4 提供了 NIO 來解決上面的幾點問題。

native accept方法在Linux運作解讀

accept(2): accept connection on socket - Linux man page (die.net)

原始文檔相關解讀:[[【Linux】accept(2) - Linux man page]],下面的內容基本為文檔的翻譯和理解介紹。

accept()本地方法,我們可以來試着看一看Linux這塊的相關解讀:

#include <sys/types.h>

#include <sys/socket.h>

int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);

accept()系統調用主要用在基於連接的套接字類型,比如SOCK_STREAMSOCK_SEQPACKET。它提取出所監聽套接字的等待連接隊列中第一個連接請求,創建一個新的套接字,並返回指向該套接字的文件描述符。新建立的套接字不在監聽狀態,原來所監聽的套接字也不受該系統調用的影響。

備註:新建立的套接字準備發送send()和接收數據recv()

sockfd,作用是 利用系統調用socket()建立的套接字描述符,通過bind()綁定到一個本地地址(一般為服務器的套接字),並且通過listen()一直在監聽連接;

addr, 指向struct sockaddr的指針,該結構用通訊層服務器對等套接字的地址(一般為客户端地址)填寫,返回地址addr的確切格式由套接字的地址類別(比如TCP或UDP)決定;

addr為NULL,沒有有效地址填寫,這種情況下,addrlen也不使用,應該置為NULL;

備註:addr是個指向局部數據結構sockaddr_in的指針,這就是要求接入的信息本地的套接字(地址和指針)。

addrlen, 代表一個值結果參數,調用函數必須初始化為包含addr所指向結構大小的數值,函數返回時包含對等地址(一般為服務器地址)的實際數值;

備註:addrlen是個局部整形變量,設置為sizeof(struct sockaddr_in)

如果隊列中沒有等待的連接,套接字也沒有被標記為Non-blocking,accept()會阻塞調用函數直到連接出現;如果套接字被標記為Non-blocking,隊列中也沒有等待的連接,accept()返回錯誤EAGAINEWOULDBLOCK

備註:一般來説accept()為阻塞函數,當監聽socket調用accept()時,它先到自己的receive_buf中查看是否有連接數據包;若有,把數據拷貝出來,刪掉接收到的數據包,創建新的socket與客户發來的地址建立連接;若沒有,就阻塞等待;

為了在套接字中有到來的連接時得到通知,可以使用select()poll()。當嘗試建立新連接時,系統發送一個可讀事件,然後調用accept()為該連接獲取套接字。另一種方法是,當套接字中有連接到來時設定套接字發送SIGIO信號。

返回值成功時,返回非負整數,該整數是接收到套接字的描述符;出錯時會返回-1,相應地設定全局變量error。

所以,在Java部分的源碼裏(java.net.ServerSocket#accept)會new 一個Socket出來,方便連接後拿到的新Socket的文件描述符的信息給設定到我們new出來的這個Socket 上來,這點在java.net.PlainSocketImpl#socketAccept中看到的尤為明顯,讀者可以回顧相關源碼。

總結

本文一開始介紹了Bio Socket的基本代碼,接着從ServerSocketbind方法解讀,通過圖文結合的方式介紹了源碼如何處理,整個bind操作過程中有許多native層調用,所以Socket的代碼調試是非常麻煩的。

介紹完bind之後,我們接着介紹了ServerSocketaccept方法,並且介紹了accept 方法的阻塞問題實際上和底層的操作系統行為有關,並且通過畫圖的方式理解accept中Socket連接比較“繞”的操作。

最後,文章的後半部分介紹瞭如何改造accept以及客户端的Socket連接解決非阻塞問題IO,最後我們介紹了 native accept方法在Linux運作,主要內容為Linux的相關文檔理解。

寫在最後

理解Socket的非阻塞操作有助於理解 NIO的Channel和Buffer的概念,實際上從我們的Demo代碼可以看到Channel和非阻塞的BIO思路比較類似,而BufferReader緩衝流則貼合了 Buffer 的概念。

參考資料

Linux Network Programming, Part 1 (linuxjournal.com)

詳解socket中的backlog 參數 - 知乎 (zhihu.com)

BIO到NIO源碼的一些事兒之BIO - 掘金 (juejin.cn)

CachedThreadPool的工作原理

源碼:

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS, //60s 
                                  new SynchronousQueue<Runnable>());  
}

(1)corePoolSize = 0,maximumPoolSize = 最大值(無限大),keepAliveTime = 60s,workQueue = SynchronousQueue

(2)SynchronousQueue(實際上沒有存儲數據的空閒,是用來做多線程通信之間的協調作用的)。一開始提交一個任務過來,要求線程池裏必須有一個線程對應可以處理這個任務,但是此時一個線程都沒有,poolSize >= corePoolSize , workQueue已經滿了,poolSize < maximumPoolSize(最大值),直接就會創建一個新的線程來處理這個任務

這樣的效果也就是來一個任務就開一個線程,無界,無限開新線程,線程過多容易導致JVM的壓力過大甚至直接崩潰。這也是為什麼阿里巴巴規範禁掉這個方法的直接原因,容易誤用。

(3)如果短期內有大量的任務都涌進來,實際上是走一個直接提交的思路,對每個任務,如果沒法找到一個空閒的線程來處理它,那麼就會立即創建一個新的線程出來,來處理這個新提交的任務

(4)短時間內,如果大量的任務涌入,可能會導致瞬間創建出來幾百個線程,幾千個線程,是不固定的。

(5)但是當這些線程工作完一段時間之後,就會處於空閒狀態,就會看超過60s的空閒,就會直接將空閒的線程給釋放掉。

user avatar seazhan Avatar lenve Avatar javaedge Avatar daimajiangxin Avatar swifter Avatar wangjingyu_5f58472234cff Avatar dadehouzi Avatar fu_623f04ad34d53 Avatar hawawahahahawa Avatar knifeblade Avatar fulng Avatar yifu Avatar
Favorites 12 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.