在多線程編程中,線程間的數據交換是一個常見需求。Java IO包中的PipedInputStream和PipedOutputStream提供了一種高效的線程間通信機制,允許一批(多個)線程向PipedOutputStream寫入數據,另一批(多個)線程從PipedInputStream讀取數據。
但是,同一批(多個)線程相互之間會存在競爭,比如,同一批向PipedOutputStream寫入數據的線程會存在競爭,同一批從PipedInputStream讀取數據的線程也會存在競爭。因此PipedInputStream和PipedOutputStream中的線程安全需要通過synchronized關鍵字和wait()/notifyAll()機制實現。不建議在一個線程中同時使用PipedInputStream和PipedOutputStream,因為這樣可能會導致這個線程陷入死鎖狀態。
PipedInputStream和PipedOutputStream之間的通信本質上是一個生產者-消費者模型,其中PipedOutputStream作為生產者,PipedInputStream作為消費者。兩者通過一個循環緩衝區(byte[]數組)進行數據交換,PipedOutputStream將數據緩存在PipedInputStream的數組當中,等待PipedInputStream的讀取。
PipedInputStream和PipedOutputStream的UML關係圖,如下所示:

一、PipedOutputStream(生產者)源碼——向PipedInputStream(消費者)中的緩衝區(byte[]數組)寫入字節數據的輸出Stream(生產者)
package java.io;
import java.io.*;
public
class PipedOutputStream extends OutputStream {
//與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)
private PipedInputStream sink;
//構造函數
public PipedOutputStream(PipedInputStream snk) throws IOException {
connect(snk);//調用connect()函數,來改變PipedInputStream (消費者)中一些變量的值
}
//構造函數
public PipedOutputStream() {
}
//線程同步函數:用來改變將要關聯的PipedInputStream (消費者)中一些變量的值
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) {
throw new NullPointerException();//如果將要關聯的PipedInputStream (消費者)為null,拋出NullPointerException
} else if (sink != null || snk.connected) {
//如果與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)!=null或者將要關聯的PipedInputStream (消費者)的boolean connected變量為true,則拋出IOException
throw new IOException("Already connected");
}
sink = snk;//將這個PipedOutputStream(生產者)與這個PipedInputStream (消費者)相關聯
snk.in = -1;//改變PipedInputStream (消費者)中的變量int in=-1
snk.out = 0;//改變PipedInputStream (消費者)中的變量int out=0
snk.connected = true;//改變PipedInputStream (消費者)中的變量boolean connected=true
}
//向與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)的緩衝區(byte[]數組)寫入1個字節
public void write(int b) throws IOException {
if (sink == null) {
//如果與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)== null,拋出IOException
throw new IOException("Pipe not connected");
}
sink.receive(b);//最終調用的是這個相關聯的 PipedInputStream (消費者)的receive(int b)函數
}
//向與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)的緩衝區(byte[]數組)寫入byte[]數組b的[off,off+len)(左閉右開,不包括off+len)索引位置的字節
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) {
//如果與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)== null,拋出IOException
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();//如果byte[]數組b==null,拋出一個NullPointerException
} else if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {//byte[]數組b的[off,off+len)(左閉右開)索引位置是否有越界的檢查
throw new IndexOutOfBoundsException();//越界的話,拋出一個IndexOutOfBoundsException
} else if (len == 0) {
return;//如果len==0,結束本次函數調用
}
sink.receive(b, off, len);//最終調用的是這個相關聯的 PipedInputStream (消費者)的receive(byte b[], int off, int len)函數
}
//線程同步函數:使用notifyAll()函數喚醒所有與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)線程(這個消費者可以綁定1~多個線程)
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
//關閉這個PipedOutputStream(生產者),這個PipedOutputStream(生產者)不能再向與它相關聯的PipedInputStream(消費者)中的緩衝區(byte[]數組)寫入字節數據
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
}
二、PipedInputStream(消費者)源碼——從自己的緩衝區(byte[]數組)讀取字節數據的輸入Stream(消費者)
package java.io;
public class PipedInputStream extends InputStream {
//標記符:true表示與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)已經關閉,反之,反之
boolean closedByWriter = false;
//標記符:true表示當前這個 PipedInputStream (消費者)已經關閉了,反之,反之
volatile boolean closedByReader = false;
//標記符:true表示與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)已經持有了這個PipedInputStream (消費者)對象(或者叫已經連接上了),反之,反之
boolean connected = false;
Thread readSide;//當前消費的線程
Thread writeSide;//當前生產者的線程
//默認的PipedInputStream (消費者)的緩衝區(byte[]數組)的長度
private static final int DEFAULT_PIPE_SIZE = 1024;
//PipedInputStream (消費者)的緩衝區(byte[]數組)
protected byte buffer[];
//緩衝區(byte[]數組)的寫指針
protected int in = -1;
//緩衝區(byte[]數組)的讀指針
protected int out = 0;
//構造函數
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);//緩衝區(byte[]數組)的長度使用默認值1024
}
//構造函數
public PipedInputStream(PipedOutputStream src, int pipeSize)
throws IOException {
initPipe(pipeSize);//緩衝區(byte[]數組)的長度使用指定的長度
//最終還是調用PipedOutputStream(生產者)的connect()函數,並把自身對象this傳遞進去,然後在PipedOutputStream(生產者)的connect()函數中,改變自己的3個變量int in=-1、int out=0、boolean connected=true
connect(src);
}
//構造函數,緩衝區(byte[]數組)的長度使用默認值1024
public PipedInputStream() {
initPipe(DEFAULT_PIPE_SIZE);
}
//構造函數,緩衝區(byte[]數組)的長度使用指定的長度
public PipedInputStream(int pipeSize) {
initPipe(pipeSize);
}
//初始化緩衝區(byte[]數組)
private void initPipe(int pipeSize) {
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe Size <= 0");
}
buffer = new byte[pipeSize];
}
public void connect(PipedOutputStream src) throws IOException {
src.connect(this); //最終還是調用PipedOutputStream(生產者)的connect()函數,並把自身對象this傳遞進去,然後在PipedOutputStream(生產者)的connect()函數中,改變自己的3個變量int in=-1、int out=0、boolean connected=true
}
//線程同步函數:該函數只被PipedOutputStream(生產者)的write(int b)函數調用
protected synchronized void receive(int b) throws IOException {
checkStateForReceive();//檢查PipedInputStream (消費者)的狀態
writeSide = Thread.currentThread();//當前執行該函數的線程,就是生產者線程
if (in == out)
//如果緩衝區(byte[]數組)的讀指針==緩衝區(byte[]數組)的寫指針,喚醒所有消費者線程,自己這個生產者線程調用wait(1000)函數
awaitSpace();
if (in < 0) {//緩衝區(byte[]數組)的寫指針<0時,設置緩衝區(byte[]數組)的寫指針=0,緩衝區(byte[]數組)的讀指針=0
in = 0;
out = 0;
}
buffer[in++] = (byte)(b & 0xFF);//向緩衝區的寫指針位置寫入1個字節
if (in >= buffer.length) {
in = 0;//如果緩衝區滿了,設置緩衝區的寫指針=0
}
}
//線程同步函數:該函數只被PipedOutputStream(生產者)的write(byte b[], int off, int len)函數調用
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();//檢查PipedInputStream (消費者)的狀態
writeSide = Thread.currentThread();//當前執行該函數的線程,就是生產者線程
int bytesToTransfer = len;//生產者線程要寫入到緩衝區(byte[]數組)中的字節總量
while (bytesToTransfer > 0) {
if (in == out)
//如果緩衝區(byte[]數組)的讀指針==緩衝區(byte[]數組)的寫指針,喚醒所有消費者線程,自己這個生產者線程調用wait(1000)函數
awaitSpace();
int nextTransferAmount = 0;//本次生產者線程要寫入到緩衝區(byte[]數組)中的字節數量
if (out < in) {
//如果緩衝區的讀指針<緩衝區的寫指針,本次要寫入到緩衝區(byte[]數組)中的字節數量=緩衝區的長度-緩衝區的寫指針
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
in = out = 0;
//如果緩衝區的讀指針(out)> 緩衝區的寫指針(in)並且緩衝區的寫指針(in)=-1,先設置緩衝區的讀(out)、寫(in)指針=0,本次要寫入到緩衝區(byte[]數組)中的字節數量=緩衝區的長度
nextTransferAmount = buffer.length - in;
} else {
//如果緩衝區的讀指針(out)> 緩衝區的寫指針(in)並且緩衝區的寫指針(in)=-1,本次要寫入到緩衝區(byte[]數組)中的字節數量=讀指針(out)-寫指針(in)
nextTransferAmount = out - in;
}
}
//本次生產者線程要寫入到緩衝區(byte[]數組)中的字節數量最多為len,下次為len-本次寫入到緩衝區(byte[]數組)中的字節數量,也就是每次寫入的基於len個字節循環遞減上一次寫入的
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);//向緩衝區(byte[]數組)的[in,in+nextTransferAmount)索引位置寫入byte[]數組b中[off,off+nextTransferAmount)索引位置的字節,都是左閉右開。
bytesToTransfer -= nextTransferAmount;//每一次都基於len個字節循環遞減本次寫入到緩衝區(byte[]數組)中的字節數量nextTransferAmount
off += nextTransferAmount;//將下次要從byte[]數組b中取字節的起始索引的位置(偏移量)+本次寫入到緩衝區(byte[]數組)中的字節數量nextTransferAmount
in += nextTransferAmount;//將緩衝區的寫指針(in)+本次寫入到緩衝區(byte[]數組)中的字節數量nextTransferAmount
if (in >= buffer.length) {
in = 0;//如果緩衝區的寫指針(in)> 緩衝區(byte[]數組)的長度,設置緩衝區的寫指針(in)=0
}
}
}
//檢查PipedInputStream (消費者)的狀態
private void checkStateForReceive() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
}
//如果緩衝區(byte[]數組)的讀指針==緩衝區(byte[]數組)的寫指針,喚醒所有消費者線程,自己這個生產者線程調用wait(1000)函數
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
/* full: kick any waiting readers */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
//關閉與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();//喚醒所有消費者線程
}
//線程同步函數:消費者線程每次從緩衝區(byte[]數組)中讀取1個字節
public synchronized int read() throws IOException {
if (!connected) {//檢查標記符connected,如果為false,拋出IOException
throw new IOException("Pipe not connected");
} else if (closedByReader) {//檢查標記符closedByReader,如果為true,拋出IOException
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
//檢查當前這個PipedInputStream (消費者)對象中引用的生產者線程和生產者線程的狀態,如果和標記符closedByWriter還有緩衝區(byte[]數組)的寫指針(in)不能對應的話,拋出一個IOException
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();//當前執行該函數的線程,就是消費者線程
int trials = 2;//這是一個多次檢測的策略變量,防止生產者線程沒有關閉了與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)時便拋出IOException
//in=-1的情況有種:
//①、生產者線程還沒有向緩衝區(byte[]數組)中寫任何字節
//②、消費者線程從緩衝區(byte[]數組)中讀完字節(byte)數據以後讀指針(out)=寫指針(in),那麼,當前消費者線程會設置寫指針(in)=-1
//③、消費者線程執行PipedInputStream 的close()函數後,關閉了這個 PipedInputStream (消費者)
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
//多個消費者線程從緩衝區(byte[]數組)中讀的時候,並且前一個消費者線程已經把緩衝區(byte[]數組)中寫入的字節讀完了,並且前一個線程設置了寫指針(in)=-1,生產者線程也關閉了與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)時,拋出一個IOException
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();//此處的目的是為了喚醒所有生產者線程
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;//獲取緩衝區(byte[]數組)中讀指針(out)索引位置的字節,並且將讀指針(out)+1
if (out >= buffer.length) {
out = 0;//如果讀指針(out)>=緩衝區(byte[]數組)的長度,設置讀指針(out)=0
}
if (in == out) {
/* now empty */
in = -1;//如果消費者線程從緩衝區(byte[]數組)中讀完字節(byte)數據以後讀指針(out)=寫指針(in),那麼,當前消費者線程會設置寫指針(in)=-1
}
return ret;
}
//線程同步函數:如果緩衝區(byte[]數組)中有足夠多的字節的話(數量>len),消費者線程每次從緩衝區(byte[]數組)中讀取len個字節放到byte[]數組b的[off, off+len)索引位置(左閉右開,不包括off+len)
//如果緩衝區(byte[]數組)中字節的數量<len個(比如有in(寫指針)-out(讀指針)個),消費者線程每次從緩衝區(byte[]數組)中讀取(in-out)個字節放到byte[]數組b的[off, off+in-out)索引位置(左閉右開,不包括off+in-out)
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {//byte[]數組b的[off,off+len)(左閉右開)索引位置是否有越界的檢查
throw new IndexOutOfBoundsException();//越界的話,拋出一個IndexOutOfBoundsException
} else if (len == 0) {
return 0;//如果len==0,返回0
}
/* possibly wait on the first character */
int c = read();//先調用read()函數試探性從緩衝區(byte[]數組)中讀1個字節
if (c < 0) {
return -1;//如果試探性的從緩衝區(byte[]數組)中都讀不到1個字節,返回-1
}
b[off] = (byte) c;//把試探性從緩衝區(byte[]數組)中讀到的第1個字節放到byte[]數組b的off索引位置
int rlen = 1;//累計從緩衝區(byte[]數組)中讀到的所有字節數量
while ((in >= 0) && (len > 1)) {
int available;//本次執行System.arraycopy()函數可以從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - 1)) {//減掉試探性從緩衝區(byte[]數組)中讀到的第1個字節
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;//讀指針(out)+System.arraycopy()函數從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量
rlen += available;//累計從緩衝區(byte[]數組)中讀到的所有字節數量 + System.arraycopy()函數從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量
len -= available;//len - System.arraycopy()函數從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量
if (out >= buffer.length) {
out = 0;//如果讀指針(out)>=緩衝區(byte[]數組)的長度,設置讀指針(out)=0
}
if (in == out) {
/* now empty */
in = -1;//如果消費者線程從緩衝區(byte[]數組)中讀完字節(byte)數據以後讀指針(out)=寫指針(in),那麼,當前消費者線程會設置寫指針(in)=-1
}
}
return rlen;//返回累計從緩衝區(byte[]數組)中讀到的所有字節數量
}
//線程同步函數:返回緩衝區(byte[]數組)中可以被消費者線程讀取的字節數量
public synchronized int available() throws IOException {
if(in < 0)
return 0;
else if(in == out)
return buffer.length;
else if (in > out)
return in - out;
else
return in + buffer.length - out;
}
//關閉這個 PipedInputStream (消費者),其實就是設置標記符closedByReader=true, 設置寫指針(in)=-1
public void close() throws IOException {
closedByReader = true;
synchronized (this) {
in = -1;
}
}
}
三、1個線程向PipedOutputStream(生產者)寫字節數據,1個線程從PipedInputStream(消費者)讀取字節數據的過程
3.1、非循環直接寫和非循環直接讀
package com.chelong.StreamAndReader;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class PipedTest {
public static void main(String[] args) throws IOException {
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream input = new PipedInputStream(output);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
output.write("Hello world, pipe!".getBytes());//write()函數是阻塞的
} catch (IOException e) {
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
int data = -1;
while ((data = input.read()) != -1) {//read()函數是阻塞的
System.out.print((char) data);
}
} catch (IOException e) {
}
}
});
thread1.start();
thread2.start();
}
}
程序運行結果,如下所示:

main線程構造PipedOutputStream(生產者)和PipedInputStream(消費者)的過程如下:

向PipedOutputStream(生產者)寫字節數據的生產者線程的執行過程如下:

從PipedInputStream(消費者)讀取字節數據的消費者線程的執行過程如下:

3.1.1、非循環直接寫和非循環直接讀時1個生產者線程和1個消費者線程處理數據的過程
Java 語言定義了 6 種線程狀態, 在任意一個時間點, 一個線程只能有且只有其中的一種狀態, 這 6 種狀態分別如下:

這 6 種線程狀態的簡單介紹,如下所示

JVM運行時內存結構主要包含了五個部分:程序計數器 (PC寄存器)、 JVM棧、Native方法棧、堆、 方法區。如下圖所示:

圖中紅色部分是線程私有區域,進入這個區域的數據不會出現線程競爭的關係。而綠色區域中的數據則被所有線程共享,其中Java堆中存放的是大量對象,方法區中存放class信息、常量、靜態變量等數據。
每個線程的線程棧中會存放函數(方法)的描述符,成員(本地)變量等,函數(方法)在線程棧中會通過壓棧和彈棧來執行,除了8種(byte、short、int、long、float、double、boolean、char)基本的數據類型存儲在線程棧中以外,其餘的引用數據類型(對象)都存儲在堆中,然後通過引用將堆中的對象和線程棧中的變量關聯起來(也可以叫線程棧中的引用指向堆中的對象)。

那麼,當使用者執行3.1中的代碼時,1個生產者線程和1個消費者線程處理數據的過程如下:
①、main線程初始化一個緩衝區(byte[]數組),長度為1024(默認值),然後生產者線程通過不斷的壓棧來完成函數之間的調用,最終執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,如下所示:

②、當生產者線程填充完緩衝區之後,寫指針變量int in=17,讀指針變量int out=0,Thread writeSide = 當前這個生產者線程(Thread)對象,生產者線程會把自己線程棧中修改的變量最終刷新到堆中PipedInputStream對象中,以確保其它消費者線程的線程棧從堆中讀取這3個變量時,這3個變量已經為修改後的值,如下所示:

③、消費者線程讀緩衝區(byte[]數組)的過程中會不斷地執行out++(讀指針)以讀取緩衝區(byte[]數組)中的可用字節並返回,直到out(讀指針)==in(寫指針),修改in(寫指針)=-1,並且每次同步執行PipedInputStream.class::read()函數時,都會更新Thread readSide = 當前這個消費者線程(Thread)對象,消費者線程也會把自己線程棧中修改的變量最終刷新到堆中PipedInputStream對象中,以確保其它消費者線程的線程棧從堆中讀取這3個變量時,這3個變量已經為修改後的值,如下所示:

④、更新in(寫指針)=-1後,消費者線程再次同步執行PipedInputStream.class::read()函數時,如果PipedInputStream::boolean closedByWriter變量為true,則會返回-1
3.2、加鎖循環寫和非加鎖循環讀到byte[]數組b中再處理
package com.chelong.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class PipeForTransferInThread {
public static void main(String[] args) throws IOException, InterruptedException {
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream input = new PipedInputStream(output);
//生產者線程
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
synchronized (input) {
try {
// input.wait();
output.write("Hello world, pipe!".getBytes());
input.wait();//釋放鎖並無限等待,直到消費者線程consumer 執行notifyAll()函數來喚醒當前阻塞
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
},"生產者線程");
//消費者線程
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
try {
byte[] b = new byte[1024];//1KB
int readBytes = -1;
long lastTime = System.currentTimeMillis();
while ((readBytes = input.read(b, 0, b.length)) != -1) {
long curTime = System.currentTimeMillis();
System.out.print(Thread.currentThread().getName()+"本次讀取花費時間:" + (curTime - lastTime) + "ms,讀到的數據是:");
lastTime = curTime;
for (int i = 0; i < readBytes; i++) {
System.out.print((char) b[i]);//模擬處理字節數據
}
System.out.println();
}
} catch (IOException e) {
e.printStackTrace();
}
}
},"消費者線程");
producer.start();//生產者線程啓動
consumer.start();//消費者線程啓動
}
}
程序運行結果,如下所示:

main線程構造PipedOutputStream(生產者)和PipedInputStream(消費者)的過程可以參考3.1;
向PipedOutputStream(生產者)寫字節數據的生產者線程的執行過程可以參考3.1;
從PipedInputStream(消費者)讀取字節數據的消費者線程的執行過程如下:

3.2.1、加鎖循環寫和非加鎖循環讀到byte[]數組b中再處理時1個生產者線程和1個消費者線程處理數據的過程
標題3.2中的代碼的整個執行過程如下:
①、main線程初始化一個緩衝區(byte[]數組),長度為1024(默認值),如下所示:

②、然後生產者線程通過不斷的壓棧來完成函數之間的調用,最終執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,並且先在自己的線程棧中更新in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程(Thread)對象 如下所示:

當生產者線程對緩衝區(byte[]數組)填充完成之後,再執行標題3.2中的代碼
input.wait();
這行代碼會釋放鎖並讓生產者線程進入無限等待,直到消費者線程consumer執行notifyAll()函數來喚醒當前這個生產者線程。在這之前,生產者線程會將自己線程棧中的in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。

③、消費者線程讀緩衝區(byte[]數組)的過程也是通過不斷的壓棧來完成函數之間的調用,最終執行PipedInputStream::read()函數(試探性的讀取1個字節)和PipedInputStream::read(byte b[], int off, int len)函數(讀取剩餘其它的字節)將步驟②中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來

附言:最終消費者線程也會將自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,writeSide=當前這個消費者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。
因此,本次消費者線程從緩衝區(byte[]數組)中讀數據的過程中沒有執行read()函數中的wait(1000)這一行代碼,如下:

所以,本次消費者線程從緩衝區(byte[]數組)中讀取數據到消費者線程中自己創建的byte[]數組中時,只花費了0ms:

接下來,當消費者線程將步驟②中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來以後(通過System.arraycopy()函數複製到了消費者線程中自己創建的byte[]數組中),消費者線程會遍歷從緩衝區讀到的這個byte[]數組,來處理這些數據,如下所示(標題3.2中的代碼片段):
//標題3.2中的代碼片段
for (int i = 0; i < readBytes; i++) {
System.out.print((char) b[i]);//模擬處理字節數據
}
然後,當消費者線程再次執行
//標題3.2中的代碼片段
input.read(b, 0, b.length)
從緩衝區(byte[]數組)中讀數據到自己創建的byte[]數組中時,由於此時in(寫指針)=-1,並且當下圖中的其它5個條件都不成立時,喚醒執行了
input.wait()
的生產者線程,然後當前這個正在從緩衝區(byte數組)中讀數據的消費者線程執行wait 1000ms ,如下:

④、當生產者線程被消費者線程執行的
notifyAll();
喚醒之後,會再次通過不斷的壓棧來完成函數之間的調用,再次執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,並且先在自己的線程棧中先更新in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程(Thread)對象 如下所示:

當生產者線程對緩衝區(byte[]數組)填充完成之後,再執行標題3.2中的代碼
input.wait();
這行代碼會釋放鎖並讓生產者線程進入無限等待,直到消費者線程consumer執行notifyAll()函數來喚醒當前這個生產者線程。在這之前,生產者線程會將自己線程棧中的in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。如下所示:

⑤、消費者線程在第③步執行了
wait(1000);
在等待了1000ms之後,消費者線程會自動喚醒繼續執行,此時自己線程棧中的in(寫指針)= -1,out(讀指針)= 17已經被第④步中的生產者線程修改為in(寫指針)=17,out(讀指針)=0(生產者線程不會直接修改消費者線程棧中的變量,生產者線程會先將自己線程棧中in(寫指針),out(讀指針)變量的值修改到主內存中,然後消費者線程會自己將主內存中的這2個變量值刷新到消費者自己的線程棧中),如下所示:

然後執行PipedInputStream::read()函數(試探性的讀取1個字節)和PipedInputStream::read(byte b[], int off, int len)函數(讀取剩餘其它的字節)將步驟④中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來

附言:最終消費者線程也會將自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,writeSide=當前這個消費者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。
由於,本次消費者線程從緩衝區(byte[]數組)中讀數據的過程是從步驟③中自動喚醒繼續執行的,所以,本次消費者線程從緩衝區(byte[]數組)中讀取數據到消費者線程中自己創建的byte[]數組中時,花費了1015ms:

接下來,當消費者線程將步驟④中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來以後(通過System.arraycopy()函數複製到了消費者線程中自己創建的byte[]數組中),消費者線程會遍歷從緩衝區讀到的這個byte[]數組,來處理這些數據,如下所示(標題3.2中的代碼片段):
//標題3.2中的代碼片段
for (int i = 0; i < readBytes; i++) {
System.out.print((char) b[i]);//模擬處理字節數據
}
然後,當消費者線程再次執行
//標題3.2中的代碼片段
input.read(b, 0, b.length)
從緩衝區(byte[]數組)中讀數據到自己創建的byte[]數組中時,由於此時in(寫指針)=-1,並且當下圖中的其它5個條件都不成立時,喚醒執行了
input.wait()
的生產者線程,然後當前這個正在從緩衝區(byte[]數組)中讀數據的消費者線程執行wait 1000ms ,如下:

⑥、當生產者線程被消費者線程執行的
notifyAll();
喚醒之後,會再次通過不斷的壓棧來完成函數之間的調用,再次執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,並且先在自己的線程棧中先更新in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程(Thread)對象 如下所示:

當生產者線程對緩衝區(byte[]數組)填充完成之後,再執行標題3.2中的代碼
input.wait();
這行代碼會釋放鎖並讓生產者線程進入無限等待,直到消費者線程consumer執行notifyAll()函數來喚醒當前這個生產者線程。在這之前,生產者線程會將自己線程棧中的in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。如下所示:

⑦、消費者線程在第⑤步執行了
wait(1000);
在等待了1000ms之後,消費者線程會自動喚醒繼續執行,此時自己線程棧中的in(寫指針)= -1,out(讀指針)= 17已經被第⑥步中的生產者線程修改為in(寫指針)=17,out(讀指針)=0(生產者線程不會直接修改消費者線程棧中的變量,生產者線程會先將自己線程棧中in(寫指針),out(讀指針)變量的值修改到主內存中,然後消費者線程會自己將主內存中的這2個變量值刷新到消費者自己的線程棧中),然後執行PipedInputStream::read()函數(試探性的讀取1個字節)和PipedInputStream::read(byte b[], int off, int len)函數(讀取剩餘其它的字節)將步驟⑥中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來

附言:最終消費者線程也會將自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,writeSide=當前這個消費者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。
由於,本次消費者線程從緩衝區(byte[]數組)中讀數據的過程是從步驟⑤中自動喚醒繼續執行的,所以,本次消費者線程從緩衝區(byte[]數組)中讀取數據到消費者線程中自己創建的byte[]數組中時,花費了1017ms:

接下來,當消費者線程將步驟⑥中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來以後(通過System.arraycopy()函數複製到了消費者線程中自己創建的byte[]數組中),消費者線程會遍歷從緩衝區讀到的這個byte[]數組,來處理這些數據,如下所示(標題3.2中的代碼片段):
//標題3.2中的代碼片段
for (int i = 0; i < readBytes; i++) {
System.out.print((char) b[i]);//模擬處理字節數據
}
然後,當消費者線程再次執行
//標題3.2中的代碼片段
input.read(b, 0, b.length)
從緩衝區(byte[]數組)中讀數據到自己創建的byte[]數組中時,由於此時in(寫指針)=-1,並且當下圖中的其它5個條件都不成立時,喚醒執行了
input.wait()
的生產者線程,然後當前這個正在從緩衝區(byte[]數組)中讀數據的消費者線程執行wait 1000ms ,如下:

⑧、當生產者線程被消費者線程執行的
notifyAll();
喚醒之後,會跳出for循環,結束生產者線程的生命週期,之後,該線程對象會被操作系統回收。
⑨、消費者線程在第⑦步執行了
wait(1000);
在等待了1000ms之後,消費者線程會自動喚醒繼續執行,此時自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,並且從
wait(1000);
的代碼之後,繼續執行,執行過程如下(從下圖的紫色流程繼續執行):

在執行了2個循環後,直到int trials = 0時,執行到判斷(writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)這個條件時就會為true(下圖的紅色流程)

然後,拋出了一個IOException("Pipe broken"),因此,可以得出int trials變量的含義:這個變量是一個多次檢測的策略變量,當生產者線程沒有關閉了與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)時,並且writeSide變量指向的當前生產者線程已經被操作系統回收時(此時當前生產者線程對象的isAlive()函數會返回false),消費者線程會拋出1個IOException("Pipe broken"),並結束while循環,進而結束消費者線程的生命週期。之後,該線程對象也會被操作系統回收。如下圖所示:

3.2.2、怎樣防止3.2.1中第⑨步的生產者線程拋出IOException("Pipe broken")
回顧3.2.1中第⑨步中的消費者線程拋出IOException("Pipe broken")的產生過程:當執行到判斷(writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)這個條件時就會為true(下圖的紅色流程)

那麼,使用者就可以將上圖中紅色流程的前一步變成true即可,如下代碼所示(只修改了生產者線程中的代碼,消費者線程中的代碼沒有變化):
package com.chelong.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public static void main(String[] args) throws IOException, InterruptedException {
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream input = new PipedInputStream(output);
//生產者線程
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
synchronized (input) {
// input.wait();
output.write("Hello world, pipe!".getBytes());
input.wait();//釋放鎖並無限等待,直到消費者線程thread2執行notifyAll()函數來喚醒當前阻塞
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (output != null) output.close();//調用close()函數關閉生產者對象
} catch (IOException e) {
e.printStackTrace();
}
}
}
}, "生產者線程");
//消費者線程
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
try {
byte[] b = new byte[1024];//1KB
int readBytes = -1;
long lastTime = System.currentTimeMillis();
while ((readBytes = input.read(b, 0, b.length)) != -1) {
long curTime = System.currentTimeMillis();
System.out.print(Thread.currentThread().getName() + "本次讀取花費時間:" + (curTime - lastTime) + "ms,讀到的數據是:");
lastTime = curTime;
for (int i = 0; i < readBytes; i++) {
System.out.print((char) b[i]);//模擬處理字節數據
}
System.out.println();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}, "消費者線程");
producer.start();//生產者線程啓動
consumer.start();//消費者線程啓動
}
}
程序運行結果,如下所示:

通過PipedOutputStream.class::close()的源碼可以看到這樣修改後消費者線程不再拋出IOException("Pipe broken")原因:
PipedOutputStream.class(生產者類)的源碼
package java.io;
import java.io.*;
public
class PipedOutputStream extends OutputStream {
...省略部分代碼...
//關閉這個PipedOutputStream(生產者),這個PipedOutputStream(生產者)不能再向與它相關聯的PipedInputStream(消費者)中的緩衝區(byte[]數組)寫入字節數據
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();//調用PipedInputStream.class::receivedLast()函數
}
}
}
PipedInputStream .class(消費者類)的源碼
package java.io;
public class PipedInputStream extends InputStream {
//標記符:true表示與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)已經關閉,反之,反之
boolean closedByWriter = false;
...省略部分代碼...
//關閉與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)
synchronized void receivedLast() {
closedByWriter = true;//關閉後消費者再從緩衝區(byte[])數組中讀取字節數據時,會返回-1,不會拋出IOException了
notifyAll();//喚醒所有消費者線程
}
...省略部分代碼...
四、多個線程向PipedOutputStream(生產者)寫字節數據,多個線程從PipedInputStream(消費者)讀取字節數據的過程
略(待補充)