服務通常需要考慮速度和容量限制,增強系統的魯棒性。
背景
筆者曾負責過某公司內公眾號服務開發。公眾號接口服務接收到用户的推送請求後會構造公眾號消息並寫入消息隊列,路由服務異步接收到消息後進行消息存儲後,再交由推送服務向用户推送消息。基本流程如下圖所示:
消息存儲過程:
- 路由服務發起消息存儲請求,並將消息緩存到本地;
- 存儲服務成功存儲消息後異步發送成功通知;
- 路由服務接收到成功通知後從本地緩存獲取消息內容後進行後續推送處理;
問題
若存儲服務異常,系統會出現什麼問題?
- 路由服務使用local cache臨時存儲消息。當存儲服務異常時,若不加限制,路由服務極有可能導致內存溢出,路由服務不可用;
- 路由服務發起消息存儲請求為異步過程,很有可能會一直消費MQ裏的消息,導致存儲服務承受更大的服務壓力。同時會存在消息可能丟失的風險;
方案
基於信號量實現限制容量的本地緩存。容量大小為信號量個數,當路由服務發起消息存儲請求時,信號量減1。當路由服務接收到存儲成功通知後,信號量加1。
- 存儲服務正常時,容量限制機制不會起作用,服務性能不會受到影響;
- 存儲服務異常時,本地緩存的容量會越來越小。最後再無可用的信號量時,服務會阻塞等待。此時不再對消息隊列進行消費。既避免了服務OOM的狀況,也降低了服務繼續惡化的可能;
實施
基於信號量實現的限容數據結構BlockingHashMap
public class BlockingHashMap<K, V> {
private static final int DEFAULT_MAX_AVAILABLE = 1000;
private final ConcurrentHashMap<K, V> inmap = new ConcurrentHashMap<>(DEFAULT_MAX_AVAILABLE);
private Semaphore sem;
public BlockingHashMap() {
this(DEFAULT_MAX_AVAILABLE);
}
public BlockingHashMap(int permits) {
sem = new Semaphore(permits);
}
public V put(K key, V value) {
boolean wasAdded = false;
try {
sem.acquire();
V v = inmap.putIfAbsent(key, value);
if (v != null) {
return v;
}
wasAdded = true;
} catch (Exception e) {
} finally {
if (!wasAdded) {
// 若添加失敗,需要釋放信號量
sem.release();
}
}
return value;
}
public V remove(K key) {
V value = inmap.remove(key);
if (value != null) {
// 只有當成功移除元素時才釋放信號量
sem.release();
}
return value;
}
}
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。