線程停止
stop方法
stop 方法雖然可以停止線程,但它已經是不建議使用的廢棄方法了,這一點可以通過 Thread 類中的源碼發現,stop 源碼如下:
stop 方法是被 @Deprecated 修飾的不建議使用的過期方法,並且在註釋的第一句話就説明了 stop 方法為非安全的方法。
原因在於它在終止一個線程時會強制中斷線程的執行,不管run方法是否執行完了,並且還會釋放這個線程所持有的所有的鎖對象。這一現象會被其它因為請求鎖而阻塞的線程看到,使他們繼續向下執行。這就會造成數據的不一致。
比如銀行轉賬,從A賬户向B賬户轉賬500元,這一過程分為三步,第一步是從A賬户中減去500元,假如到這時線程就被stop了,那麼這個線程就會釋放它所取得鎖,然後其他的線程繼續執行,這樣A賬户就莫名其妙的少了500元而B賬户也沒有收到錢。這就是stop方法的不安全性。
設置標誌位
如果線程的run方法中執行的是一個重複執行的循環,可以提供一個標記來控制循環是否繼續
class FlagThread extends Thread {
// 自定義中斷標識符
public volatile boolean isInterrupt = false;
@Override
public void run() {
// 如果為 true -> 中斷執行
while (!isInterrupt) {
// 業務邏輯處理
}
}
}
但自定義中斷標識符的問題在於:線程中斷的不夠及時。因為線程在執行過程中,無法調用 while(!isInterrupt) 來判斷線程是否為終止狀態,它只能在下一輪運行時判斷是否要終止當前線程,所以它中斷線程不夠及時,比如以下代碼:
class InterruptFlag {
// 自定義的中斷標識符
private static volatile boolean isInterrupt = false;
public static void main(String[] args) throws InterruptedException {
// 創建可中斷的線程實例
Thread thread = new Thread(() -> {
while (!isInterrupt) { // 如果 isInterrupt=true 則停止線程
System.out.println("thread 執行步驟1:線程即將進入休眠狀態");
try {
// 休眠 1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread 執行步驟2:線程執行了任務");
}
});
thread.start(); // 啓動線程
// 休眠 100ms,等待 thread 線程運行起來
Thread.sleep(100);
System.out.println("主線程:試圖終止線程 thread");
// 修改中斷標識符,中斷線程
isInterrupt = true;
}
}
輸出:我們期望的是:線程執行了步驟 1 之後,收到中斷線程的指令,然後就不要再執行步驟 2 了,但從上述執行結果可以看出,使用自定義中斷標識符是沒辦法實現我們預期的結果的,這就是自定義中斷標識符,響應不夠及時的問題。
interrupted中斷
這種方式需要在while循環中判斷使用
使用 interrupt 方法可以給執行任務的線程,發送一箇中斷線程的指令,它並不直接中斷線程,而是發送一箇中斷線程的信號,把是否正在中斷線程的主動權交給代碼編寫者。相比於自定義中斷標識符而然,它能更及時的接收到中斷指令,如下代碼所示:
public static void main(String[] args) throws InterruptedException {
// 創建可中斷的線程實例
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("thread 執行步驟1:線程即將進入休眠狀態");
try {
// 休眠 1s
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("thread 線程接收到中斷指令,執行中斷操作");
// 中斷當前線程的任務執行
break;
}
System.out.println("thread 執行步驟2:線程執行了任務");
}
});
thread.start(); // 啓動線程
// 休眠 100ms,等待 thread 線程運行起來
Thread.sleep(100);
System.out.println("主線程:試圖終止線程 thread");
// 修改中斷標識符,中斷線程
thread.interrupt();
}
輸出:
從上述結果可以看出,線程在接收到中斷指令之後,立即中斷了線程,相比於上一種自定義中斷標識符的方法來説,它能更及時的響應中斷線程指令。
利用interruptedException
這種方式 不 需要在while循環中判斷使用
如果線程因為執行join(),sleep或者wait()而進入阻塞狀態,此時想要停止它,可以調用interrupt(),程序會拋出interruptedException異常。可以利用這個異常終止線程
public void run() {
System.out.println(this.getName() + "start");
int i=0;
//while (!Thread.interrupted()){
while (!Thread.currentThread().isInterrupted()){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//e.printStackTrace();
System.out.println("中斷線程");
break;//通過識別到異常來中斷
}
System.out.println(this.getName() + " "+ i);
i++;
}
System.out.println(this.getName() + "end");
}
Executor 的中斷操作
調用 Executor 的 shutdown() 方法會等待線程都執行完畢之後再關閉,但是如果調用的是 shutdownNow() 方法,則相當於調用每個線程的 interrupt() 方法。
以下使用 Lambda 創建線程,相當於創建了一個匿名內部線程。
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
如果只想中斷 Executor 中的一個線程,可以通過使用 submit() 方法來提交一個線程,它會返回一個 Future<?> 對象,通過調用該對象的 cancel(true) 方法就可以中斷線程。
Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);
線程之間的協作
當多個線程可以一起工作去解決某個問題時,如果某些部分必須在其它部分之前完成,那麼就需要對線程進行協調。
join()
案例
在線程中調用另一個線程的 join() 方法,會將當前線程掛起,而不是忙等待,直到目標線程結束。
對於以下代碼,雖然 b 線程先啓動,但是因為在 b 線程中調用了 a 線程的 join() 方法,b 線程會等待 a 線程結束才繼續執行,因此最後能夠保證 a 線程的輸出先於 b 線程的輸出。
public class JoinExample {
private class A extends Thread {
@Override
public void run() {
System.out.println("A");
}
}
private class B extends Thread {
private A a;
B(A a) {
this.a = a;
}
@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B");
}
}
public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
}
public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
}
A
B
原理
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {//檢查線程是否存活,只要線程還沒結束,主線程就會一直阻塞
wait(0);//這裏的wait調用的本地方法。
}
} else {//等待一段指定的時間
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
從源碼來看,實際上join方法就是調用了wait方法來使得線程阻塞,一直到線程結束運行。注意到,join方法前的synchronized修飾符,它相當於:
public final void join(long millis){
synchronized(this){
//代碼塊
}
}
也就是説加鎖的對象即調用這個鎖的線程對象,在main()方法中即為t1,持有這個鎖的是主線程即main()方法,也就是説代碼相當於如下:
//t1.join()前的代碼
synchronized (t1) {
// 調用者線程進入 t1 的 waitSet 等待, 直到 t1 運行結束
while (t1.isAlive()) {
t1.wait(0);
}
}
//t1.join()後的代碼
也因此主線程進入等待隊列,直到 t1 線程結束。
這裏可能會有很多人會有疑惑,為什麼t1.wait了,阻塞的不是t1,而是主線程?
實際上,如果要阻塞t1,那麼就應該在t1的run 方法裏進行阻塞,如在run方法裏寫wait();(當然還有suspend方法,這屬於非Java層面,另説)
而這裏的 wait 方法被調用以後,是讓持有鎖的線程進入等待隊列,即主線程調用,因此 t1 線程並不會被阻塞,阻塞的是主線程。
也就是説,join方法是一個同步方法,當主線程調用t1.join()方法時,主線程先獲得了t1對象的鎖,隨後進入方法,調用了t1對象的wait()方法,使主線程進入了t1對象的等待池。
那麼問題在於,這裏只看到了wait方法,卻並沒有看到notify或者是notifyAll方法,那麼主線程在那裏被喚醒呢?
這裏參考jvm的代碼:
static void ensure_join(JavaThread* thread) {
Handle threadObj(thread, thread->threadObj());
ObjectLocker lock(threadObj, thread);
hread->clear_pending_exception();
//這一句中的TERMINATED表示這是線程結束以後運行的
java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
//這裏會清楚native線程,isAlive()方法會返回false
java_lang_Thread::set_thread(threadObj(), NULL);
//thread就是當前線程,調用這個方法喚醒等待的線程。
lock.notify_all(thread);
hread->clear_pending_exception();
}
其實是jvm虛擬機中存在方法lock.notify_all(thread),在t1線程結束以後,會調用該方法,最後喚醒主線程。
所以簡化一下,流程即:
wait() notify() notifyAll()
調用 wait() 使得線程等待某個條件滿足,線程在等待時會被掛起,當其他線程的運行使得這個條件滿足時,其它線程會調用 notify() 或者 notifyAll() 來喚醒掛起的線程。
它們都屬於 Object 的一部分,而不屬於 Thread。
只能用在同步方法synchronized或者同步控制塊中使用,否則會在運行時拋出 IllegalMonitorStateExeception。
使用 wait() 掛起期間,線程會釋放鎖。這是因為,如果沒有釋放鎖,那麼其它線程就無法進入對象的同步方法或者同步控制塊中,那麼就無法執行 notify() 或者 notifyAll() 來喚醒掛起的線程,造成死鎖。
public class WaitNotifyExample {
public synchronized void before() {
System.out.println("before");
notifyAll();
}
public synchronized void after() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after");
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
before
after
wait() 和 sleep() 的區別
-
wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態方法;
-
wait() 會釋放鎖,sleep() 不會。
await() signal() signalAll()
java.util.concurrent 類庫中提供了 Condition 類來實現線程之間的協調,可以在 Condition 上調用 await() 方法使線程等待,其它線程調用 signal() 或 signalAll() 方法喚醒等待的線程。相比於 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。
使用 Lock 來獲取一個 Condition 對象。
public class AwaitSignalExample {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void before() {
lock.lock();
try {
System.out.println("before");
condition.signalAll();
} finally {
lock.unlock();
}
}
public void after() {
lock.lock();
try {
condition.await();
System.out.println("after");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AwaitSignalExample example = new AwaitSignalExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
before
after
線程中的異常處理
Runnable中異常如何被吞掉
Runnable 接口的 run() 方法不允許拋出任何被檢查的異常(checked exceptions),只能處理或拋出運行時異常(unchecked exceptions)。當在 run() 方法內發生異常時,如果沒有顯式地捕獲和處理這些異常,它們通常會在執行該 Runnable 的線程中被“吞掉”,即異常會導致線程終止,但不會影響其他線程的執行。
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}
解決方案:
-
在run方法中顯示的捕獲異常
public void run() { try { // 可能拋出異常的代碼 } catch (Exception e) { // 記錄日誌或處理異常 throw new RuntimeException(e); } } -
為創建的線程設置一個
UncaughtExceptionHandlerThread t = new Thread(() -> { int i = 1 / 0; }, "t1"); t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { logger.error('---', e); } }); -
使用
Callable代替Runnable,Callable的call方法允許拋出異常,然後可以通過提交給ExecutorService返回的Future來捕獲和處理這些異常ExecutorService executor = Executors.newFixedThreadPool(1); Future<?> future = executor.submit(() -> { // 可能拋出異常的代碼 }); try { future.get(); // 這裏會捕獲到Callable中的異常 } catch (ExecutionException e) { Throwable cause = e.getCause(); // 獲取原始異常 }
Callable中異常如何被吞掉
class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("===> 開始執行callable");
int i = 1 / 0; //異常的地方
return "callable的結果";
}
}
public class CallableAndRunnableTest {
public static void main(String[] args) {
System.out.println(" =========> main start ");
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
Future<String> submit = threadPoolExecutor.submit(new MyCallable());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" =========> main end ");
}
}
運行結果
=========> main start
===> 開始執行callable
=========> main end
源碼如下:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
RunableFuture<T> 是個接口,但是它繼承了Runnable 接口 , 實現類是 FutureTask ,因此就需要看下 FutureTask裏的run方法 是不是和 構造時的Callable 有關係:
public void run() {
// 狀態不屬於初始狀態的情況下,不進行後續邏輯處理
// 那也就是run 方法只能執行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
//
boolean ran;
try {
// 執行 Callable 裏的 call 方法 ,將結果存入result變量中
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// call 方法異常 , 記錄下異常結果
setException(ex);
}
// call 方法正常執行完畢 ,進行結果存儲
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
接下來就要看,如果存儲正常結果的set(result)方法 和存儲異常結果的 setException(ex) 方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
這兩個代碼都做了一個操作,就是將正常結果result 和 異常結果 exception 都賦值給了 outcome 這個變量 。
接着再看下future的get方法
//這裏有必須看下Task的結束時的狀態,如果正常結束,狀態為 NORMAL , 異常結果,狀態為EXCEPTIONAL 。 看下幾個狀態的定義,如下:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
// NORMAL(2) 、EXCEPTIONAL(3) 都大於 COMPLETING(1),所以Task結束之後,不會走該if
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 重點: 返回結果
return report(s);
}
private V report(int s) throws ExecutionException {
// 之前正常結果或者異常都存放在Object outcomme 中了
Object x = outcome;
// 正常返回
if (s == NORMAL)
return (V)x;
// EXCEPTIONAL(3) 小於 CANCELLED(4) ,所以不會走該if分支,直接後續的throw 拋異常的邏輯
if (s >= CANCELLED)
throw new CancellationException();
// 不等於NORMAL 且 大於等於 CANCELLED , 再結合 調用 report(int s ) 之前也做了state 的過濾
//到這一步,那隻能是EXCEPTIONAL(3)
throw new ExecutionException((Throwable)x);
}
因此可以通過get方法獲取到異常結果