高并發高性能的定時器實現
前言
我們經常都會碰到延遲任務,定時任務這種需求。在網絡連接的場景中,常常會出現一些超時控制。隨著連接數量的增加,這些超時任務的數量往往也是很龐大的。實現對大量任務的超時管理并不是一個容易的事情。
幾種定時任務的實現
java.util.Timer
JDK 在 1.3 的時候引入了Timer數據結構用于實現定時任務。Timer的實現思路比較簡單,其內部有兩個主要屬性:
- TaskQueue:定時任務抽象類TimeTask的列表。
- TimerThread:用于執行定時任務的線程。
- private final TaskQueue queue = new TaskQueue();
- private final TimerThread thread = new TimerThread(queue);
Timer結構還定義了一個抽象類TimerTask并且繼承了Runnable接口。業務系統實現了這個抽象類的run方法用于提供具體的延時任務邏輯。
TaskQueue內部采用大頂堆的方式,依據任務的觸發時間進行排序。而TimerThread則以死循環的方式從TaskQueue獲取隊列頭,等待隊列頭的任務的超時時間到達后觸發該任務,并且將任務從隊列中移除。
Timer的數據結構和算法都很容易理解。所有的超時任務都首先進入延時隊列。后臺超時線程不斷的從延遲隊列中獲取任務并且等待超時時間到達后執行任務。延遲隊列采用大頂堆排序,在延遲任務的場景中有三種操作,分別是:添加任務,提取隊列頭任務,查看隊列頭任務。
查看隊列頭任務的事件復雜度是 O(1) 。而添加任務和提取隊列頭任務的時間復雜度都是 O(Logn) 。當任務數量較大時,添加和刪除的開銷也是比較大的。此外,由于Timer內部只有一個處理線程,如果有一個延遲任務的處理消耗了較多的時間,會對應的延遲后續任務的處理。
代碼如下:
- public static void main(String[] args) {
- Timer timer = new Timer();
- // 延遲 1秒 執行任務
- timer.schedule(
- new java.util.TimerTask() {
- @Override
- public void run() {
- System.out.println("延遲 1秒 執行任務"+System.currentTimeMillis());
- }
- }
- ,1000);
- timer.schedule(
- new java.util.TimerTask() {
- @Override
- public void run() {
- System.out.println("延遲 2秒 執行任務"+System.currentTimeMillis());
- }
- }
- ,2000);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- timer.cancel();
- }
ScheduledThreadPoolExecutor
由于Timer只有一個線程用來處理延遲任務,在任務數量很多的時候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService后,也對應的提供了一個用于處理延時任務的ScheduledExecutorService子類接口。該接口內部也一樣使用了一個使用小頂堆進行排序的延遲隊列存放任務。線程池中的線程會在這個隊列上等待直到有任務可以提取。
整體來說,ScheduledExecutorService 區別于 Timer 的地方就在于前者依賴了線程池來執行任務,而任務本身會判斷是什么類型的任務,需要重復執行的在任務執行結束后會被重新添加到任務隊列。
而對于后者來說,它只依賴一個線程不停的去獲取隊列首部的任務并嘗試執行它,無論是效率上、還是安全性上都比不上前者。
ScheduledExecutorService的實現上有一些特殊,只有一個線程能夠提取到延遲隊列頭的任務,并且根據任務的超時時間進行等待。在這個等待期間,其他的線程是無法獲取任務的。這樣的實現是為了避免多個線程同時獲取任務,導致超時時間未到達就任務觸發或者在等待任務超時時間時有新的任務被加入而無法響應。
由于ScheduledExecutorService可以使用多個線程,這樣也緩解了因為個別任務執行時間長導致的后續任務被阻塞的情況。不過延遲隊列也是一樣采用小頂堆的排序方式,因此添加任務和刪除任務的時間復雜度都是 O(Logn) 。在任務數量很大的情況下,性能表現比較差。
代碼如下:
- public class ScheduledThreadPoolServiceTest {
- // 參數代表可以同時執行的定時任務個數
- private ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
- /**
- * schedule:延時2秒執行一次任務
- */
- public void task0() {
- service.schedule(() -> {
- System.out.println("task0-start");
- sleep(2);
- System.out.println("task0-end");
- }, 2, TimeUnit.SECONDS);
- }
- /**
- * scheduleAtFixedRate:2秒后,每間隔4秒執行一次任務
- * 注意,如果任務的執行時間(例如6秒)大于間隔時間,則會等待任務執行結束后直接開始下次任務
- */
- public void task1() {
- service.scheduleAtFixedRate(() -> {
- System.out.println("task1-start");
- sleep(2);
- System.out.println("task1-end");
- }, 2, 4, TimeUnit.SECONDS);
- }
- /**
- * scheduleWithFixedDelay:2秒后,每次延時4秒執行一次任務
- * 注意,這里是等待上次任務執行結束后,再延時固定時間后開始下次任務
- */
- public void task2() {
- service.scheduleWithFixedDelay(() -> {
- System.out.println("task2-start");
- sleep(2);
- System.out.println("task2-end");
- }, 2, 4, TimeUnit.SECONDS);
- }
- private void sleep(long time) {
- try {
- TimeUnit.SECONDS.sleep(time);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- ScheduledThreadPoolServiceTest test = new ScheduledThreadPoolServiceTest();
- System.out.println("main start");
- test.task0();
- //test.task1();
- // test.task2();
- test.sleep(10);
- System.out.println("main end");
- }
- }
DelayQueue
Java 中還有個延遲隊列 DelayQueue,加入延遲隊列的元素都必須實現 Delayed 接口。延遲隊列內部是利用 PriorityQueue 實現的,所以還是利用優先隊列!Delayed 接口繼承了Comparable 因此優先隊列是通過 delay 來排序的。
Redis sorted set
Redis的數據結構Zset,同樣可以實現延遲隊列的效果,主要利用它的score屬性,redis通過score來為集合中的成員進行從小到大的排序。zset 內部是用跳表實現的。
跳表數據結構的示意圖:
總體上,跳躍表刪除操作的時間復雜度是O(logN)。
有沒有更高效的數據結構?
Timer 、ScheduledThreadPool 、 DelayQueue,總結的說下它們都是通過優先隊列來獲取最早需要執行的任務,因此插入和刪除任務的時間復雜度都為O(logn),并且 Timer 、ScheduledThreadPool 的周期性任務是通過重置任務的下一次執行時間來完成的。
但是由于新增任務和提取任務的時間復雜度都是 O(Logn) ,在任務數量很大,比如幾萬,十幾萬的時候,性能的開銷就變得很巨大。
問題就出在時間復雜度上,插入刪除時間復雜度是O(logn),那么假設頻繁插入刪除次數為 m,總的時間復雜度就是O(mlogn)
那么,是否存在新增任務和提取任務比 O(Log2n) 復雜度更低的數據結構呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設計了一種名為時間輪( Timing Wheels )的數據結構,這種結構在處理延遲任務時,其新增任務和刪除任務的時間復雜度降低到了 O(1) 。
時間輪算法
基本原理
見名知意,時間輪的數據結構很類似于我們鐘表上的數據指針。
時間輪用環形數組實現,數組的每個元素可以稱為槽,和 HashMap一樣稱呼。
槽的內部用雙向鏈表存著待執行的任務,添加和刪除的鏈表操作時間復雜度都是 O(1),槽位本身也指代時間精度,比如一秒掃一個槽,那么這個時間輪的最高精度就是 1 秒。
也就是說延遲 1.2 秒的任務和 1.5 秒的任務會被加入到同一個槽中,然后在 1 秒的時候遍歷這個槽中的鏈表執行任務。
任務插入
當有一個延遲任務要插入時間輪時,首先計算其延遲時間與單位時間的余值,從指針指向的當前槽位移動余值的個數槽位,就是該延遲任務需要被放入的槽位。
舉個例子,時間輪有8個槽位,編號為 0 ~ 7 。指針當前指向槽位 2 。新增一個延遲時間為 4 秒的延遲任務,4 % 8 = 4,因此該任務會被插入 4 + 2 = 6,也就是槽位6的延遲任務隊列。
時間槽位的實現
時間輪的槽位實現可以采用循環數組的方式達成,也就是讓指針在越過數組的邊界后重新回到起始下標。概括來說,可以將時間輪的算法描述為:
用隊列來存儲延遲任務,同一個隊列中的任務,其延遲時間相同。用循環數組的方式來存儲元素,數組中的每一個元素都指向一個延遲任務隊列。
有一個當前指針指向數組中的某一個槽位,每間隔一個單位時間,指針就移動到下一個槽位。被指針指向的槽位的延遲隊列,其中的延遲任務全部被觸發。
在時間輪中新增一個延遲任務,將其延遲時間除以單位時間得到的余值,從當前指針開始,移動余值對應個數的槽位,就是延遲任務被放入的槽位。
基于這樣的數據結構,插入一個延遲任務的時間復雜度就下降到 O(1) 。而當指針指向到一個槽位時,該槽位連接的延遲任務隊列中的延遲任務全部被觸發。
延遲任務的觸發和執行不應該影響指針向后移動的時間精確性。因此一般情況下,用于移動指針的線程只負責任務的觸發,任務的執行交由其他的線程來完成。比如,可以將槽位上的延遲任務隊列放入到額外的線程池中執行,然后在槽位上新建一個空白的新的延遲任務隊列用于后續任務的添加。
關于擴容
那假設現在要加入一個50秒后執行的任務怎么辦?這槽好像不夠啊?難道要加槽嘛?和HashMap一樣擴容?
假設要求精度為 1 秒,要能支持延遲時間為 1 天的延遲任務,時間輪的槽位數需要 60 × 60 × 24 = 86400 。這就需要消耗更多的內存。顯然,單純增加槽位數并不是一個好的解決方案。
常見有兩種方式:
通過增加輪次。50 % 8 + 1 = 3,即應該放在槽位是 3,下標是 2 的位置。然后 (50 - 1) / 8 = 6,即輪數記為 6。也就是說當循環 6 輪之后掃到下標的 2 的這個槽位會觸發這個任務。Netty 中的 HashedWheelTimer 使用的就是這種方式。
通過多層次。這個和我們的手表就更像了,像我們秒針走一圈,分針走一格,分針走一圈,時針走一格。
多層次時間輪就是這樣實現的。假設上圖就是第一層,那么第一層走了一圈,第二層就走一格。
可以得知第二層的一格就是8秒,假設第二層也是 8 個槽,那么第二層走一圈,第三層走一格,可以得知第三層一格就是 64 秒。
那么一格三層,每層8個槽,一共 24 個槽時間輪就可以處理最多延遲 512 秒的任務。
而多層次時間輪還會有降級的操作,假設一個任務延遲 500 秒執行,那么剛開始加進來肯定是放在第三層的,當時間過了 436 秒后,此時還需要 64 秒就會觸發任務的執行,而此時相對而言它就是個延遲 64 秒后的任務,因此它會被降低放在第二層中,第一層還放不下它。
再過個 56 秒,相對而言它就是個延遲 8 秒后執行的任務,因此它會再被降級放在第一層中,等待執行。
降級是為了保證時間精度一致性。Kafka內部用的就是多層次的時間輪算法。
降級過程:
本文轉載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉載本文請聯系小汪哥寫代碼公眾號。