博客 / 詳情

返回

BlockingQueue:阻塞操作與條件隊列的高效結合

BlockingQueue和BlockingDeque

BlockingQueue

BlockingQueue 通常用於一個線程生產對象,而另外一個線程消費這些對象的場景。下圖是對這個原理的闡述:

一個線程往裏邊放,另外一個線程從裏邊取的一個 BlockingQueue。

一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是説,它是有限的。如果該阻塞隊列到達了其臨界點,負責生產的線程將會在往裏邊插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。 負責消費的線程將會一直從該阻塞隊列中拿出對象。如果消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。

BlockingQueue 的方法

BlockingQueue 具有 4 組不同的方法用於插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

拋異常 特定值 阻塞 超時
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove() poll() take() poll(timeout, timeunit)
檢查 element() peek()

四組不同的行為方式解釋:

  • 拋異常:如果試圖的操作無法立即執行,拋一個異常。
  • 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
  • 阻塞:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
  • 超時:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會拋出一個 NullPointerException。 可以訪問到 BlockingQueue 中的所有元素,而不僅僅是開始和結束的元素。比如説,你將一個對象放入隊列之中以等待處理,但你的應用想要將其取消掉。那麼你可以調用諸如 remove(o) 方法來將隊列之中的特定對象進行移除。但是這麼幹效率並不高,因此你儘量不要用這一類的方法,除非你確實不得不那麼做。

BlockingDeque

java.util.concurrent 包裏的 BlockingDeque 接口表示一個線程安放入和提取實例的雙端隊列。

BlockingDeque 類是一個雙端隊列,在不能夠插入元素時,它將阻塞住試圖插入元素的線程;在不能夠抽取元素時,它將阻塞住試圖抽取的線程。 deque(雙端隊列) 是 "Double Ended Queue" 的縮寫。因此,雙端隊列是一個你可以從任意一端插入或者抽取元素的隊列。

在線程既是一個隊列的生產者又是這個隊列的消費者的時候可以使用到 BlockingDeque。如果生產者線程需要在隊列的兩端都可以插入數據,消費者線程需要在隊列的兩端都可以移除數據,這個時候也可以使用 BlockingDeque。BlockingDeque 圖解:

BlockingDeque 的方法

一個 BlockingDeque - 線程在雙端隊列的兩端都可以插入和提取元素。 一個線程生產元素,並把它們插入到隊列的任意一端。如果雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。如果雙端隊列為空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。

BlockingDeque 具有 4 組不同的方法用於插入、移除以及對雙端隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

拋異常 特定值 阻塞 超時
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
檢查 getFirst(o) peekFirst(o)
拋異常 特定值 阻塞 超時
插入 addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
檢查 getLast(o) peekLast(o)

四組不同的行為方式解釋:

  • 拋異常: 如果試圖的操作無法立即執行,拋一個異常。
  • 特定值: 如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
  • 阻塞: 如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
  • 超時: 如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

BlockingDeque 與BlockingQueue關係

BlockingDeque 接口繼承自 BlockingQueue 接口。這就意味着你可以像使用一個 BlockingQueue 那樣使用 BlockingDeque。如果你這麼幹的話,各種插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一樣。

以下是 BlockingDeque 對 BlockingQueue 接口的方法的具體內部實現:

BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
remove() removeFirst()
poll() x 2 pollFirst()
take() takeFirst()
element() getFirst()
peek() peekFirst()

BlockingQueue 的例子

這裏是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 實現。 首先,BlockingQueueExample 類分別在兩個獨立的線程中啓動了一個 Producer 和 一個 Consumer。Producer 向一個共享的 BlockingQueue 中注入字符串,而 Consumer 則會從中把它們拿出來。

public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new ArrayBlockingQueue(1024);
        
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
 
        new Thread(producer).start();
        new Thread(consumer).start();
 
        Thread.sleep(4000);
    }
}

以下是 Producer 類。注意它在每次 put() 調用時是如何休眠一秒鐘的。這將導致 Consumer 在等待隊列中對象的時候發生阻塞。

public class Producer implements Runnable{
    protected BlockingQueue queue = null;
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

以下是 Consumer 類。它只是把對象從隊列中抽取出來,然後將它們打印到 System.out。

public class Consumer implements Runnable{
    protected BlockingQueue queue = null;
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

數組阻塞隊列 ArrayBlockingQueue

ArrayBlockingQueue 類實現了 BlockingQueue 接口。

ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者注: 因為它是基於數組實現的,也就具有數組的特性: 一旦初始化,大小就無法修改)。 ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。 以下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:

BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();

以下是使用了 Java 泛型的一個 BlockingQueue 示例。注意其中是如何對 String 元素放入和提取的:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();

延遲隊列 DelayQueue

DelayQueue 實現了 BlockingQueue 接口。

DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口,該接口定義:

public interface Delayed extends Comparable<Delayed< {
    public long getDelay(TimeUnit timeUnit);
}

DelayQueue 將會在每個元素的 getDelay() 方法返回的值的時間段之後才釋放掉該元素。如果返回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。

傳遞給 getDelay 方法的 getDelay 實例是一個枚舉類型,它表明了將要延遲的時間段。TimeUnit 枚舉將會取以下值:

  • DAYS
  • HOURS
  • INUTES
  • SECONDS
  • MILLISECONDS
  • MICROSECONDS
  • NANOSECONDS

正如你所看到的,Delayed 接口也繼承了 java.lang.Comparable 接口,這也就意味着 Delayed 對象之間可以進行對比。這個可能在對 DelayQueue 隊列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。 以下是使用 DelayQueue 的例子:

public class DelayQueueExample {
    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();
        Delayed element1 = new DelayedElement();
        queue.put(element1);
        Delayed element2 = queue.take();
    }
}

DelayedElement 是我所創建的一個 DelayedElement 接口的實現類,它不在 java.util.concurrent 包裏。你需要自行創建你自己的 Delayed 接口的實現以使用 DelayQueue 類。

鏈阻塞隊列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 接口。

LinkedBlockingQueue 內部以一個鏈式結構(鏈接節點)對其元素進行存儲。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。 以下是 LinkedBlockingQueue 的初始化和使用示例代碼:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();

具有優先級的阻塞隊列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 接口。

PriorityBlockingQueue 是一個無界的併發隊列。它使用了和類 java.util.PriorityQueue 一樣的排序規則。你無法向這個隊列中插入 null 值。 所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。因此該隊列中元素的排序就取決於你自己的 Comparable 實現。 注意 PriorityBlockingQueue 對於具有相等優先級(compare() == 0)的元素並不強制任何特定行為。

同時注意,如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先級為序的。 以下是使用 PriorityBlockingQueue 的示例:

BlockingQueue queue   = new PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put("Value");
String value = queue.take();

同步隊列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 接口。

SynchronousQueue 是一個特殊的隊列,它的內部同時只能夠容納單個元素。如果該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個線程將該元素從隊列中抽走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另一個線程向隊列中插入了一條新的元素。 據此,把這個類稱作一個隊列顯然是誇大其詞了。它更多像是一個匯合點。

BlockingDeque 的例子

既然 BlockingDeque 是一個接口,那麼你想要使用它的話就得使用它的眾多的實現類的其中一個。java.util.concurrent 包提供了以下 BlockingDeque 接口的實現類: LinkedBlockingDeque。

以下是如何使用 BlockingDeque 方法的一個簡短代碼示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
 
String two = deque.takeLast();
String one = deque.takeFirst();

鏈阻塞雙端隊列 LinkedBlockingDeque

LinkedBlockingDeque 類實現了 BlockingDeque 接口。

deque(雙端隊列) 是 "Double Ended Queue" 的縮寫。因此,雙端隊列是一個你可以從任意一端插入或者抽取元素的隊列。

LinkedBlockingDeque 是一個雙端隊列,在它為空的時候,一個試圖從中抽取數據的線程將會阻塞,無論該線程是試圖從哪一端抽取數據。

以下是 LinkedBlockingDeque 實例化以及使用的示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
 
String two = deque.takeLast();
String one = deque.takeFirst();
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.