博客 / 詳情

返回

netty系列之:可以自動通知執行結果的Future,有見過嗎?

簡介

在我的心中,JDK有兩個經典版本,第一個就是現在大部分公司都在使用的JDK8,這個版本引入了Stream、lambda表達式和泛型,讓JAVA程序的編寫變得更加流暢,減少了大量的冗餘代碼。

另外一個版本要早點,還是JAVA 1.X的時代,我們稱之為JDK1.5,這個版本引入了java.util.concurrent併發包,從此在JAVA中可以愉快的使用異步編程。

雖然先JDK已經發展到了17版本,但是併發這一塊的變動並不是很大。受限於JDK要保持穩定的需求,所以concurrent併發包提供的功能並不能完全滿足某些業務場景。所以依賴於JDK的包自行研發了屬於自己的併發包。

當然,netty也不例外,一起來看看netty併發包都有那些優勢吧。

JDK異步緣起

怎麼在java中創建一個異步任務,或者開啓一個異步的線程,每個人可能都有屬於自己的回答。

大家第一時間可能想到的是創建一個實現Runnable接口的類,然後將其封裝到Thread中運行,如下所示:

new Thread(new(RunnableTask())).start()

每次都需要new一個Thread是JDK大神們不可接受的,於是他們產生了一個將thread調用進行封裝的想法,而這個封裝類就叫做Executor.

Executor是一個interface,首先看一下這個interface的定義:

public interface Executor {

    void execute(Runnable command);
}

接口很簡單,就是定義了一個execute方法來執行傳入的Runnable命令。

於是我們可以這樣來異步開啓任務:

   Executor executor = anExecutor;
   executor.execute(new RunnableTask1());
   executor.execute(new RunnableTask2());

看到這裏,聰明的小夥伴可能就要問了,好像不對呀,Executor自定義了execute接口,好像跟異步和多線程並沒有太大的關係呀?

別急,因為Executor是一個接口,所以我們可以有很多實現。比如下面的直接執行Runnable,讓Runnable在當前線程中執行:

 class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
 }

又比如下面的在一個新的線程中執行Runnable:

 class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
 }

又比如下面的將多個任務存放在一個Queue中,執行完一個任務再執行下一個任務的序列執行:

 class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   }

   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }

這些Executor都非常完美。但是他們都只能提交任務,提交任務之後就什麼都不知道了。這對於好奇的寶寶們是不可忍受的,因為我們需要知道執行的結果,或者對執行任務進行管控。

於是就有了ExecutorService。ExecutorService也是一個接口,不過他提供了shutdown方法來停止接受新的任務,和isShutdown來判斷關閉的狀態。

除此之外,它還提供了單獨調用任務的submit方法和批量調用任務的invokeAll和invokeAny方法。

既然有了execute方法,submit雖然和execute方法基本上執行了相同的操作,但是在方法參數和返回值上有稍許區別。

首先是返回值,submit返回的是Future,Future表示異步計算的結果。 它提供了檢查計算是否完成、等待其完成以及檢索計算結果的方法。 Future提供了get方法,用來獲取計算結果。但是如果調用get方法的同時,計算結果並沒有準備好,則會發生阻塞。

其次是submit的參數,一般來説只有Callable才會有返回值,所以我們常用的調用方式是這樣的:

<T> Future<T> submit(Callable<T> task);

如果我們傳入Runnable,那麼雖然也返回一個Future,但是返回的值是null:

Future<?> submit(Runnable task);

如果我又想傳入Runnable,又想Future有返回值怎麼辦呢?

古人告訴我們,魚和熊掌不可兼得!但是現在是2021年了,有些事情是可以發生改變了:

<T> Future<T> submit(Runnable task, T result);

上面我們可以傳入一個result,當Future中的任務執行完畢之後直接將result返回。

既然ExecutorService這麼強大,如何創建ExecutorService呢?

最簡單的辦法就是用new去創建對應的實例。但是這樣不夠優雅,於是JDK提供了一個Executors工具類,他提供了多種創建不同ExecutorService的靜態方法,非常好用。

netty中的Executor

為了兼容JDK的併發框架,雖然netty中也有Executor,但是netty中的Executor都是從JDK的併發包中衍生出來的。

具體而言,netty中的Executor叫做EventExecutor,他繼承自EventExecutorGroup:

public interface EventExecutor extends EventExecutorGroup 

而EventExecutorGroup又繼承自JDK的ScheduledExecutorService:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>

為什麼叫做Group呢?這個Group的意思是它裏面包含了一個EventExecutor的集合。這些結合中的EventExecutor通過Iterable的next方法來進行遍歷的。

這也就是為什麼EventExecutorGroup同時繼承了Iterable類。

然後netty中的其他具體Executor的實現再在EventExecutor的基礎之上進行擴展。從而得到了netty自己的EventExecutor實現。

Future的困境和netty的實現

那麼JDK中的Future會有什麼問題呢?前面我們也提到了JDK中的Future雖然保存了計算結果,但是我們要獲取的時候還是需要通過調用get方法來獲取。

但是如果當前計算結果還沒出來的話,get方法會造成當前線程的阻塞。

別怕,這個問題在netty中被解決了。

先看下netty中Future的定義:

public interface Future<V> extends java.util.concurrent.Future<V> 

可以看到netty中的Future是繼承自JDK的Future。同時添加了addListener和removeListener,以及sync和await方法。

先講一下sync和await方法,兩者都是等待Future執行結束。不同之處在於,如果在執行過程中,如果future失敗了,則會拋出異常。而await方法不會。

那麼如果不想同步調用Future的get方法來獲得計算結果。則可以給Future添加listener。

這樣當Future執行結束之後,會自動通知listener中的方法,從而實現異步通知的效果,其使用代碼如下:

EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
Future<?> f = group.submit(new Runnable() { ... });
f.addListener(new FutureListener<?> {
  public void operationComplete(Future<?> f) {
    ..
  }
});

還有一個問題,每次我們提交任務的時候,都需要創建一個EventExecutorGroup,有沒有不需要創建就可以提交任務的方法呢?

有的!

netty為那些沒有時間創建新的EventExecutorGroup的同志們,特意創建一個全局的GlobalEventExecutor,這是可以直接使用的:

GlobalEventExecutor.INSTANCE.execute(new Runnable() { ... });

GlobalEventExecutor是一個單線程的任務執行器,每隔一秒鐘回去檢測有沒有新的任務,有的話就提交到executor執行。

總結

netty為JDK的併發包提供了非常有用的擴展。大家可以直接使用。

本文已收錄於 http://www.flydean.com/46-netty-future-executor/

最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!

歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!

user avatar vincehua 頭像 zhegemingzihaochang 頭像 zhongganqingdejinzhengu_cbvxch 頭像
3 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.