應用開發過程中,我們常常需要用到延時任務的地方,最近在工作遇到了一個需求,用UDP發送報文,發送後30s後要是還沒有收到回報報文,就對對於報文進行重發。
類似於訂單超時未支付取消訂單一樣,可以有很多解決方法,我這裏採用其中一種,java的延時隊列來實現。
用這篇筆記簡易記錄一下實現過程。
什麼是DelayQueue
DelayQueue 是按照元素的延時時間排序的隊列。元素必須實現 Delayed 接口,該接口定義了一個 getDelay 方法,用於返回元素的剩餘延時時間。
Delayed接口繼承了Comparable接口,所以延時隊列中的元素對象也必須要實現compareTo方法。
延時隊列在內部使用了一個優先級隊列(PriorityQueue)來實現,確保隊頭元素始終是剩餘延時時間最小的元素。
實現過程
1. 創建報文內容類
/**
* 報文內容類
* 報文類別號和流水號能確定唯一一條報文
*/
@Data
public class MessageInnerProtocol {
/**
* 報文類別號
*/
private Integer infoClass;
/**
* 流水號
*/
private Integer serialNo;
// 省略其他字段
// ......
}
2. 創建延時隊列元素對象
@Getter
public class MessageDelayed implements Delayed {
/**
* 報文內容
*/
private final MessageInnerProtocol message;
/**
* 計時開始時間
*/
private final long startTime;
/**
* 超時時間
*/
private static final long EXPIRE_TIME = 30 * 1000;
/**
* 構造函數
* @param message 報文內容
*/
public MessageDelayed(MessageInnerProtocol message) {
this.message = message;
this.startTime = System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
long elapsedTime = System.currentTimeMillis() - startTime;
return unit.convert(System.currentTimeMillis() - elapsedTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if(this == o){
return 0;
}
// 根據剩餘時間來進行排序
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
if(diff == 0){
return 0;
}else if(diff < 0){
return -1;
}else {
return 1;
}
}
}
3 報文回報狀態枚舉類
/**
* 回報報文狀態
*/
public enum ResultMessageStatusEnums {
/**
* 等待報文回報中
*/
WAITING,
/**
* 成功收到報文回報,且校驗成功
*/
SUCCESS,
/**
* 收到回報報文,但是校驗發生錯誤
*/
FAIL;
}
4 創建線程不停的處理超時報文
public class MessageTimeoutDeal {
private static final int KNOWN_CAPACITY = 16;
/**
* 延時隊列存儲報文
*/
public static final DelayQueue<MessageDelayed> RESULT_MESSAGE_DELAY_QUEUE = new DelayQueue<>();
/**
* <infoClass, <serialNo, ResultMessageStatusEnums>>
* 用保報文類別和流水號,報文狀態來對報文進行處理
*/
public static final Map<Integer, Map<Integer, ResultMessageStatusEnums>> RESULT_MESSAGE_MAP = new ConcurrentHashMap<>(KNOWN_CAPACITY);
public static void main(String[] args) {
// 生成報文
MessageInnerProtocol messageInnerProtocol = new MessageInnerProtocol();
// 設置報文內容
messageInnerProtocol.setSerialNo(0x243);
// 流水號建議用 AtomicInteger, 測試簡易寫一下
messageInnerProtocol.setSerialNo(25);
Map<Integer, ResultMessageStatusEnums> resultMessageMap =
RESULT_MESSAGE_MAP.computeIfAbsent(messageInnerProtocol.getInfoClass(), k -> new ConcurrentHashMap<>(KNOWN_CAPACITY));
// 報文狀態放入map中
resultMessageMap.put(messageInnerProtocol.getSerialNo(), ResultMessageStatusEnums.WAITING);
// 將報文加入延時隊列
RESULT_MESSAGE_DELAY_QUEUE.add(new MessageDelayed(messageInnerProtocol));
// 創建線程去處理延時隊列中的任務
new Thread(() -> {
try {
while (true){
// 從延時隊列中獲取任務
MessageDelayed messageDelayed = RESULT_MESSAGE_DELAY_QUEUE.take();
MessageInnerProtocol message = messageDelayed.getMessage();
Map<Integer, ResultMessageStatusEnums> resultMessageStatusEnumsMap =
RESULT_MESSAGE_MAP.get(message.getInfoClass());
// 獲取到對應報文的回報狀態
ResultMessageStatusEnums resultMessageStatusEnums = resultMessageStatusEnumsMap.get(message.getSerialNo());
switch (resultMessageStatusEnums){
case WAITING:
case FAIL:
// 沒有收到回報報文或校驗和失敗
// 需要重發或其他流程
case SUCCESS:
resultMessageStatusEnumsMap.remove(message.getSerialNo());
break;
default:
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
測試就跳過了,需要報文處理類,對不同報文進行處理,包括回報報文,修改RESULT_MESSAGE_MAP中報文的狀態。
如果需要處理大量的延遲任務, 可以使用netty的時間輪。