關於同步/異步,阻塞/非阻塞,Unix IO模型,可以先看這篇文章網絡系統 - Unix IO模型
BIO概述
阻塞式IO。也就是説io沒有就緒的時候,操作IO當前線程會被阻塞。也就是用户線程需要等待IO線程完成
服務器實現模式為一個連接一個線程,也就是説,客户端每當有一個連接請求的時候,服務器就需要啓動一個對應線程進行處理。但是如果這個連接不做任何事情,就會造成不必要的線程開銷。這種模型一般適用於連接數目小且固定的架構。
BIO 其實就是 Reactor的 單reactor 單進程/線程模型
BIO的問題
-
同一時間,服務器只能接受來自於客户端A的請求信息;雖然客户端A和客户端B的請求是同時進行的,但客户端B發送的請求信息只能等到服務器接受完A的請求數據後,才能被接受。
-
由於服務器一次只能處理一個客户端請求,當處理完成並返回後(或者異常時),才能進行第二次請求的處理。很顯然,這樣的處理方式在高併發的情況下,是不能採用的。
多線程方式 - 偽異步方式
上面説的情況是服務器只有一個線程的情況,那麼我們就能想到使用多線程技術來解決這個問題:
-
當服務器收到客户端X的請求後,(讀取到所有請求數據後)將這個請求送入一個獨立線程進行處理,然後主線程繼續接受客户端Y的請求。
-
客户端一側,也可以使用一個子線程和服務器端進行通信。這樣客户端主線程的其他工作就不受影響了,當服務器端有響應信息的時候再由這個子線程通過 監聽模式/觀察模式(等其他設計模式)通知主線程。
如下圖所示:
這種方式其實就是Reactor 的 單reactor 多線程/多進程模型,同樣有是有侷限性的,因此也就有了後文的NIO方案:
-
雖然在服務器端,請求的處理交給了一個獨立線程進行,但是操作系統通知accept()的方式還是單個的。也就是,實際上是服務器接收到數據報文後的“業務處理過程”可以多線程,但是數據報文的接受還是需要一個一個的來
-
在linux系統中,可以創建的線程是有限的。可以通過
cat /proc/sys/kernel/threads-max命令查看可以創建的最大線程數。當然這個值是可以更改的,但是線程越多,CPU切換所需的時間也就越長,用來處理真正業務的需求也就越少。 -
創建一個線程是有較大的資源消耗的。JVM創建一個線程的時候,即使這個線程不做任何的工作,JVM都會分配一個堆棧空間。這個空間的大小默認為128K,可以通過-Xss參數進行調整。當然還可以使用ThreadPoolExecutor線程池來緩解線程的創建問題,但是又會造成BlockingQueue積壓任務的持續增加,同樣消耗了大量資源。
-
另外,如果應用程序大量使用長連接的話,線程是不會關閉的。這樣系統資源的消耗更容易失控。 那麼,如果真想單純使用線程解決阻塞的問題,那麼自己都可以算出來您一個服務器節點可以一次接受多大的併發了。看來,單純使用線程解決這個問題不是最好的辦法。
BIO通信方式深入分析
BIO的問題關鍵不在於是否使用了多線程(包括線程池)處理這次請求,而在於accept()、read()的操作點都是被阻塞。要測試這個問題,也很簡單。這裏模擬了20個客户端(用20根線程模擬),利用JAVA的同步計數器CountDownLatch,保證這20個客户都初始化完成後然後同時向服務器發送請求,然後觀察一下Server這邊接受信息的情況。
服務器端使用單線程
- 客户端代碼(SocketClientDaemon),模擬20個客户端連接
public class SocketClientDaemon {
public static void main(String[] args) throws Exception {
Integer clientNumber = 20;
CountDownLatch countDownLatch = new CountDownLatch(clientNumber);
//分別開始啓動這20個客户端
for (int index = 0; index < clientNumber; index++) {
SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
new Thread(client).start();
countDownLatch.countDown();
}
//這個wait不涉及到具體的實驗邏輯,只是為了保證守護線程在啓動所有線程後,進入等待狀態
synchronized (SocketClientDaemon.class) {
SocketClientDaemon.class.wait();
}
}
}
- 客户端代碼(SocketClientRequestThread模擬20個請求)
@Slf4j
public class SocketClientRequestThread implements Runnable {
private CountDownLatch countDownLatch;
//線程編號
private Integer clientIndex;
/**
* countDownLatch是java提供的同步計數器。
* 當計數器數值減為0時,所有受其影響而等待的線程將會被激活。這樣保證模擬併發請求的真實性
*
* @param countDownLatch
*/
public SocketClientRequestThread(CountDownLatch countDownLatch, Integer clientIndex) {
this.countDownLatch = countDownLatch;
this.clientIndex = clientIndex;
}
@Override
public void run() {
Socket socket = null;
OutputStream clientRequest = null;
InputStream clientResponse = null;
try {
socket = new Socket("localhost", 83);
clientRequest = socket.getOutputStream();
clientResponse = socket.getInputStream();
//等待,直到SocketClientDaemon完成所有線程的啓動,然後所有線程一起發送請求
this.countDownLatch.await();
//發送請求信息
clientRequest.write(("這是第" + this.clientIndex + " 個客户端的請求。").getBytes());
clientRequest.flush();
//在這裏等待,直到服務器返回信息
log.info("第{}個客户端的請求發送完成,等待服務器返回信息", this.clientIndex);
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
int realLen;
String message = "";
//程序執行到這裏,會一直等待服務器返回信息(注意,前提是in和out都不能close,如果close了就收不到服務器的反饋了)
while ((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
message += new String(contextBytes, 0, realLen);
}
log.info("接收到來自服務器的信息:{}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
try {
if (clientRequest != null) {
clientRequest.close();
}
if (clientResponse != null) {
clientResponse.close();
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
}
- 服務器端(SocketServer) 單個線程
@Slf4j
public class SocketServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(83);
try {
while (true) {
//這裏會被阻塞,直到能獲取到連接
Socket socket = serverSocket.accept();
//下面開始收取信息
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
//獲取端口
Integer sourcePort = socket.getPort();
int maxLen = 2048;
byte[] contextBytes = new byte[maxLen];
//這裏會被阻塞,直到有數據準備好
int realLen = in.read(contextBytes, 0, maxLen);
//讀取信息
String message = new String(contextBytes, 0, realLen);
//打印信息
log.info("服務器收到來自於端口: {}的信息: {}", sourcePort, message);
Thread.sleep(10000);//模擬執行業務邏輯
//開始發送信息
out.write("回發響應信息!".getBytes());
//關閉
out.close();
in.close();
socket.close();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
}
經過執行就會發現,服務器一次只能處理一個客户端請求,當處理完成並返回後(或者異常時),才能進行第二次請求的處理。這就是上面提到的BIO存在的問題
優化服務器端為多線程
客户端代碼和上文一樣,最主要是更改服務器端的代碼:
@Slf4j
public class SocketServer {
static {
BasicConfigurator.configure();
}
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(83);
try {
while (true) {
Socket socket = serverSocket.accept();
//業務處理過程可以交給一個線程(這裏可以使用線程池),並且線程的創建是很耗資源的。
//但最終還是改變不了.accept()只能一個一個接受socket的情況,並且被阻塞的情況
SocketServerThread socketServerThread = new SocketServerThread(socket);
new Thread(socketServerThread).start();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
}
@Slf4j
class SocketServerThread implements Runnable {
private Socket socket;
public SocketServerThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream in = null;
OutputStream out = null;
try {
//下面收取信息
in = socket.getInputStream();
out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
//使用線程,同樣無法解決read方法的阻塞問題,
//也就是説read方法處同樣會被阻塞,直到操作系統有數據準備好
int realLen = in.read(contextBytes, 0, maxLen);
//讀取信息
String message = new String(contextBytes, 0, realLen);
log.info("服務器收到來自於端口: " + sourcePort + "的信息: " + message);
Thread.sleep(10000);//模擬執行業務邏輯
//下面開始發送信息
out.write("回發響應信息!".getBytes());
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
//關閉資源
try {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
if (this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
}
這裏與單線程相比,使用了多線程來處理具體的業務。但還是改變不了.accept()只能一個一個阻塞處理 socket的情況
問題根源
那麼重點的問題並不是“是否使用了多線程”,而是為什麼accept()、read()方法會被阻塞。
API文檔中對於 serverSocket.accept() 方法的使用描述:
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
翻譯一下:監聽與此套接字的連接並接受它。該方法會一直阻塞,直到建立連接為止。
這主要就涉及到阻塞式同步IO的工作原理:
-
服務器線程發起一個accept動作,詢問操作系統 是否有新的socket套接字信息從端口X發送過來。accept源碼如下:
// java.net.ServerSocket#accept public Socket accept() throws IOException { if (isClosed()) throw new SocketException("Socket is closed"); if (!isBound()) throw new SocketException("Socket is not bound yet"); Socket s = new Socket((SocketImpl) null); implAccept(s);//顯然會走到這個邏輯 return s; } //java.net.ServerSocket#implAccept(java.net.Socket) protected final void implAccept(Socket s) throws IOException { SocketImpl si = s.impl; // Socket has no SocketImpl if (si == null) {//上面傳進來的null si = implAccept(); s.setImpl(si); s.postAccept(); return; } //...省略 s.postAccept(); } //java.net.ServerSocket#implAccept() private SocketImpl implAccept() throws IOException { if (impl instanceof PlatformSocketImpl) { return platformImplAccept(); } else { //...省略 } } //java.net.ServerSocket#platformImplAccept private SocketImpl platformImplAccept() throws IOException { assert impl instanceof PlatformSocketImpl; // create a new platform SocketImpl and accept the connection SocketImpl psi = SocketImpl.createPlatformSocketImpl(false); implAccept(psi); return psi; } //java.net.ServerSocket#platformImplAccept private void implAccept(SocketImpl si) throws IOException { assert !(si instanceof DelegatingSocketImpl); // accept a connection impl.accept(si); //...省略 } //java.net.AbstractPlainSocketImpl#accept protected void accept(SocketImpl si) throws IOException { si.fd = new FileDescriptor(); acquireFD(); try { socketAccept(si); } finally { releaseFD(); } SocketCleanable.register(si.fd, true); } -
注意,是詢問操作系統。也就是説socket套接字的IO模式支持是基於操作系統的,那麼自然同步IO/異步IO的支持就是需要操作系統級別的了。如下:
// java.net.PlainSocketImpl#socketAccept void socketAccept(SocketImpl s) throws IOException { int nativefd = checkAndReturnNativeFD(); if (s == null) throw new NullPointerException("socket is null"); int newfd = -1; InetSocketAddress[] isaa = new InetSocketAddress[1]; if (timeout <= 0) { //如果沒有設置timeout,那麼在調用JNI時會一直等待,直到有數據返回 newfd = accept0(nativefd, isaa);//這是個JNI方法 } else { configureBlocking(nativefd, false); try { waitForNewConnection(nativefd, timeout); newfd = accept0(nativefd, isaa); if (newfd != -1) { configureBlocking(newfd, true); } } finally { configureBlocking(nativefd, true); } } /* Update (SocketImpl)s' fd */ fdAccess.set(s.fd, newfd); /* Update socketImpls remote port, address and localport */ InetSocketAddress isa = isaa[0]; s.port = isa.getPort(); s.address = isa.getAddress(); s.localport = localport; if (preferIPv4Stack && !(s.address instanceof Inet4Address)) throw new SocketException("Protocol family not supported"); } // java.net.PlainSocketImpl#accept0 static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
最後調用的accept0十個native方法,就是調用的操作系統級別的accept。因此如果操作系統沒有發現有套接字從指定的端口X來,那麼操作系統就會等待。這樣serverSocket.accept()方法就會一直等待。這就是為什麼accept()方法為什麼會阻塞: 它內部的實現是使用的操作系統級別的同步IO