动态

详情 返回 返回

JAVA線程池有哪些隊列? 以及它們的適用場景案例 - 动态 详情

大家好,我是 V 哥。在高併發應用場景下,線程池的使用是必然的,那在線程中的隊列都有哪些呢?下面 V 哥整理的幾種常見的線程池隊列以及適用場景案例,分享給大家。

線程池中的隊列主要用於存放等待執行的任務,以下是幾種常見的線程池隊列:

1. 無界隊列(Unbounded Queue)

  • LinkedBlockingQueue(基於鏈表的阻塞隊列)

    • 特點:它是一個基於鏈表實現的阻塞隊列,默認情況下容量為 Integer.MAX_VALUE,也就是幾乎可以看作是無界的(實際受限於系統內存等因素)。當線程池中的線程處理任務速度小於任務提交速度時,任務會不斷被添加到這個隊列中,理論上不會出現隊列滿的情況,因此可以避免任務拒絕的情況發生,但如果任務持續快速堆積,可能會導致內存溢出等問題。
    • 適用場景:適用於任務量波動較大,但對任務拒絕比較敏感,希望儘可能容納所有提交任務的場景,比如一些後台異步任務處理場景,像日誌記錄異步處理等,只要內存資源允許,儘量接收並處理所有待記錄的日誌信息。

下面來看一個案例:

以下是一個簡單的使用Java實現的LinkedBlockingQueue類似功能的代碼示例,這裏為了突出核心邏輯,簡化了一些邊界情況處理等,但涵蓋了其主要的阻塞隊列特性,比如當隊列滿時阻塞插入線程,隊列空時阻塞獲取線程等,示例代碼如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 自定義的簡單LinkedBlockingQueue實現
public class MyLinkedBlockingQueue<E> {

    // 鏈表節點類,用於存儲隊列中的元素
    private static class Node<E> {
        E item;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    // 隊列頭節點
    private Node<E> head;
    // 隊列尾節點
    private Node<E> last;
    // 隊列當前元素個數
    private int count;
    // 隊列容量,這裏設置為Integer.MAX_VALUE模擬無界(實際受內存限制)
    private final int capacity = Integer.MAX_VALUE;

    // 用於併發控制的鎖
    private final Lock lock = new ReentrantLock();
    // 隊列非空條件,用於獲取元素時等待隊列有元素可用
    private final Condition notEmpty = lock.newCondition();
    // 隊列未滿條件,用於插入元素時等待隊列有空間
    private final Condition notFull = lock.newCondition();

    // 構造方法,初始化頭節點和尾節點
    public MyLinkedBlockingQueue() {
        head = new Node<>(null);
        last = head;
    }

    // 往隊列中插入元素的方法
    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列已滿(這裏實際很難滿,除非內存耗盡等極端情況),阻塞等待有空間
            while (count == capacity) {
                notFull.await();
            }
            // 將新元素添加到隊列尾部
            Node<E> newNode = new Node<>(e);
            last.next = newNode;
            last = newNode;
            count++;
            // 插入元素後通知等待獲取元素的線程,隊列有元素了
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 從隊列中獲取元素的方法
    public E take() throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列空,阻塞等待有元素可獲取
            while (count == 0) {
                notEmpty.await();
            }
            // 獲取頭節點的下一個節點(實際要獲取的元素所在節點)
            Node<E> first = head.next;
            E element = first.item;
            // 將頭節點指向下一個節點,移除當前獲取的元素
            head.next = first.next;
            if (last == first) {
                last = head;
            }
            count--;
            // 通知等待插入元素的線程,隊列有空間了
            notFull.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }

    // 獲取當前隊列中元素的數量
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

以下是一個簡單的測試類,用於演示如何使用這個自定義的MyLinkedBlockingQueue來模擬處理日誌記錄這樣的異步任務場景:

public class TestMyLinkedBlockingQueue {
    public static void main(String[] args) {
        MyLinkedBlockingQueue<String> queue = new MyLinkedBlockingQueue<>();

        // 模擬生產者線程,不斷產生日誌信息並放入隊列
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    String logMessage = "Log message " + i;
                    queue.put(logMessage);
                    System.out.println("Produced: " + logMessage);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 模擬消費者線程,從隊列中獲取日誌信息並處理(這裏簡單打印模擬處理)
        Thread consumerThread = new Thread(() -> {
            while (true) {
                try {
                    String log = queue.take();
                    System.out.println("Consumed: " + log);
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代碼中:

  1. MyLinkedBlockingQueue類是核心的自定義阻塞隊列實現:

    • 內部通過定義鏈表節點類Node來構建鏈表結構存儲元素。
    • 使用ReentrantLock來實現併發控制,配合Condition對象(notEmptynotFull)來實現當隊列空時阻塞獲取元素的線程、隊列滿時阻塞插入元素的線程的功能。
    • put方法用於向隊列中插入元素,當隊列元素個數達到設定容量(這裏模擬無界情況)時,線程會等待直到有空間可以插入元素,插入後會通知等待獲取元素的線程。
    • take方法用於從隊列中獲取元素,當隊列空時,線程會等待直到有元素可獲取,獲取元素後會通知等待插入元素的線程。
  2. TestMyLinkedBlockingQueue類是用於測試的主類:

    • 創建了自定義的阻塞隊列實例,並啓動了生產者線程和消費者線程。
    • 生產者線程不斷生成日誌信息(模擬)並放入隊列,消費者線程從隊列中取出日誌信息並模擬處理(簡單打印),展示了在異步任務處理場景下該阻塞隊列的基本使用方式。

需要注意的是,真正的LinkedBlockingQueue在Java的java.util.concurrent包中有着更完善的功能、異常處理以及性能優化等方面的設計,比如支持可中斷的插入和獲取操作、更精細的內存管理等,但這個示例可以幫助理解其基本的阻塞隊列原理和實現思路。

2. 有界隊列(Bounded Queue)

  • ArrayBlockingQueue(基於數組的阻塞隊列)

    • 特點:基於數組實現的阻塞隊列,在創建時需要指定隊列的容量大小。當隊列已滿時,若再有新的任務提交,提交任務的線程會被阻塞,直到隊列有空閒空間為止。它是一個有界的、遵循先進先出(FIFO)原則的隊列,保證了任務按照提交的先後順序依次執行。
    • 適用場景:適用於對資源使用有明確限制,需要控制隊列中任務數量的場景,例如在一個資源有限的服務器環境下,對同時處理的網絡請求任務數量進行限制,避免過多任務堆積耗盡系統資源,通過設置合適的隊列容量,確保系統的穩定性和響應性能。

來看一個案例實現:

以下是一個使用Java實現的簡單ArrayBlockingQueue類似功能的代碼示例,重點體現了其基於數組的阻塞隊列特性,包括有界容量、隊列滿時阻塞插入線程、隊列空時阻塞獲取線程等關鍵功能,示例代碼如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 自定義的簡單ArrayBlockingQueue實現
public class MyArrayBlockingQueue<E> {

    private final E[] items; // 用於存儲元素的數組
    private int takeIndex; // 下一個獲取元素的索引
    private int putIndex; // 下一個插入元素的索引
    private int count; // 當前隊列中元素的數量

    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    // 構造方法,傳入隊列容量大小,初始化數組等相關屬性
    @SuppressWarnings("unchecked")
    public MyArrayBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Capacity must be greater than 0");
        }
        items = (E[]) new Object[capacity];
        takeIndex = 0;
        putIndex = 0;
        count = 0;
    }

    // 向隊列中插入元素的方法
    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列已滿,阻塞等待有空間
            while (count == items.length) {
                notFull.await();
            }
            // 將元素放入數組指定位置(根據putIndex)
            items[putIndex] = e;
            // 更新putIndex,循環利用數組空間,達到數組末尾後回到開頭
            putIndex = (putIndex + 1) % items.length;
            count++;
            // 插入元素後通知等待獲取元素的線程,隊列有元素了
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 從隊列中獲取元素的方法
    public E take() throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列空,阻塞等待有元素可獲取
            while (count == 0) {
                notEmpty.await();
            }
            // 獲取當前takeIndex位置的元素
            E element = items[takeIndex];
            // 將該位置元素置空,方便垃圾回收
            items[takeIndex] = null;
            // 更新takeIndex,循環利用數組空間
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            // 通知等待插入元素的線程,隊列有空間了
            notFull.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }

    // 獲取當前隊列中元素的數量
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

來寫一個簡單的測試類,用於演示如何使用這個自定義的MyArrayBlockingQueue來模擬在資源有限環境下對網絡請求任務數量進行限制的場景:

public class TestMyArrayBlockingQueue {
    public static void main(String[] args) {
        // 設置隊列容量為5,模擬限制同時處理的任務數量
        MyArrayBlockingQueue<String> queue = new MyArrayBlockingQueue<>(5);

        // 模擬生產者線程,不斷產生網絡請求任務並放入隊列
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    String request = "Network Request " + i;
                    queue.put(request);
                    System.out.println("Produced: " + request);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 模擬消費者線程,從隊列中獲取網絡請求任務並處理(這裏簡單打印模擬處理)
        Thread consumerThread = new Thread(() -> {
            while (true) {
                try {
                    String request = queue.take();
                    System.out.println("Consumed: " + request);
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代碼中:

  1. MyArrayBlockingQueue類是核心的自定義阻塞隊列實現:

    • 內部使用一個泛型數組items來存儲隊列中的元素,通過takeIndexputIndex來分別標記獲取元素和插入元素的索引位置,利用count記錄當前隊列中元素的數量。
    • 使用ReentrantLock進行併發控制,並配合Condition對象(notEmptynotFull)實現了隊列空時阻塞獲取元素的線程、隊列滿時阻塞插入元素的線程的功能。
    • put方法用於向隊列中插入元素,當隊列已滿(元素數量達到數組容量)時,線程會等待直到有空間可以插入元素,插入元素後會通知等待獲取元素的線程。
    • take方法用於從隊列中獲取元素,當隊列空時,線程會等待直到有元素可獲取,獲取元素後會通知等待插入元素的線程。
  2. TestMyArrayBlockingQueue類是用於測試的主類:

    • 創建了自定義的阻塞隊列實例,並設置了容量為5,模擬對網絡請求任務數量的限制場景。
    • 啓動了生產者線程和消費者線程,生產者線程不斷生成網絡請求任務(模擬)並放入隊列,消費者線程從隊列中取出任務並模擬處理(簡單打印),展示了在資源有限場景下該阻塞隊列的基本使用方式。

以上案例代碼,可以收藏起來,慢慢消化哈。

  • LinkedBlockingDeque(基於鏈表的雙向阻塞隊列)

    • 特點:它也是基於鏈表結構,但與LinkedBlockingQueue不同的是,它是一個雙向隊列,支持在隊列的兩端進行插入和移除操作,同樣可以設置容量限制成為有界隊列。在多線程環境下,這種雙向操作特性可以提供更靈活的任務調度方式,比如可以實現將高優先級任務從隊頭插入優先執行等情況。
    • 適用場景:適合需要靈活調整任務執行順序,同時又要對隊列規模進行控制的場景,比如在一個任務處理系統中,有緊急任務需要插隊優先處理時,可以通過在隊頭插入的方式讓其儘快被執行,並且通過設置容量防止過多任務無序堆積。

下面來看一個案例:

以下是一個使用Java實現的簡單LinkedBlockingDeque類似功能的代碼示例,體現了其基於鏈表的雙向阻塞隊列特性,包括可以在兩端插入和移除元素、設置容量限制、隊列滿或空時阻塞相應操作線程等關鍵功能,示例代碼如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 自定義的簡單LinkedBlockingDeque實現
public class MyLinkedBlockingDeque<E> {

    // 鏈表節點類,用於存儲隊列中的元素
    private static class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    private Node<E> head; // 隊列頭節點
    private Node<E> tail; // 隊列尾節點
    private int count; // 當前隊列中元素的數量
    private final int capacity; // 隊列容量,用於控制隊列規模

    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    // 構造方法,傳入隊列容量,初始化頭節點和尾節點
    public MyLinkedBlockingDeque(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Capacity must be greater than 0");
        }
        this.capacity = capacity;
        head = new Node<>(null);
        tail = new Node<>(null);
        head.next = tail;
        tail.prev = head;
    }

    // 在隊列頭部插入元素的方法
    public void putFirst(E e) throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列已滿,阻塞等待有空間
            while (count == capacity) {
                notFull.await();
            }
            Node<E> newNode = new Node<>(e);
            // 將新節點插入到頭部
            Node<E> next = head.next;
            head.next = newNode;
            newNode.prev = head;
            newNode.next = next;
            next.prev = newNode;
            count++;
            // 插入元素後通知等待獲取元素的線程,隊列有元素了
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 在隊列尾部插入元素的方法
    public void putLast(E e) throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列已滿,阻塞等待有空間
            while (count == capacity) {
                notFull.await();
            }
            Node<E> newNode = new Node<>(e);
            // 將新節點插入到尾部
            Node<E> prev = tail.prev;
            prev.next = newNode;
            newNode.prev = prev;
            newNode.next = tail;
            tail.prev = newNode;
            count++;
            // 插入元素後通知等待獲取元素的線程,隊列有元素了
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 從隊列頭部獲取元素的方法
    public E takeFirst() throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列空,阻塞等待有元素可獲取
            while (count == 0) {
                notEmpty.await();
            }
            Node<E> first = head.next;
            // 移除頭節點
            Node<E> next = first.next;
            head.next = next;
            next.prev = head;
            E element = first.item;
            first.item = null;
            count--;
            // 通知等待插入元素的線程,隊列有空間了
            notFull.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }

    // 從隊列尾部獲取元素的方法
    public E takeLast() throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列空,阻塞等待有元素可獲取
            while (count == 0) {
                notEmpty.await();
            }
            Node<E> last = tail.prev;
            // 移除尾節點
            Node<E> prev = last.prev;
            prev.next = tail;
            tail.prev = prev;
            E element = last.item;
            last.item = null;
            count--;
            // 通知等待插入元素的線程,隊列有元素了
            notFull.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }

    // 獲取當前隊列中元素的數量
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

咱們來寫一個測試類,用於演示如何使用這個自定義的MyLinkedBlockingDeque來模擬在任務處理系統中對任務執行順序靈活調整以及控制隊列規模的場景:

public class TestMyLinkedBlockingDeque {
    public static void main(String[] args) {
        // 設置隊列容量為5,模擬控制隊列規模
        MyLinkedBlockingDeque<String> queue = new MyLinkedBlockingDeque<>(5);

        // 模擬生產者線程,產生任務並插入隊列(先插入普通任務到尾部)
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 8; i++) {
                try {
                    String task = "Task " + i;
                    if (i < 5) {
                        queue.putLast(task);
                    } else {
                        // 模擬有緊急任務,插入到頭部
                        queue.putFirst("Urgent Task " + (i - 5));
                    }
                    System.out.println("Produced: " + task);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 模擬消費者線程,從隊列中獲取任務並處理(這裏簡單打印模擬處理)
        Thread consumerThread = new Thread(() -> {
            while (true) {
                try {
                    String task = queue.takeFirst();
                    System.out.println("Consumed: " + task);
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代碼中:

  1. MyLinkedBlockingDeque類是核心的自定義雙向阻塞隊列實現:

    • 內部通過定義Node類構建雙向鏈表結構來存儲元素,有頭節點head和尾節點tail,通過指針維護節點之間的雙向關係。
    • 使用ReentrantLock進行併發控制,並配合Condition對象(notEmptynotFull)實現隊列空時阻塞獲取元素的線程、隊列滿時阻塞插入元素的線程的功能。
    • putFirstputLast方法分別用於向隊列頭部和尾部插入元素,當隊列已滿時,相應線程會等待直到有空間可插入,插入後通知等待獲取元素的線程。
    • takeFirsttakeLast方法分別用於從隊列頭部和尾部獲取元素,當隊列空時,相應線程會等待直到有元素可獲取,獲取後通知等待插入元素的線程。
  2. TestMyLinkedBlockingDeque類是用於測試的主類:

    • 創建了自定義的雙向阻塞隊列實例,並設置容量為5,模擬控制隊列規模的場景。
    • 啓動了生產者線程和消費者線程,生產者線程先正常往隊列尾部插入任務,然後模擬有緊急任務往隊列頭部插入,消費者線程從隊列頭部獲取任務並模擬處理(簡單打印),展示了在任務處理系統中該雙向阻塞隊列靈活調整任務執行順序以及控制隊列規模的基本使用方式。

學肥了麼,還不懂歡迎關注威哥愛編程私信給我,慢慢給你細説。

  • PriorityBlockingQueue(基於優先級的阻塞隊列)

    • 特點:這是一個支持優先級排序的無界阻塞隊列(雖然説是無界,但實際受系統資源限制),隊列中的元素(即任務)需要實現 Comparable 接口或者在創建隊列時傳入自定義的比較器 Comparator,以此來確定任務的優先級順序。每次從隊列中取出任務時,會優先取出優先級最高的任務進行執行。
    • 適用場景:適用於任務有明顯優先級區分的情況,例如在一個監控系統中,告警任務有不同的嚴重級別,嚴重級別高的告警任務(如服務器宕機告警)優先級更高,需要優先處理,就可以將這些告警任務放入PriorityBlockingQueue中,按照優先級高低依次執行。

來看一個案例:

以下是一個使用Java實現的簡單PriorityBlockingQueue類似功能的代碼示例,重點體現了基於優先級的阻塞隊列特性,即隊列中的元素需要實現Comparable接口來定義優先級順序,隊列能根據優先級高低來決定元素的取出順序,同時具備阻塞等待的功能,示例代碼如下:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 自定義的簡單PriorityBlockingQueue實現
public class MyPriorityBlockingQueue<E extends Comparable<? super E>> {

    private final List<E> queue; // 用於存儲元素的列表
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // 構造方法,初始化存儲列表
    public MyPriorityBlockingQueue() {
        queue = new ArrayList<>();
    }

    // 向隊列中插入元素的方法
    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            queue.add(e);
            // 插入元素後進行上浮操作,確保滿足優先級順序
            siftUp(queue.size() - 1);
            // 插入元素後通知等待獲取元素的線程,隊列有元素了
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 從隊列中獲取並移除優先級最高的元素(即隊頭元素)的方法
    public E take() throws InterruptedException {
        lock.lock();
        try {
            // 如果隊列空,阻塞等待有元素可獲取
            while (queue.size() == 0) {
                notEmpty.await();
            }
            E result = queue.get(0);
            int lastIndex = queue.size() - 1;
            // 將隊尾元素移到隊頭
            E last = queue.get(lastIndex);
            queue.set(0, last);
            queue.remove(lastIndex);
            // 進行下沉操作,重新調整優先級順序
            siftDown(0);
            return result;
        } finally {
            lock.unlock();
        }
    }

    // 獲取當前隊列中元素的數量
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    // 上浮操作,確保元素在合適的優先級位置(類似堆排序中的上浮操作)
    private void siftUp(int k) {
        while (k > 0) {
            int parent = (k - 1) / 2;
            E element = queue.get(k);
            E parentElement = queue.get(parent);
            if (element.compareTo(parentElement) >= 0) {
                break;
            }
            // 交換元素位置
            swap(parent, k);
            k = parent;
        }
    }

    // 下沉操作,確保元素在合適的優先級位置(類似堆排序中的下沉操作)
    private void siftDown(int k) {
        int half = queue.size() / 2;
        while (k < half) {
            int leftChild = 2 * k + 1;
            int rightChild = leftChild + 1;
            int childToSwap = leftChild;
            E element = queue.get(k);
            E leftChildElement = queue.get(leftChild);
            if (rightChild < queue.size() && leftChildElement.compareTo(queue.get(rightChild)) > 0) {
                childToSwap = rightChild;
            }
            E childToSwapElement = queue.get(childToSwap);
            if (element.compareTo(childToSwapElement) <= 0) {
                break;
            }
            // 交換元素位置
            swap(k, childToSwap);
            k = childToSwap;
        }
    }

    // 交換列表中兩個位置的元素
    private void swap(int i, int j) {
        E temp = queue.get(i);
        queue.set(i, queue.get(j));
        queue.set(j, temp);
    }
}

下面咱們來寫個測試類,用於模擬在監控系統中使用這個自定義的MyPriorityBlockingQueue來處理不同優先級告警任務的場景:

class AlertTask implements Comparable<AlertTask> {
    private final String message;
    private final int priority;

    public AlertTask(String message, int priority) {
        this.message = message;
        this.priority = priority;
    }

    @Override
    public int compareTo(AlertTask other) {
        // 按照優先級從小到大排序,優先級數值越小越優先,這裏返回差值來比較
        return Integer.compare(this.priority, other.priority);
    }

    @Override
    public String toString() {
        return "AlertTask{" +
                "message='" + message + '\'' +
                ", priority=" + priority +
                '}';
    }
}

public class TestMyPriorityBlockingQueue {
    public static void main(String[] args) {
        MyPriorityBlockingQueue<AlertTask> queue = new MyPriorityBlockingQueue<>();

        // 模擬生產者線程,產生不同優先級的告警任務並放入隊列
        Thread producerThread = new Thread(() -> {
            AlertTask highPriorityTask = new AlertTask("Server Down Alert", 1);
            AlertTask mediumPriorityTask = new AlertTask("High CPU Usage Alert", 3);
            AlertTask lowPriorityTask = new AlertTask("Disk Space Low Alert", 5);
            try {
                queue.put(highPriorityTask);
                queue.put(mediumPriorityTask);
                queue.put(lowPriorityTask);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 模擬消費者線程,從隊列中獲取告警任務並處理(這裏簡單打印模擬處理)
        Thread consumerThread = new Thread(() -> {
            while (true) {
                try {
                    AlertTask task = queue.take();
                    System.out.println("Processing: " + task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代碼中:

  1. MyPriorityBlockingQueue類是核心的自定義基於優先級的阻塞隊列實現:

    • 內部使用ArrayList來存儲隊列中的元素,通過ReentrantLock進行併發控制,並配合Condition對象notEmpty實現隊列空時阻塞獲取元素的線程的功能。
    • put方法用於向隊列中插入元素,插入後會調用siftUp方法進行上浮操作,以保證隊列中的元素始終按照優先級順序排列(基於元素實現的Comparable接口來比較),插入完成後還會通知等待獲取元素的線程。
    • take方法用於從隊列中獲取並移除優先級最高的元素(即隊頭元素),當隊列空時,線程會等待直到有元素可獲取,獲取元素前會先將隊尾元素移到隊頭並調用siftDown方法進行下沉操作,重新調整優先級順序,然後返回獲取的元素。
    • siftUpsiftDown方法分別實現了類似堆排序中的上浮和下沉操作,通過不斷比較元素的優先級並交換位置,來確保隊列中的元素符合優先級順序要求,swap方法用於交換列表中兩個位置的元素。
  2. TestMyPriorityBlockingQueue類是用於測試的主類:

    • 首先定義了AlertTask類實現Comparable接口,用於表示告警任務並定義其優先級比較規則,根據給定的優先級數值來確定任務的優先級高低。
    • 創建了自定義的基於優先級的阻塞隊列實例,啓動了生產者線程和消費者線程,生產者線程生成不同優先級的告警任務並放入隊列,消費者線程從隊列中獲取告警任務並模擬處理(簡單打印),展示了在監控系統場景下該優先級阻塞隊列的基本使用方式。

這個案例可以幫助你理解基於優先級的阻塞隊列原理和實現思路。Get 到了麼,有任何疑問可以關注威哥愛編程私信給我。

3. 同步隊列(Synchronous Queue)

  • SynchronousQueue

    • 特點:它是一種特殊的隊列,內部沒有實際的存儲容量,每插入一個任務必須等待有線程來獲取並執行這個任務,反之,線程來獲取任務時,如果沒有任務可用,線程會被阻塞等待任務提交。這種隊列更像是一種任務傳遞的媒介,直接將任務從提交者傳遞到執行線程手上,保證了任務的即時處理,不存在任務排隊等待的情況。
    • 適用場景:適用於要求任務提交後能立即被執行,不允許有任務等待堆積的場景,比如在一些對實時性要求極高的交互場景中,像在線實時交易系統中處理下單請求,希望下單任務能馬上被線程處理,而不是先放入隊列等待,以保障交易的及時性和流暢性。

來看一個案例代碼:

以下是一個使用Java實現的簡單同步隊列(SynchronousQueue)類似功能的代碼示例,重點體現了其核心特性,即每插入一個任務必須等待有線程來獲取並執行這個任務,反之,線程來獲取任務時,如果沒有任務可用,線程會被阻塞等待任務提交,示例代碼如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 自定義的簡單同步隊列實現
public class MySynchronousQueue<E> {

    private E element; // 用於存放當前要傳遞的元素
    private boolean hasElement = false; // 標記是否有元素存在

    private final Lock lock = new ReentrantLock();
    private final Condition isEmpty = lock.newCondition();
    private final Condition isFull = lock.newCondition();

    // 向隊列中插入元素的方法
    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // 如果已經有元素了,阻塞等待元素被取走
            while (hasElement) {
                isFull.await();
            }
            element = e;
            hasElement = true;
            // 通知等待獲取元素的線程,有元素可獲取了
            isEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 從隊列中獲取元素的方法
    public E take() throws InterruptedException {
        lock.lock();
        try {
            // 如果沒有元素,阻塞等待元素被放入
            while (!hasElement) {
                isEmpty.await();
            }
            E result = element;
            hasElement = false;
            // 通知等待插入元素的線程,可以插入新元素了
            isFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }
}

來來來,寫一個簡單的測試類,模擬在線實時交易系統中處理下單請求這樣的高實時性場景下使用這個自定義的同步隊列:

public class TestMySynchronousQueue {
    public static void main(String[] args) {
        MySynchronousQueue<String> queue = new MySynchronousQueue<>();

        // 模擬生產者線程,不斷產生下單請求並放入隊列
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    String orderRequest = "Order Request " + i;
                    queue.put(orderRequest);
                    System.out.println("Produced: " + orderRequest);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 模擬消費者線程,從隊列中獲取下單請求並處理(這裏簡單打印模擬處理)
        Thread consumerThread = new Thread(() -> {
            while (true) {
                try {
                    String order = queue.take();
                    System.out.println("Consumed: " + order);
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代碼中:

  1. MySynchronousQueue類是核心的自定義同步隊列實現:

    • 使用一個變量element來臨時存放要傳遞的元素,通過hasElement變量來標記當前是否有元素存在於隊列中(其實它內部沒有真正的隊列存儲結構,只是起到一個元素傳遞的作用)。
    • 利用ReentrantLock進行併發控制,並配合Condition對象isEmptyisFull來實現當沒有元素時阻塞獲取元素的線程、有元素時阻塞插入元素的線程的功能。
    • put方法用於向隊列中插入元素,當已經有元素存在(即hasElementtrue)時,插入線程會等待直到元素被取走,插入元素後會通知等待獲取元素的線程。
    • take方法用於從隊列中獲取元素,當沒有元素(即hasElementfalse)時,獲取線程會等待直到有元素被放入,獲取元素後會通知等待插入元素的線程。
  2. TestMySynchronousQueue類是用於測試的主類:

    • 創建了自定義的同步隊列實例,啓動了生產者線程和消費者線程,生產者線程不斷生成下單請求(模擬)並放入隊列,消費者線程從隊列中取出請求並模擬處理(簡單打印),展示了在高實時性要求場景下該同步隊列的基本使用方式。

通過以上案例的學習,幫助咱們理解其基本的同步隊列原理和實現思路。

最後

不同的線程池隊列有着各自的特點和適用場景,在實際使用線程池時,需要根據具體的業務需求、系統資源狀況以及對任務執行順序、響應時間等方面的要求,合理選擇相應的隊列來構建線程池,以實現高效的任務處理。 關注威哥愛編程,學習編程不迷茫,關注威哥愛編程,代碼世界任縱橫。

user avatar yuhuashi_584a46acea21f 头像 xuxueli 头像 alienzhou 头像 thinkerdjx 头像 hulaxingxingxing 头像 niumingxin 头像
点赞 6 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.