數十萬定時任務,如何高效觸發定時和超時
項目產品中,大家都會有"定時任務"和"定時超時"的需求,初始階段,我們基本都是用少數的一些timer,即使是任務量越來越大的時候,我們就難免維護著大量的timer,或者進行了大量低效的掃描。
定時任務使用場景:當訂單一直處于未支付狀態時,如何及時的關閉訂單(已經使用)
如何定期檢查處于退款狀態的訂單是否已經退款成功(后期重構使用)
設計方案:
- 整個Redis當做消息池,以KV形式存儲消息
- 使用ZSET做優先隊列,按照Score維持優先級
- 使用LIST結構,以先進先出的方式消費
- ZSET和LIST存儲消息地址(對應消息池的每個KEY)
- 使用定時器維護路由
- 根據TTL規則實現消息延遲
咱們公司現階段就是使用的這套方法:
1.新增一個job,會job_pool中插入一條數據,記錄了業務方消費方。也會在bucket插入一條記錄,記錄執行的時間戳
2.搬運線程會去bucket中查找哪些執行時間戳的RunTimeMillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什么,然后將這些任務PUSH到TOPIC對應的列表queue中
3每個topic的list都會有一個監聽線程去批量獲取list中的待消費數據,獲取到的數據全部扔給這個topic的消費線程池
4.消費線程池執行會去job_pool查找數據結構,返回給回調結構,執行回調方法。
圖片
待優化的內容:
- 目前只有一個Queue隊列存放消息,當需要消費的消息大量堆積后,會影響消息通知的時效。改進的辦法是,開啟多個Queue,進行消息路由,再開啟多個消費線程進行消費,提供吞吐量
- 消息沒有進行持久化,存在風險,后續會將消息持久化到MongoDB中
一般來說還有什么其他方法實現這類需求呢?
“輪詢掃描法”
1.用一個Map<uid, last_packet_time>來記錄每一個uid最近一次請求時間last_packet_time
2.當某個用戶uid有請求包來到,實時更新這個Map
3.啟動一個timer,當Map中不為空時,輪詢掃描這個Map,看每個uid的last_packet_time是否超過30s,如果超過則進行超時處理
“多timer觸發法”
1.用一個Map<uid, last_packet_time>來記錄每一個uid最近一次請求時間last_packet_time
2.當某個用戶uid有請求包來到,實時更新這個Map,并同時對這個uid請求包啟動一個timer,30s之后觸發
3.每個uid請求包對應的timer觸發后,看Map中,查看這個uid的last_packet_time是否超過30s,如果超過則進行超時處理
方案一:只啟動一個timer,但需要輪詢,效率較低
方案二:不需要輪詢,但每個請求包要啟動一個timer,比較耗資源
ZSet(有序集合)數據結構來實現
- 創建ZSet:首先,你需要創建一個ZSet數據結構,其中每個訂單將作為一個成員,其分數將表示訂單的創建時間戳。你可以使用Redis等支持ZSet的數據庫來實現。
- 添加訂單:每當用戶創建新訂單時,將訂單添加到ZSet中,其中成員是訂單ID,分數是訂單的創建時間戳。
- 定時檢查訂單:定期(例如,每分鐘)執行一個程序或定時任務來檢查ZSet中的訂單。你可以使用程序來查詢ZSet,找到創建時間超過一定時間閾值的訂單,表示它們長時間未支付。
- 取消訂單:對于那些長時間未支付的訂單,可以將其從ZSet中刪除,并執行取消訂單的操作(例如,將訂單狀態設置為"已取消")。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Set;
public class OrderCancellationSystem {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost"); // 連接到本地Redis服務器
// 模擬添加訂單
addOrder(jedis, "Order1");
addOrder(jedis, "Order2");
// 定時任務,每分鐘檢查訂單并自動取消
while (true) {
cancelLongUnpaidOrders(jedis);
try {
Thread.sleep(60000); // 等待一分鐘再次檢查
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void addOrder(Jedis jedis, String orderId) {
long currentTime = System.currentTimeMillis();
jedis.zadd("orders", currentTime, orderId);
}
public static void cancelOrder(String orderId) {
// 執行取消訂單操作,例如更新訂單狀態
System.out.println("Cancelling order: " + orderId);
}
public static void cancelLongUnpaidOrders(Jedis jedis) {
long expirationTime = System.currentTimeMillis() - 3600 * 1000; // 60分鐘前的時間戳
Set<Tuple> longUnpaidOrders = jedis.zrangeByScoreWithScores("orders", "-inf", String.valueOf(expirationTime));
for (Tuple order : longUnpaidOrders) {
String orderId = order.getElement();
cancelOrder(orderId);
jedis.zrem("orders", orderId); // 從ZSet中刪除已取消的訂單
}
}
}