博客 / 詳情

返回

線程如何停止?線程之間如何協作?線程之間的異常如何處理?

線程停止

stop方法

stop 方法雖然可以停止線程,但它已經是不建議使用的廢棄方法了,這一點可以通過 Thread 類中的源碼發現,stop 源碼如下:

stop 方法是被 @Deprecated 修飾的不建議使用的過期方法,並且在註釋的第一句話就説明了 stop 方法為非安全的方法。

原因在於它在終止一個線程時會強制中斷線程的執行,不管run方法是否執行完了,並且還會釋放這個線程所持有的所有的鎖對象。這一現象會被其它因為請求鎖而阻塞的線程看到,使他們繼續向下執行。這就會造成數據的不一致。

比如銀行轉賬,從A賬户向B賬户轉賬500元,這一過程分為三步,第一步是從A賬户中減去500元,假如到這時線程就被stop了,那麼這個線程就會釋放它所取得鎖,然後其他的線程繼續執行,這樣A賬户就莫名其妙的少了500元而B賬户也沒有收到錢。這就是stop方法的不安全性。

設置標誌位

如果線程的run方法中執行的是一個重複執行的循環,可以提供一個標記來控制循環是否繼續

class FlagThread extends Thread {
    // 自定義中斷標識符
    public volatile boolean isInterrupt = false;
    @Override
    public void run() {
        // 如果為 true -> 中斷執行
        while (!isInterrupt) {
            // 業務邏輯處理
        }
    }
}

但自定義中斷標識符的問題在於:線程中斷的不夠及時。因為線程在執行過程中,無法調用 while(!isInterrupt) 來判斷線程是否為終止狀態,它只能在下一輪運行時判斷是否要終止當前線程,所以它中斷線程不夠及時,比如以下代碼:

class InterruptFlag {
    // 自定義的中斷標識符
    private static volatile boolean isInterrupt = false;

    public static void main(String[] args) throws InterruptedException {
        // 創建可中斷的線程實例
        Thread thread = new Thread(() -> {
            while (!isInterrupt) { // 如果 isInterrupt=true 則停止線程
                System.out.println("thread 執行步驟1:線程即將進入休眠狀態");
                try {
                    // 休眠 1s
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread 執行步驟2:線程執行了任務");
            }
        });
        thread.start(); // 啓動線程

        // 休眠 100ms,等待 thread 線程運行起來
        Thread.sleep(100);
        System.out.println("主線程:試圖終止線程 thread");
        // 修改中斷標識符,中斷線程
        isInterrupt = true;
    }
}

輸出:我們期望的是:線程執行了步驟 1 之後,收到中斷線程的指令,然後就不要再執行步驟 2 了,但從上述執行結果可以看出,使用自定義中斷標識符是沒辦法實現我們預期的結果的,這就是自定義中斷標識符,響應不夠及時的問題。

interrupted中斷

這種方式需要在while循環中判斷使用

使用 interrupt 方法可以給執行任務的線程,發送一箇中斷線程的指令,它並不直接中斷線程,而是發送一箇中斷線程的信號,把是否正在中斷線程的主動權交給代碼編寫者。相比於自定義中斷標識符而然,它能更及時的接收到中斷指令,如下代碼所示:

public static void main(String[] args) throws InterruptedException {
    // 創建可中斷的線程實例
    Thread thread = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("thread 執行步驟1:線程即將進入休眠狀態");
            try {
                // 休眠 1s
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("thread 線程接收到中斷指令,執行中斷操作");
                // 中斷當前線程的任務執行
                break;
            }
            System.out.println("thread 執行步驟2:線程執行了任務");
        }
    });
    thread.start(); // 啓動線程

    // 休眠 100ms,等待 thread 線程運行起來
    Thread.sleep(100);
    System.out.println("主線程:試圖終止線程 thread");
    // 修改中斷標識符,中斷線程
    thread.interrupt();
}

輸出:

從上述結果可以看出,線程在接收到中斷指令之後,立即中斷了線程,相比於上一種自定義中斷標識符的方法來説,它能更及時的響應中斷線程指令。

利用interruptedException

這種方式 不 需要在while循環中判斷使用

如果線程因為執行join(),sleep或者wait()而進入阻塞狀態,此時想要停止它,可以調用interrupt(),程序會拋出interruptedException異常。可以利用這個異常終止線程

public void run() {
    System.out.println(this.getName() + "start");
    int i=0;
    //while (!Thread.interrupted()){
    while (!Thread.currentThread().isInterrupted()){
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            //e.printStackTrace();
            System.out.println("中斷線程");
            break;//通過識別到異常來中斷
        }
        System.out.println(this.getName() + " "+ i);
        i++;
    }
    System.out.println(this.getName() + "end");
}

Executor 的中斷操作

調用 Executor 的 shutdown() 方法會等待線程都執行完畢之後再關閉,但是如果調用的是 shutdownNow() 方法,則相當於調用每個線程的 interrupt() 方法。

以下使用 Lambda 創建線程,相當於創建了一個匿名內部線程。

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Thread run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
    at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

如果只想中斷 Executor 中的一個線程,可以通過使用 submit() 方法來提交一個線程,它會返回一個 Future<?> 對象,通過調用該對象的 cancel(true) 方法就可以中斷線程。

Future<?> future = executorService.submit(() -> {
    // ..
});
future.cancel(true);

線程之間的協作

當多個線程可以一起工作去解決某個問題時,如果某些部分必須在其它部分之前完成,那麼就需要對線程進行協調。

join()

案例

在線程中調用另一個線程的 join() 方法,會將當前線程掛起,而不是忙等待,直到目標線程結束。

對於以下代碼,雖然 b 線程先啓動,但是因為在 b 線程中調用了 a 線程的 join() 方法,b 線程會等待 a 線程結束才繼續執行,因此最後能夠保證 a 線程的輸出先於 b 線程的輸出。

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}
public static void main(String[] args) {
    JoinExample example = new JoinExample();
    example.test();
}
A
B

原理

public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {//檢查線程是否存活,只要線程還沒結束,主線程就會一直阻塞
            wait(0);//這裏的wait調用的本地方法。
        }
    } else {//等待一段指定的時間
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

從源碼來看,實際上join方法就是調用了wait方法來使得線程阻塞,一直到線程結束運行。注意到,join方法前的synchronized修飾符,它相當於:

public final void join(long millis){
 synchronized(this){
        //代碼塊
    }
}

也就是説加鎖的對象即調用這個鎖的線程對象,在main()方法中即為t1,持有這個鎖的是主線程即main()方法,也就是説代碼相當於如下:

//t1.join()前的代碼
synchronized (t1) {
 // 調用者線程進入 t1 的 waitSet 等待, 直到 t1 運行結束
 while (t1.isAlive()) {
  t1.wait(0);
 }
}
//t1.join()後的代碼

也因此主線程進入等待隊列,直到 t1 線程結束。

這裏可能會有很多人會有疑惑,為什麼t1.wait了,阻塞的不是t1,而是主線程?

實際上,如果要阻塞t1,那麼就應該在t1的run 方法裏進行阻塞,如在run方法裏寫wait();(當然還有suspend方法,這屬於非Java層面,另説)

而這裏的 wait 方法被調用以後,是讓持有鎖的線程進入等待隊列,即主線程調用,因此 t1 線程並不會被阻塞,阻塞的是主線程。

也就是説,join方法是一個同步方法,當主線程調用t1.join()方法時,主線程先獲得了t1對象的鎖,隨後進入方法,調用了t1對象的wait()方法,使主線程進入了t1對象的等待池。

那麼問題在於,這裏只看到了wait方法,卻並沒有看到notify或者是notifyAll方法,那麼主線程在那裏被喚醒呢?

這裏參考jvm的代碼:

static void ensure_join(JavaThread* thread) {

 Handle threadObj(thread, thread->threadObj());

 ObjectLocker lock(threadObj, thread);

 hread->clear_pending_exception();

 //這一句中的TERMINATED表示這是線程結束以後運行的
 java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);

    //這裏會清楚native線程,isAlive()方法會返回false
    java_lang_Thread::set_thread(threadObj(), NULL);

 //thread就是當前線程,調用這個方法喚醒等待的線程。
 lock.notify_all(thread);

 hread->clear_pending_exception();

}

其實是jvm虛擬機中存在方法lock.notify_all(thread),在t1線程結束以後,會調用該方法,最後喚醒主線程。

所以簡化一下,流程即:

wait() notify() notifyAll()

調用 wait() 使得線程等待某個條件滿足,線程在等待時會被掛起,當其他線程的運行使得這個條件滿足時,其它線程會調用 notify() 或者 notifyAll() 來喚醒掛起的線程。

它們都屬於 Object 的一部分,而不屬於 Thread。

只能用在同步方法synchronized或者同步控制塊中使用,否則會在運行時拋出 IllegalMonitorStateExeception。

使用 wait() 掛起期間,線程會釋放鎖。這是因為,如果沒有釋放鎖,那麼其它線程就無法進入對象的同步方法或者同步控制塊中,那麼就無法執行 notify() 或者 notifyAll() 來喚醒掛起的線程,造成死鎖。

public class WaitNotifyExample {
    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

wait() 和 sleep() 的區別

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態方法;

  • wait() 會釋放鎖,sleep() 不會。

await() signal() signalAll()

java.util.concurrent 類庫中提供了 Condition 類來實現線程之間的協調,可以在 Condition 上調用 await() 方法使線程等待,其它線程調用 signal() 或 signalAll() 方法喚醒等待的線程。相比於 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。

使用 Lock 來獲取一個 Condition 對象。

public class AwaitSignalExample {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

線程中的異常處理

Runnable中異常如何被吞掉

Runnable 接口的 run() 方法不允許拋出任何被檢查的異常(checked exceptions),只能處理或拋出運行時異常(unchecked exceptions)。當在 run() 方法內發生異常時,如果沒有顯式地捕獲和處理這些異常,它們通常會在執行該 Runnable 的線程中被“吞掉”,即異常會導致線程終止,但不會影響其他線程的執行。

public void uncaughtException(Thread t, Throwable e) {
   if (parent != null) {
        parent.uncaughtException(t, e);
   } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}

解決方案:

  1. 在run方法中顯示的捕獲異常

    public void run() {
        try {
            // 可能拋出異常的代碼
        } catch (Exception e) {
            // 記錄日誌或處理異常
            throw new RuntimeException(e);
        }
    }
    
  2. 為創建的線程設置一個UncaughtExceptionHandler

    Thread t = new Thread(() -> {
       int i = 1 / 0;
    }, "t1");
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
       @Override
       public void uncaughtException(Thread t, Throwable e) {
            logger.error('---', e);
       }
    });
    
  3. 使用Callable代替RunnableCallablecall方法允許拋出異常,然後可以通過提交給ExecutorService返回的Future來捕獲和處理這些異常

    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<?> future = executor.submit(() -> {
        // 可能拋出異常的代碼
    });
    
    try {
        future.get(); // 這裏會捕獲到Callable中的異常
    } catch (ExecutionException e) {
        Throwable cause = e.getCause(); // 獲取原始異常
    }
    

Callable中異常如何被吞掉

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("===> 開始執行callable");
        int i = 1 / 0; //異常的地方
        return "callable的結果";
    }
}

public class CallableAndRunnableTest {

    public static void main(String[] args) {
        System.out.println(" =========> main start ");
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        Future<String> submit = threadPoolExecutor.submit(new MyCallable());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(" =========> main end ");
    }
}

運行結果

 =========> main start 
 ===> 開始執行callable
 =========> main end 

源碼如下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

RunableFuture<T> 是個接口,但是它繼承了Runnable 接口 , 實現類是 FutureTask ,因此就需要看下 FutureTask裏的run方法 是不是和 構造時的Callable 有關係:

public void run() {
     // 狀態不屬於初始狀態的情況下,不進行後續邏輯處理
     // 那也就是run 方法只能執行一次
     if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                   null, Thread.currentThread()))
        return;
    try { 
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            // 
            boolean ran;
            try {
                // 執行 Callable 裏的 call 方法 ,將結果存入result變量中
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                 // call 方法異常 , 記錄下異常結果
                setException(ex);
            }
            // call 方法正常執行完畢 ,進行結果存儲
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

接下來就要看,如果存儲正常結果的set(result)方法 和存儲異常結果的 setException(ex) 方法

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

這兩個代碼都做了一個操作,就是將正常結果result 和 異常結果 exception 都賦值給了 outcome 這個變量 。

接着再看下future的get方法

//這裏有必須看下Task的結束時的狀態,如果正常結束,狀態為 NORMAL , 異常結果,狀態為EXCEPTIONAL 。 看下幾個狀態的定義,如下:  
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // NORMAL(2) 、EXCEPTIONAL(3) 都大於 COMPLETING(1),所以Task結束之後,不會走該if
    if (s <= COMPLETING)
         s = awaitDone(false, 0L);
    // 重點: 返回結果
    return report(s);
}

private V report(int s) throws ExecutionException {
    // 之前正常結果或者異常都存放在Object outcomme 中了
    Object x = outcome;
    // 正常返回
    if (s == NORMAL)
        return (V)x;
    // EXCEPTIONAL(3) 小於 CANCELLED(4) ,所以不會走該if分支,直接後續的throw 拋異常的邏輯
    if (s >= CANCELLED)
        throw new CancellationException();
    // 不等於NORMAL 且 大於等於 CANCELLED  ,  再結合 調用 report(int s ) 之前也做了state 的過濾
    //到這一步,那隻能是EXCEPTIONAL(3) 
    throw new ExecutionException((Throwable)x);
}

因此可以通過get方法獲取到異常結果

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.