Disruptor 高性能環形消息隊列應用,Log4j 2 也用到了這套技術
說到底,無論是晉升述職還是面試考察,編程技能的展現總是在那些技術的橫向對比和深度的了解運用。知其一,也知其二。一個場景的問題,往往也會對應著多種的解決方案,從沒有絕對的好和不好,都是是否適合而已。所以,往往技術越好的,也越低調,不那么咋咋呼呼的。
什么是柔性事務?
在分布式軟件系統架構設計中,所有的并發資源的競爭,都會往無鎖化、非獨占競爭,以及柔性事務設計。柔性事務用于替代傳統事務管理中(如ACID屬性:原子性、一致性、隔離性、持久性),在分布式架構系統中的使用場景。通過消息、補償,協調不同服務間的一致性。
圖片
那么在消息的使用中,除了有 MQ 消息,使用于微服務之間。還有本地消息,可以作用在各個領域間驅動流程。關于本地消息可以用,Spring 的監聽、Redis 發布訂閱、Guava EventBus 事件總線,這些內容在小傅哥博客 bugstack.cn 《路書》中有相關的案例。之后本節咱們介紹一個新的高性能組件 Disruptor 的使用。
一、關于 Disruptor
Disruptor 是一種高性能的并發框架,最初由 LMAX 開發,用于解決高吞吐量、低延遲的消息處理問題。它提供了一種無鎖的、有序的事件處理模型,非常適合處理需要高性能的場景。Disruptor 本身并不是用于實現事務的框架,而是一個事件處理器。因此,要在 Disruptor 上實現柔性事務,需要結合其事件處理能力與柔性事務的模式。
- 源碼:https://github.com/LMAX-Exchange/disruptor
- 文檔:https://lmax-exchange.github.io/disruptor/ - 谷歌瀏覽器右鍵點翻譯為中文。
二、實戰案例
1. 工程結構
小傅哥準備好了一份基于 Disruptor 事件消息的使用案例工程,你可以直接上手體現。
圖片
- app 是使用的啟動層、trigger 是提供接口、監聽消息、處理任務的觸發器層。
- 在這里我們通過 trigger 下的 event 包,監聽事件消息。之后把這個 XxxEventHandler 讓 app 層下的 Disruptor 進行實例化。
2. 引入POM
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
- 引入 disruptor pom 包。
3. 監聽消息
@Slf4j
public class XxxEventHandler implements EventHandler<XxxEventHandler.Message> {
@Override
public void onEvent(Message longEvent, long l, boolean b) throws Exception {
log.info("接收消息:{}", longEvent.getValue());
}
@Data
public static class Message {
private String value;
}
}
- 在 trigger 下 event 包內,加一個實現了 disruptor EventHandler 的監聽實現類,消息體類型我們定義到 XxxEventHandler 中,也就是 Message。具體生產使用的時候,按需調整。
- 這個接收消息的過程和使用 MQ 的方式是一樣的。
4. 實例化監聽
@Configuration
public class DisruptorConfig {
private final ExecutorService executor = Executors.newCachedThreadPool();
@Bean("xxxEventDisruptor")
public Disruptor<XxxEventHandler.Message> disruptor() {
// 環形隊列的大小,注意要是2的冪
int bufferSize = 1024;
// 創建Disruptor
Disruptor<XxxEventHandler.Message> disruptor = new Disruptor<>(XxxEventHandler.Message::new, bufferSize, executor);
// 連接事件處理器
disruptor.handleEventsWith(new XxxEventHandler());
// 開始Disruptor
disruptor.start();
return disruptor;
}
}
- 在 App 模塊下,有一個 config 專門的配置類,在這里配置下消息監聽。這個過程和我們之前使用的 Redis 發布訂閱是一樣的。
5. 推送消息(Test)
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DisruptorTest {
@Resource
private Disruptor<XxxEventHandler.Message> xxxEventDisruptor;
@Test
public void test_publishEvent() throws InterruptedException {
for (int i = 0; i < 10; i++) {
xxxEventDisruptor.publishEvent((event, sequence) -> event.setValue("你好,我是 Disruptor Message"));
}
// 暫停 - 測試完手動關閉程序
new CountDownLatch(1).await();
}
}
24-10-26.11:55:55.827 [main ] INFO DisruptorTest - Starting DisruptorTest using Java 1.8.0_311 on MacBook-Pro.local with PID 92827 (started by fuzhengwei in /Users/fuzhengwei/1024/KnowledgePlanet/road-map/xfg-dev-tech-disruptor/xfg-dev-tech-app)
24-10-26.11:55:55.829 [main ] INFO DisruptorTest - The following 1 profile is active: "dev"
24-10-26.11:55:57.749 [main ] INFO DisruptorTest - Started DisruptorTest in 2.526 seconds (JVM running for 3.741)
24-10-26.11:55:58.125 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128 [pool-2-thread-1 ] INFO XxxEventHandler - 接收消息:你好,我是 Disruptor Message
- 提供一個單測來測試消息推送,這樣你就可以監聽到消息了。
三、總結
在美團、京東、阿里,等各個大廠中都有很多這樣的組件使用,在美團發布過的文章中《高性能隊列——Disruptor》 還有一個對應的壓測數據。CPU:Intel Core i7-2720QM,JVM:Java 1.6.0_25 64-bit,OS:Ubuntu 11.04
- | ABQ | Disruptor |
Unicast: 1P – 1C | 4,057,453 | 22,381,378 |
Pipeline: 1P – 3C | 2,006,903 | 15,857,913 |
Sequencer: 3P – 1C | 2,056,118 | 14,540,519 |
Multicast: 1P – 3C | 260,733 | 10,860,121 |
Diamond: 1P – 3C | 2,082,725 | 15,295,197 |
- 依據并發競爭的激烈程度的不同,Disruptor比ArrayBlockingQueue吞吐量快4~7倍。
另外,Log4j 2 采用了 Disruptor(一種無鎖的線程間通信庫),提高吞吐量降低延遲。在生產使用中,大并發的系統注意 Log4j 版本。官網說明:https://logging.apache.org/log4j/2.12.x/manual/async.html
圖片
- 異步 Logger是 Log4j 2 中的新增功能。其目的是盡快從對 Logger.log 的調用返回到應用程序。您可以選擇使所有 Logger 異步,或使用同步和異步 Logger 的混合。使所有 Logger 異步將提供最佳性能,而混合使用則可為您提供更大的靈活性。
- LMAX Disruptor 技術。異步記錄器內部使用 Disruptor(一種無鎖的線程間通信庫)而不是隊列,從而實現更高的吞吐量和更低的延遲。
- 作為異步日志記錄器工作的一部分,異步附加器已得到增強,可以在批處理結束時(當隊列為空時)刷新到磁盤。這會產生與配置“immediateFlush=true”相同的結果,即所有收到的日志事件始終在磁盤上可用,但效率更高,因為它不需要在每個日志事件上都接觸磁盤。(異步附加器在內部使用 ArrayBlockingQueue,不需要類路徑上的 Disruptor jar。)