本文已收錄【修煉內功】躍遷之路
在上一篇文章JVM 細説線程中已經介紹了應用程序常見的一些線程模型,本篇就上篇提及的協程做簡單的介紹
談到併發/異步,首先想到的可能便是線程/進程,Java在近20年的發展中從JDK1.2之後便採用1:1線程模型,Java在核心類庫中提供了眾多異步API,可以使多線程應用發揮強大的併發能力並獲得不錯的性能
如今,在很多高併發的場景下(如I/O密集型)操作系統的線程調度成為了性能的瓶頸,往往cpu使用率及內存使用率還穩如泰山,但系統load已經堵到不行
那,協程能夠為I/O密集型的場景帶來什麼幫助?本篇就從Node.js的異步API聊起
總有人會説,協程其實就是線程,只不過是換了一種寫法的語法糖,就如同Java8中的Lambda表達式,也總有人會説Lambda表達式只不過是匿名類的語法糖而已(見Java8 Lambda究竟是不是匿名類的語法糖),然,非也
如上篇文章所述,N:M線程模型可以解決N:1模型中阻塞問題,同時也能充分利用CPU的多核優勢,這也是大部分協程實現的基礎
N可以理解為用户線程數,其數量根據業務邏輯需要而定,M可以理解為內核線程數,其數量固定(或相對固定),每一個用户線程都需要放到內核線程中才能執行,用户線程的調度由應用程序管理(甚至可以交由編程人員通過編寫程序管理)。而協程則可以理解為上述的用户線程,一種更為輕量級的線程
Node.js架構特點
為什麼偏偏選Node.js聊協程?這要從Node.js的架構特點説起~
Node.js是單線程麼?是,也不是~
Node.js使用事件驅動及非阻塞I/O實現異步模型
Node.js is a platform built on Chrome's JavaScript runtime for easily building fast, scalable network applications. Node.js uses an event-driven, non-blocking I/O model that makes it lightweight and efficient, perfect for data-intensive real-time applications that run across distributed devices.
APPLICATION為我們所編寫的應用層,其JS的解釋執行由V8引擎負責,這裏的執行線程只有一個,這也就是通常所説Node.js為單線程的原因,在編寫Node.js程序時沒有辦法創建子線程,同時如果有部分邏輯阻塞或者長時間運行,則會影響整個運行時
Node.js的高併發則依賴事件驅動(EVENT LOOP)及非阻塞I/O(LIBUV),在進行I/O操作時Node.js將任務交由UV線程池中的線程執行,並在事件隊列中註冊回調,在I/O操作完成時觸發回調繼續後續的動作,在整個I/O操作的過程中並不會阻塞JS的解釋執行
- Node.js的JavaScript解釋執行只有一個線程,阻塞或長時運行的邏輯會影響整個應用層的運行
- I/O操作交由UV線程池處理(通過addones也可以將CPU密集型的計算邏輯放到LIBUV線程池中執行),通過事件機制回調將結果返回給應用層處理
Node.js的工作線程數固定(可通過環境變量UV_THREADPOOL_SIZE指定),每個工作線程對應一個內核線程,工作線程數可以理解為N:M線程模型中的M
Node.js應用層的異步任務由開發人員編寫,每個異步任務可以理解為用户線程,任務數對應於N:M線程模型中的N
由於Node.js上述的特點(單執行線程,多工作線程),沒有過多的干擾,非常適合用來講述協程的概念及應用
異步編程
為了循序漸進地理解協程的概念,我們從異步的常規實現方式一一説起,來講述協程的演變過程(對常用異步編程無感的可直接跳到協程一節)
回調
將函數做為另一個函數的入參傳入,並由調用方在合適的時機(處理完成或失敗)進行調用
// 獲取我的信息
let request = $.get(
"user/info/me",
// 回調函數
function(data) {
/* do something */
}
);
// 2秒後取消請求(如果請求仍未返回)
setTimeout(
// 回調函數
function() {
if (!!request) {
request.abort()
}
},
2000
);
回調可以在一定程度上將主流程與異步邏輯分離,異步邏輯的處理不會阻塞主流程的執行,但回調也帶了一些問題
- 異步執行的結果無法有效地返回到主邏輯流程中(Java中可以使用
Future.get以阻塞方式等待異步結果) - 不良好的編程習慣,容易形成回調地獄
Callback Hell
如,將文件夾下的所有圖片拼接成一張圖片
fs.readdir(source, function(err, files) {
if (err) {
console.error('Error finding files: ', err);
} else {
files.forEach(function(filename, fileIndex) {
console.info(filename);
gm(source + filename).size(function(err, values) {
if (err) {
console.error('Error identifying file size: ', err);
} else {
console.info(filename + ' : ' + values);
aspect = values.width / values.height;
widths.forEach(function(width, widthIndex) {
height = Math.round(width / aspect);
console.info('resizing ' + filename + 'to ' + height + 'x' + height);
let destFile = dest + "w" + width + '_' + filename;
this.resize(width, height).write(destFile, function(err) {
if (err) {
console.error('Error writing file: ', err);
} else {
console.info('Writing file to: ', destFile);
}
});
});
}
});
});
}
});
大量的回調函數嵌套在一起,可閲讀性和可維護性都並不高
再如,多個http請求存在前後依賴關係,前一個請求的返回值作為後一個請求的參數
$.get("step/1", (data1) => {
$.get(`step/2/${data1}`, (data2) => {
$.get(`step/3/${data2}`, (data3) => {
/* do the final thing */
})
})
})
化解Callback Hell的方法有很多,其中最簡單的方式便是將代碼模塊化、扁平化
function step(url, then) {
$.get(url, (data) => {
then(data);
});
}
function doFinalThing(data) {
/* do the final thing */
}
// Thunk 化
function proxyStep(url, then) {
return (data) => {
step(`${url}/${data}`, then);
}
}
let step3Proxy = proxyStep('step/3', doFinalThing);
let step2Proxy = proxyStep('step/2', step3Proxy);
step('step/1', step2Proxy);
Thunk的概念在下文會有介紹,在將邏輯進行抽象化、模塊化之後,代碼則會變得清晰起來
事件
事件是回調的另一種形式,在代碼邏輯的分離上做的更為徹底
回調的事件化
// 定義事件
class EventGet {
constructor(url = '') {
this.url = url;
// 事件完成時的回調
this.onComplete = (data) => {}
}
// 觸發事件
emitGet() {
$.get(this.url, this.onComplete)
}
}
let step1 = new EventGet('step/1')
let step2 = new EventGet('step/2')
let step3 = new EventGet('step/3')
step1.onComplete = (data) => {
// step1完成時觸發step2
step2.url += `/${data}`;
step2.emitGet();
}
step2.onComplete = (data) => {
// step2完成時觸發step3
step3.url += `/${data}`;
step3.emitGet();
}
step3.onComplete = (data) => {
/* do the final thing */
}
// 觸發step1
step1.emitGet();
事件相比於單純的回調更為語義化,也更容易表達程序所要執行的邏輯
這裏是否讓你想起了Java中的CompletableFuture?但請再次注意,Node.js中應用層的解釋執行只有一個線程,每次GET請求並非創建了一個內核線程去執行,而是交給了UV線程池,由事件機制來回調onComplete函數處理請求的結果
其實,如果做過前端開發,隨處都可以看到回調或事件的使用
<nz-button-group>
<button nz-button nzType="default" (click)="cancle()">取消</button>
<button nz-button nzType="danger" (click)="delete()">刪除</button>
</nz-button-group>
function clickHandler() {
/* do something when clicking */
}
var btn = document.getElementById("btn");
btn.onclick = clickHandler;
btn.addEventListener("click", clickHandler, false);
Promise
Promise 是異步編程的一種解決方案,比傳統的解決方案(回調函數和事件)更合理、更強大
Promise有三個狀態,pending(進行中)、fulfilled(已成功)和rejected(已失敗),pending可以轉換為fulfilled或rejected其中之一,且狀態一旦轉換就不會再變
promise的使用詳見ex6-promise,這裏不贅述,如果將上述回調或事件的示例轉為promise的方式,可以編寫如下
function promiseGet(url) {
return new Promise((resolve, reject) => {
$.get(url, resolve, reject);
});
}
// 回調嵌套改為鏈式調用
promiseGet(
"step/1"
).then((data1) => {
return promiseGet(`step/2/${data1}`)
}).then((data2) => {
return promiseGet(`step/3/${data2}`)
}).then((data3) => {
/* do the final thing*/
}).catch((e) => {
/* handle exception */
}).finally(() => {
/* do finnaly */
});
promise可將前後有依賴關係的異步處理轉換為鏈式調用的形式,同樣是回調,卻可以大大避免Callback Hell,並使調用邏輯更加清晰
同時,promise還可以輕鬆編寫並行代碼
function promiseGet(url) {
return new Promise((resolve, reject) => {
$.get(url, resolve, reject);
});
}
Promise.all([
promiseGet("user/info/張三"),
promiseGet("user/info/李四"),
promiseGet("user/info/趙五")
]).then((users) => {
/* do something */
}).catch((e) => {
/* handle exception */
}).finally(() => {
/* do finnaly */
})
響應式
響應式編程作為近幾年很火的一種編程範式,以一種流(Stream)的方式處理數據,響應式的概念十分龐大,這裏不做詳述,以下以一個rxjs示例展示響應式編程如何解耦異步邏輯
function getOnObserver(url, observer) {
$.get(
'user/info/張三',
(data) => {
observer.next(data);
observer.complete();
},
(error) => {
observer.error(error)
}
);
}
// 創建流
let observable = Observable.create((observer) => {
getOnObserver('user/info/張三', observer);
getOnObserver('user/info/李四', observer);
getOnObserver('user/info/趙五', observer);
});
// 訂閲/消費流
observable.subscribe((data) => {
/* do something for each result */
});
響應式編程的威力遠不止此,流式處理有豐富的api(可簡單參考Java8中Stream API)、背壓保護策略等等,通過其事件回調機制可以在I/O密集型應用中一展身手
協程
在粗略瞭解了幾種常規的異步編程方式之後,從本節內容開始真正進入協程的範疇
generator
子程序(或者稱為函數),在所有語言中都是層級調用,嚴格遵循線程棧的入棧出棧,子程序調用總是一個入口一個返回,調用順序是明確的
而協程的調用和子程序不同,協程看上去也是子程序,但執行過程中協程內部可中斷,然後轉而執行別的子程序/協程,在適當的時候再返回來接着執行
generator 函數是 ES6 提供的一種異步編程解決方案,語法行為與傳統函數完全不同
function* helloGenerator() {
yield console.log('hello');
yield console.log('I\'m');
}
function* worldGenerator() {
yield console.log('world');
yield console.log('ManerFan');
}
let hello = helloGenerator();
let world = worldGenerator();
// 交替執行helloGenerator及worldGenerator
hello.next();
world.next();
hello.next();
world.next();
運行結果
hello
world
I'm
ManerFan
按照常理,在同一個線程中順序調用helloGenerator及worldGenerator,兩個函數均會按照調用順序完整的執行,按預期應該輸出
hello
I'm
world
ManerFan
在使用generator時,其next方法會在方法體內遇到yield關鍵字時暫停執行,交回該函數的執行權,類似於線程的掛起,因此generator也被稱之為暫停函數
generator函數可以在內部使用yield關鍵字標識暫停點,generator函數的暫停、恢復執行可由應用程序靈活控制(內核線程的調度由系統控制),這與傳統函數的執行規則完全不同,generator函數的調度權完全交給了應用層
yield關鍵字除了標識暫停點之外,還可以在恢復執行的時候傳值進來(generator更高階的用法詳見es6-generator)
function* foo(x) {
var y = 2 * (yield (x + 1));
var z = yield (y / 3);
return (x + y + z);
}
let f = foo(5);
let step1 = f.next();
console.log(step1); // { value:6, done:false }
let step2 = f.next(12);
console.log(step2); // { value:8, done:false }
let step3 = f.next(13);
console.log(step3); // { value:42, done:true }
不論事件還是Promise亦或響應式都離不開回調,事件將主流程與異步回調分離,Promise將異步回調轉為鏈式回調,響應式將異步回調轉為流式回調,當generator遇到異步回調會發生什麼?
以下,模擬定義$.get函數如下
let $ = {
get(url, callback) {
setTimeout(() => callback(url.substring(5)), 500);
}
}
以上文回調嵌套為例
// 回調方式
$.get("step/1", (data1) => {
$.get(`step/2/${data1}`, (data2) => {
$.get(`step/3/${data2}`, (data3) => {
/* do the final thing */
})
})
})
利用generator可暫停、可恢復的能力,可在異步回調邏輯中觸發恢復下一步的動作,並將當前的異步處理結果帶回,以此將回調嵌套拉平,將異步回調邏輯寫出同步的順滑感,我們稱之為異步邏輯的“同步化”(同步的寫法,異步的執行)
// 封裝異步調用
function get(url) {
$.get(url, (data) => {
// 觸發後續流程,並將數據代入後續流程
req.next(data)
})
}
// generator 異步邏輯同步化
function* asyncAsSync() {
// 同步的寫法,異步的執行
let result1 = yield get('step/1');
let result2 = yield get(`step/2/${result1}`);
let result3 = yield get(`step/3/${result2}`);
console.log(result3);
/* do the final thing */
}
// 生成generator
var req = asyncAsSync();
// 觸發一次
req.next();
// do something at the same time
console.log('do something at the same time when excute gets');
輸出
do something at the same time when excute gets
3/2/1
asyncAsSync函數中看似是同步的邏輯,實則每一個yield get()都是一次異步調用,異步的結果通過req.next()帶回,並且asyncAsSync函數的調用並不會阻塞最後一行console.log的執行
generator的自動執行
在使用generator的過程中其實並不是很方便,generator函數的暫停與恢復需要使用程序控制,這對於編寫程序來説門檻會提高,那有沒有一種方法可以自動的執行generator的next函數呢?
首先介紹generator自動執行使用的一種函數變形方式,柯里化(Thunk)
thunk函數的定義
// fn(arg1, arg2, arg3, ..., callback)
const thunk = function(fn) {
return function (...args) {
return function (callback) {
return fn.call(this, ...args, callback);
}
};
};
首先,被柯里化函數的入參滿足以下規則
- 由兩部分組成,參數及回調函數
- 參數可以有多個,回調函數只能有一個
- 回調函數在入參的最後一個位置
柯里化將函數拆解成了兩部分,一部分為只要參數入參的函數,一部分為只有回調函數入參的函數,其使用方法如下
// $.get為一函數,其入參為url及回調
// 對$.get進行柯里化
// getThunk為一函數,其入參為url
const getThunk = thunk($.get);
// getInfoMe為一函數,其入參為回調函數
const getInfoMe = getThunk('user/info/me');
getInfoMe(function(data) => {
/* do something */
})
對thunk實現比較完美的庫參見npm-thunkify
使用thunk實現generator的自動執行器
// 自動執行器
function co(fn) {
var gen = fn();
// nextCallBack(data)函數的執行,來自
// 1. 下方的首次顯示觸發
// 2. yield值的回調觸發
function nextCallBack(data) {
// result的值為一個接收回調函數的函數
var result = gen.next(data);
if (result.done) return;
// 執行result值中的函數,並將回調參數設置為nextCallBack(data),異步遞歸回調
// 當回調的時候繼續執行nextCallBack(data),並將執行結果代入
result.value(nextCallBack);
}
nextCallBack();
}
let getThunk = thunk($.get);
// generator 異步邏輯同步化
function* asyncAsSync() {
let result1 = yield getThunk('step/1');
let result2 = yield getThunk(`step/2/${result1}`);
let result3 = yield getThunk(`step/3/${result2}`);
console.log(result3);
/* do the final thing */
}
co(asyncAsSync);
// do something at the same time
console.log('do something at the same time when excute gets');
輸出
do something at the same time when excute gets
3/2/1
getThunk(url)的執行結果為一個函數,我們記為getStep(callback),該函數的入參為一個回調函數callback,而callbak即$.get的回調函數(這裏很重要,參見thunk定義)
關鍵在於co中的nextCallBack(data),我們記為co#nextCallBack(data)
co#nextCallBack(data)會觸發一次yield,yeild的結果即getThunk(url)的執行結果getStep(callback),而getStep(callback)的入參又被設為co#nextCallBack(data),執行異步遞歸回調
上述代碼的執行邏輯為
-
gen.next(data)執行
yield getThunk('step/1')得到result,其值為函數getStep(callback) -
result.value(nextCallBack)執行
getStep(callback),並將callback入參設置為co#nextCallBack -
getStep(co#nextCallBack)執行
$.get('step/1'),當數據返回時執行回調函數co#nextCallBack(data),其中data為$.get('step/1')的結果1 -
gen.next(1)執行
yield getThunk('step/2/1')得到函數getStep(callback) -
result.value(nextCallBack)執行
getStep(callback),並將callback入參設置為co#nextCallBack(data) -
getStep(co#nextCallBack)執行
$.get('step/2/1'),當數據返回時執行回調函數co#nextCallBack(data),其中data為$.get('step/2/1')的結果2/1 - 以此類推
generator的自動執行關鍵在於以下幾點
- 在
co#nextCallBack中觸發generator的next - yield的結果不再是具體的值,而是柯里化後的函數(接收回調函數入參的函數)
- 將上述柯里化後函數的入參設置為
co#nextCallBack,執行並遞歸回調 - 遞歸回調會執行
co#nextCallBack並將執行結果代入 - 繼續觸發generator的next,恢復generator的執行,並將上一步的直接結果代入
generator的實際應用(koa)
將generator的精髓用到極致的還要當屬koa(koa2已經使用async改寫,不再使用generator),它將http server端異步middleware的書寫體驗整個提升了一個層級
middleware類似於java servlet中的filter,其執行過程類似於剝洋葱
而當所有的middleware(包括核心core)都是異步的話,整個處理邏輯在各middleware之間的跳轉就變得複雜起來
koa使用generator的特性,巧妙實現了請求處理邏輯在各異步middleware間的靈活跳轉執行
以下,簡單模擬koa-middleware的實現邏輯
// 定義app
let app = {
middlewares: [],
core: function* (next) {
console.log("excute core!");
// yield 異步操作
yield* next;
},
// 將多個middleware組合成鏈式結構
compose(middlewares) {
function* noop() {}
return function* (next){
var i = middlewares.length;
var prev = next || noop();
var curr;
while (i--) {
curr = middlewares[i];
prev = curr.call(this, prev);
}
yield* prev;
}
},
// 添加middleware
use(middleware) {
this.middlewares.push(middleware);
},
run() {
let chain = this.compose([...this.middlewares, this.core]);
co(chain);
}
}
app.use(function* (next) {
console.log("before middleware1");
// yield 異步操作
yield* next;
console.log("after middleware1");
// yield 異步操作
});
app.use(function* (next) {
console.log("before middleware2");
// yield 異步操作
yield* next;
console.log("after middleware2");
// yield 異步操作
});
app.run();
輸出
before middleware1
before middleware2
excute core!
after middleware2
after middleware1
generator遇上Promise
以上,generator的自動執行依賴於thunk化,而thunk又非常生澀難懂,如果將generator與Promise結合,或許會更容易理解一些
function promiseGet(url) {
return new Promise((resolve, _) => {
$.get(url, resolve);
});
}
function * asyncAsSync() {
let result1 = yield promiseGet('step/1');
let result2 = yield promiseGet(`step/2/${result1}`);
let result3 = yield promiseGet(`step/3/${result2}`);
console.log(result3);
/* do the final thing */
}
function co(fn) {
var g = fn();
// nextCallBack(data)函數的執行,來自
// 1. 下方的首次顯示觸發
// 2. yield值的回調觸發
function nextCallBack(data){
// result的值為Promise對象
var result = g.next(data);
if (result.done) return result.value;
result.value.then(function(data){
// Promise的then回調中,遞歸執行nextCallBack(data),並將執行結果代入
nextCallBack(data);
});
}
nextCallBack();
}
co(asyncAsSync);
// do something at the same time
console.log('do something at the same time when excute gets');
對generator自動執行封裝較好的看tj大神的tj-co
async/await
generator(function*)? yield? thunk? co? 想使用generator實在不要再複雜,不過好在es2017開始提供了async/await
function promiseGet(url) {
return new Promise((resolve, _) => {
$.get(url, resolve);
});
}
// 異步代碼同步化
async function asyncAsSync() {
let result1 = await promiseGet('step/1');
let result2 = await promiseGet(`step/2/${result1}`);
let result3 = await promiseGet(`step/3/${result2}`);
console.log(result3);
/* do the final thing */
}
asyncAsSync();
// do something at the same time
console.log('do something at the same time when excute gets');
輸出
do something at the same time when excute gets
3/2/1
沒有了生澀的thunk,沒有了燒腦的自動執行器,代碼得以變得更加清爽
簡單來講,async其實就是generator的語法糖
- 使用
async替代generator的標星函數function* - 使用
await替代yield await後可跟普通值、普通函數及Promise對象async自帶自動執行器
async/await相比generator + thunk/Promise + co的方案,更加語義化,也更容易理解
藉助Promise的能力,還可以異步並行處理數據
function promiseGet(url) {
return new Promise((resolve, reject) => {
$.get(url, resolve, reject);
});
}
async function asyncRun() {
let names = await Promise.all([
promiseGet("user/info/張三"),
promiseGet("user/info/李四"),
promiseGet("user/info/趙五")
])
names.forEach((name) => console.log(name));
}
asyncRun().catch((e) => { /* handle exception */})
// do something at the same time
console.log('do something at the same time when excute gets');
輸出
do something at the same time when excute gets
張三
李四
趙五
更多async的用法詳見async/await
使用async/await的koa middleware處理邏輯可以簡單模擬如下
// 定義app
let app = {
middlewares: [],
core: async (next) => {
console.log("excute core!");
// await 異步操作
await next;
},
// 將多個middleware組合成鏈式結構
compose(middlewares) {
var i = middlewares.length;
var prev = Promise.resolve();
var curr;
while (i--) {
curr = middlewares[i];
prev = curr.bind(this, prev);
}
return prev;
},
// 添加middleware
use(middleware) {
this.middlewares.push(middleware);
},
run() {
let chain = this.compose([...this.middlewares, this.core]);
chain();
}
}
app.use(async (next) => {
console.log("before middleware1");
// await 異步操作
await next();
console.log("after middleware1");
// await 異步操作
});
app.use(async (next) => {
console.log("before middleware2");
// await 異步操作
await next();
console.log("after middleware2");
// await 異步操作
});
app.run();
輸出
before middleware1
before middleware2
excute core!
after middleware2
after middleware1
總結
- 傳統異步編程需要藉助同步阻塞等待、回調函數、事件等方式獲取異步執行結果
- generator(協程)可將子程序的暫停、恢復等調度權交給應用層,並且可以在同一個協程上下文中將子程序放到不同的內核線程中執行(端應用場景較多,將UI渲染與後台計算的執行線程隔離,避免後台計算阻塞UI渲染,造成假死)
- 使用自動執行器(generator + thunk + co / async + await + promise),可將回調編碼方式拉平,以同步的寫法編寫異步執行邏輯
- 使用N:M線程模型,固定(或相對固定)內核線程數,避免內核線程的創建、銷燬、調度、上下文切換等帶來的系統消耗,同時也打破了單進程可創建有限線程數的限制,以此提升系統吞吐率