簡單易用的消息隊列框架的設計與實現
1 背景介紹
消息隊列在互聯網領域里得到了廣泛的應用,它多應用在異步處理、模塊之間的解偶和高并發的消峰等場景,消息隊列中表現最好的當屬Apache開源項目Kafka,Kafka使用支持高并發的Scala語言開發,利用操作系統的緩存原理達到高性能,并且天生具有可分區,分布式的特點,而且有不同語言的客戶端,使用起來非常的方便。
Kclient是Kafka生產者客戶端和消費者客戶端的一個簡單易用的框架,它具有高效集成、高性能、高穩定的高級特點。
在繼續閱讀kclient的功能特性、架構設計和使用方法之前,讀者需要對Kafka進行基本的學習和了解。如果讀者是Kafka的初學者,而且英文還不錯,也可以直接參考Kafka官方在線文檔:Kafka 0.8.2 Documentation,如果對英文不感性趣,可以在網上搜索Kakfa的中文資料進行學習,內容需要涵蓋Kafka的使用向導以及利用操作系統緩存的“空中接力”、持久化、分片機制、高可用等原理。
本文包含了背景介紹、功能特性、架構設計、使用指南、API簡介、后臺監控和管理、消息處理機模板項目、以及性能壓測相關章節。如果你想使用kclient快速的構建Kafka處理機服務,請參考消息處理機模板項目章節; 如果你想了解kclient的其他使用方式、功能特性、監控和管理等,請參考功能特性、使用指南、API簡介、后臺監控和管理等章節; 如果你想更深入的理解kclient的架構設計和性能指標,請參考架構設計和性能壓測章節。
2 功能特性
2.1 簡單易用
簡化了Kafka客戶端API的使用方法, 特別是對消費端開發,消費端開發者只需要實現MessageHandler接口或者相關子類,在實現中處理消息完成業務邏輯,并且在主線程中啟動封裝的消費端服務器即可。它提供了各種常用的MessageHandler,框架自動轉換消息到領域對象模型或者JSON對象等數據結構,讓開發者更專注于業務處理。如果使用服務源碼注解的方式聲明消息處理機的后臺,可以將一個通用的服務方法直接轉變成具有完善功能的處理Kafka消息隊列的處理機,使用起來極其簡單,代碼看起來一目了然,在框架級別通過多種線程池技術保證了處理機的高性能。
在使用方面,它提供了多種使用方式:
- 直接使用Java API;
- 與Spring環境無縫集成;
- 服務源碼注解,通過注解聲明方式啟動Kafka消息隊列的處理機。
除此之外,它基于注解提供了消息處理機的模板項目,可以根據模板項目通過配置快速開發Kafka的消息處理機。
2.2 高性能
為了在不同的業務場景下實現高性能, 它提供不同的線程模型:
適合輕量級服務的同步線程模型;
適合IO密集型服務的異步線程模型(細分為所有消費者流共享線程池和每個流獨享線程池)。
另外,異步模型中的線程池也支持確定數量線程的線程池和線程數量可伸縮的線程池。
2.3 高穩定性
框架級別處理了常見的異常,計入錯誤日志,可用于錯誤手工恢復或者洗數據,并實現了優雅關機和重啟等功能。
3 架構設計
3.1 線程模型
1. 同步線程模型
在這種線程模型中,客戶端為每一個消費者流使用一個線程,每個線程負責從Kafka隊列里消費消息,并且在同一個線程里進行業務處理。我們把這些線程稱為消費線程,把這些線程所在的線程池叫做消息消費線程池。這種模型之所以在消息消費線程池處理業務,是因為它多用于處理輕量級別的業務,例如:緩存查詢、本地計算等。
2. 異步線程模型
在這種線程模型中,客戶端為每一個消費者流使用一個線程,每個線程負責從Kafka隊列里消費消息,并且傳遞消費得到的消息到后端的異步線程池,在異步線程池中處理業務。我們仍然把前面負責消費消息的線程池稱為消息消費線程池,把后面的異步線程池稱為異步業務線程池。這種線程模型適合重量級的業務,例如:業務中有大量的IO操作、網絡IO操作、復雜計算、對外部系統的調用等。
后端的異步業務線程池又細分為所有消費者流共享線程池和每個流獨享線程池。
1)所有消費者流共享線程池
所有消費者流共享線程池對比每個流獨享線程池,創建更少的線程池對象,能節省些許的內存,但是,由于多個流共享同一個線程池,在數據量較大的時候,流之間的處理可能互相影響。例如,一個業務使用2個區和兩個流,他們一一對應,通過生產者指定定制化的散列函數替換默認的key-hash, 實現一個流(區)用來處理普通用戶,另外一個流(區)用來處理VIP用戶,如果兩個流共享一個線程池,當普通用戶的消息大量產生的時候,VIP用戶只有少量,并且排在了隊列的后頭,就會產生餓死的情況。這個場景是可以使用多個topic來解決,一個普通用戶的topic,一個VIP用戶的topic,但是這樣又要多維護一個topic,客戶端發送的時候需要顯式的進行判斷topic目標,也沒有多少好處。
2)每個流獨享線程池
每個流獨享線程池使用不同的異步業務線程池來處理不同的流里面的消息,互相隔離,互相獨立,不互相影響,對于不同的流(區)的優先級不同的情況,或者消息在不同流(區)不均衡的情況下表現會更好,當然,創建多個線程池會多使用些許內存,但是這并不是一個大問題。
另外,異步業務線程池支持確定數量線程的線程池和線程數量可伸縮的線程池。
- 核心業務硬件資源有保證,核心服務有專享的資源池,或者線上流量可預測,請使用固定數量的線程池。
- 非核心業務一般混布,資源互相調配,線上流量不固定等情況請使用線程數量可伸縮的線程池。
3.2 異常處理
對于消息處理過程中產生的業務異常,當前在業務處理的上層捕捉了Throwable, 在專用的錯誤恢復日志中記錄出錯的消息,后續可根據錯誤恢復日志人工處理錯誤消息,也可重做或者洗數據。TODO:考慮實現異常Listener體系結構, 對異常處理實現監聽者模式,異常處理器可插拔等,默認打印錯誤日志。
由于默認的異常處理中,捕捉異常,在專用的錯誤回復日志中記錄錯誤,并且繼續處理下一個消息。考慮到可能上線失敗,或者上游消息格式出錯,導致所有消息處理都出錯,堆滿錯誤恢復日志的情況,我們需要訴諸于報警和監控系統來解決。
3.3 優雅關機
由于消費者本身是一個事件驅動的服務器,類似Tomcat,Tomcat接收HTTP請求返回HTTP響應,Consumer則接收Kafka消息,然后處理業務后返回,也可以將處理結果發送到下一個消息隊列。所以消費者本身是非常復雜的,除了線程模型,異常處理,性能,穩定性,可用性等都是需要思考點。既然消費者是一個后臺的服務器,我們需要考慮如何優雅的關機,也就是在消費者服務器在處理消息的時候,如果關機才能不導致處理的消息中斷而丟失。
優雅關機的重點在于解決如下3個問題:
- 如何知道JVM要退出?
- 如何阻止Daemon的線程在JVM退出被殺掉而導致消息丟失?
- 如果Worker線程在阻塞,如何喚起并退出?
第一個問題:如果一個后臺程序運行在控制臺的前臺,通過Ctrl + C可以發送退出信號給JVM,也可以通過kill -2 PS_IS 或者 kill -15 PS_IS發送退出信號,但是不能發送kill -9 PS_IS, 否則進程會無條件強制退出。JVM收到退出信號后,會調用注冊的鉤子,我們通過的注冊的JVM退出鉤子進行優雅關機。
第二個問題:線程分為Daemon線程和非Daemon線程,一個線程默認繼承父線程的Daemon屬性,如果當前線程池是由Daemon線程創建的,則Worker線程是Daemon線程。如果Worker線程是Daemon線程,我們需要在JVM退出鉤子中等待Worker線程完成當前手頭處理的消息,再退出JVM。如果不是Daemon線程,即使JVM收到退出信號,也得等待Worker線程退出后再退出,不會丟掉正在處理的消息。
第三個問題:在Worker線程從Kafka服務器消費消息的時候,Worker線程可能處于阻塞,這時需要中斷線程以退出,沒有消息被丟掉。在Worker線程處理業務時有可能有阻塞,例如:IO,網絡IO,在指定退出時間內沒有完成,我們也需要中斷線程退出,這時會產生一個InterruptedException, 在異常處理的默認處理器中被捕捉,并寫入錯誤日志,Worker線程隨后退出。
4 使用指南
kclient提供了三種使用方法,對于每一種方法,按照下面的步驟可快速構建Kafka生產者和消費者程序。
4.1 前置步驟
1.下載源代碼后在項目根目錄執行如下命令安裝打包文件到你的Maven本地庫。
- mvn install
2.在你的項目pom.xml文件中添加對kclient的依賴。
- <dependency>
- <groupId>com.robert.kafka</groupId>
- <artifactId>kclient-core</artifactId>
- <version>0.0.1</version>
- </dependency>
3.根據Kafka官方文檔搭建Kafka環境,并創建兩個Topic, test1和test2。
4.然后,從Kafka安裝目錄的config目錄下拷貝kafka-consumer.properties和kafka-producer.properties到你的項目類路徑下,通常是src/main/resources目錄。
4.2 Java API
Java API提供了最直接,最簡單的使用kclient的方法。
構建Producer示例:
- KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties", "test");
- for (int i = 0; i < 10; i++) {
- Dog dog = new Dog();
- dog.setName("Yours " + i);
- dog.setId(i);
- kafkaProducer.sendBean2Topic("test", dog);
- System.out.format("Sending dog: %d \n", i + 1);
- Thread.sleep(100);
- }
構建Consumer示例:
- DogHandler mbe = new DogHandler();
- KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties", "test", 1, mbe);
- try {
- kafkaConsumer.startup();
- try {
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- } finally {
- kafkaConsumer.shutdownGracefully();
- }
- public class DogHandler extends BeanMessageHandler<Dog> {
- public DogHandler() {
- super(Dog.class);
- }
- protected void doExecuteBean(Dog dog) {
- System.out.format("Receiving dog: %s\n", dog);
- }
- }
4.3 Spring環境集成
kclient可以與Spring環境無縫集成,你可以像使用Spring Bean一樣來使用KafkaProducer和KafkaConsumer。
構建Producer示例:
- ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml");
- KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer");
- for (int i = 0; i < 10; i++) {
- Dog dog = new Dog();
- dog.setName("Yours " + i);
- dog.setId(i);
- kafkaProducer.send2Topic("test", JSON.toJSONString(dog));
- System.out.format("Sending dog: %d \n", i + 1);
- Thread.sleep(100);
- }
- <bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init">
- <property name="propertiesFile" value="kafka-producer.properties"/>
- <property name="defaultTopic" value="test"/>
- </bean>
構建Consumer示例:
- ApplicationContext ac = new ClassPathXmlApplicationContext(
- "kafka-consumer.xml");
- KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer");
- try {
- kafkaConsumer.startup();
- try {
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- } finally {
- kafkaConsumer.shutdownGracefully();
- }
- public class DogHandler extends BeanMessageHandler<Dog> {
- public DogHandler() {
- super(Dog.class);
- }
- protected void doExecuteBean(Dog dog) {
- System.out.format("Receiving dog: %s\n", dog);
- }
- }
- <bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" />
- <bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init">
- <property name="propertiesFile" value="kafka-consumer.properties" />
- <property name="topic" value="test" />
- <property name="streamNum" value="1" />
- <property name="handler" ref="dogHandler" />
- </bean>
4.4 服務源碼注解
kclient提供了類似Spring聲明式的編程方法,使用注解聲明Kafka處理器方法,所有的線程模型、異常處理、服務啟動和關閉等都由后臺服務自動完成,極大程度的簡化了API的使用方法,提高了開發者的工作效率。
注解聲明Kafka消息處理器:
- @KafkaHandlers
- public class AnnotatedDogHandler {
- @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)
- @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")
- public Cat dogHandler(Dog dog) {
- System.out.println("Annotated dogHandler handles: " + dog);
- return new Cat(dog);
- }
- @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)
- public void catHandler(Cat cat) throws IOException {
- System.out.println("Annotated catHandler handles: " + cat);
- throw new IOException("Man made exception.");
- }
- @ErrorHandler(exception = IOException.class, topic = "test1")
- public void ioExceptionHandler(IOException e, String message) {
- System.out.println("Annotated excepHandler handles: " + e);
- }
- }
注解啟動程序:
- public static void main(String[] args) {
- ApplicationContext ac = new ClassPathXmlApplicationContext(
- "annotated-kafka-consumer.xml");
- try {
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
注解Spring環境配置:
- <bean name="kclientBoot" class="com.robert.kafka.kclient.boot.kclientBoot" init-method="init"/>
- <context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" />
5 API簡介
5.1 Producer API
KafkaProducer類提供了豐富的API來發送不同類型的消息,它支持發送字符串消息,發送一個普通的Bean,以及發送JSON對象等。在這些API中可以指定發送到某個Topic,也可以不指定而使用默認的Topic。對于發送的數據,支持帶Key值的消息和不帶Key值的消息。
發送字符串消息:
- public void send(String message);
- public void send2Topic(String topicName, String message);
- public void send(String key, String message);
- public void send2Topic(String topicName, String key, String message);
- public void send(Collection<String> messages);
- public void send2Topic(String topicName, Collection<String> messages);
- public void send(Map<String, String> messages);
- public void send2Topic(String topicName, Map<String, String> messages);
發送Bean消息:
- public <T> void sendBean(T bean);
- public <T> void sendBean2Topic(String topicName, T bean);
- public <T> void sendBean(String key, T bean);
- public <T> void sendBean2Topic(String topicName, String key, T bean);
- public <T> void sendBeans(Collection<T> beans);
- public <T> void sendBeans2Topic(String topicName, Collection<T> beans);
- public <T> void sendBeans(Map<String, T> beans);
- public <T> void sendBeans2Topic(String topicName, Map<String, T> beans);
發送JSON對象消息:
- public void sendObject(JSONObject jsonObject);
- public void sendObject2Topic(String topicName, JSONObject jsonObject);
- public void sendObject(String key, JSONObject jsonObject);
- public void sendObject2Topic(String topicName, String key, JSONObject jsonObject);
- public void sendObjects(JSONArray jsonArray);
- public void sendObjects2Topic(String topicName, JSONArray jsonArray);
- public void sendObjects(Map<String, JSONObject> jsonObjects);
- public void sendObjects2Topic(String topicName, Map<String, JSONObject> jsonObjects);
5.2 Consumer API
KafkaConsumer類提供了豐富的構造函數用來指定Kafka消費者服務器的各項參數,包括線程池策略,線程池類型,流數量等等。
使用PROPERTIES文件初始化:
- public KafkaConsumer(String propertiesFile, String topic, int streamNum, MessageHandler handler);
- public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, MessageHandler handler);
- public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler);
- public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler);
- public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler);
使用PROPERTIES對象初始化:
- public KafkaConsumer(Properties properties, String topic, int streamNum, MessageHandler handler);
- public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, MessageHandler handler);
- public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler);
- public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler);
- public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler);
5.3 消息處理器
消息處理器結構提供了一個基本接口,并且提供了不同的抽象類實現不同層次的功能,讓功能得到最大化的重用,并且互相解偶,開發者可以根據需求選擇某一個抽象類來繼承和使用。
接口定義:
- public interface MessageHandler {
- public void execute(String message);
- }
安全處理異常抽象類:
- public abstract class SafelyMessageHandler implements MessageHandler {
- public void execute(String message) {
- try {
- doExecute(message);
- } catch (Throwable t) {
- handleException(t, message);
- }
- }
- protected void handleException(Throwable t, String message) {
- for (ExceptionHandler excepHandler : excepHandlers) {
- if (t.getClass() == IllegalStateException.class
- && t.getCause() != null
- && t.getCause().getClass() == InvocationTargetException.class
- && t.getCause().getCause() != null)
- t = t.getCause().getCause();
- if (excepHandler.support(t)) {
- try {
- excepHandler.handle(t, message);
- } catch (Exception e) {
- log.error(
- "Exception hanppens when the handler {} is handling the exception {} and the message {}. Please check if the exception handler is configured properly.",
- excepHandler.getClass(), t.getClass(), message);
- log.error(
- "The stack of the new exception on exception is, ",
- e);
- }
- }
- }
- }
- }
- protected abstract void doExecute(String message);
面向類型的抽象類:
- public abstract class BeanMessageHandler<T> extends SafelyMessageHandler {...}
- public abstract class BeansMessageHandler<T> extends SafelyMessageHandler {...}
- public abstract class DocumentMessageHandler extends SafelyMessageHandler {...}
- public abstract class ObjectMessageHandler extends SafelyMessageHandler {...}
- public abstract class ObjectsMessageHandler extends SafelyMessageHandler {...}
5.4 消息處理器注解
正如上面使用指南第三部分服務源碼注解所講述的那樣,kclient可以通過注解來聲明Kafka消息處理器,kclient提供了@KafkaHandlers、@InputConsumer、@OutputProducer和@ErrorHandler等注解。
@KafkaHandlers:
- @Target({ ElementType.TYPE })
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Component
- public @interface KafkaHandlers {
- }
@InputConsumer:
- @Target({ ElementType.METHOD })
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface InputConsumer {
- String propertiesFile() default "";
- String topic() default "";
- int streamNum() default 1;
- int fixedThreadNum() default 0;
- int minThreadNum() default 0;
- int maxThreadNum() default 0;
- }
@OutputProducer:
- @Target({ ElementType.METHOD })
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface OutputProducer {
- String propertiesFile() default "";
- String defaultTopic() default "";
- }
@ErrorHandler:
- @Target({ ElementType.METHOD })
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface ErrorHandler {
- Class<? extends Throwable> exception() default Throwable.class;
- String topic() default "";
- }
6 消息處理機模板項目
6.1 快速開發向導
通過下面步驟可以快速開發Kafka處理機服務。
1.從本項目下載kclient-processor項目模板,并且根據業務需要修改pom.xml后導入Eclipse。
2.根據業務需要更改com.robert.kclient.app.handler.AnimalsHandler類名稱,并且根據業務需要修改處理器的注解。這里,可以導入業務服務對消息進行處理。
- @KafkaHandlers
- public class AnimalsHandler {
- @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)
- @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")
- public Cat dogHandler(Dog dog) {
- System.out.println("Annotated dogHandler handles: " + dog);
- return new Cat(dog);
- }
- @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)
- public void catHandler(Cat cat) throws IOException {
- System.out.println("Annotated catHandler handles: " + cat);
- throw new IOException("Man made exception.");
- }
- @ErrorHandler(exception = IOException.class, topic = "test1")
- public void ioExceptionHandler(IOException e, String message) {
- System.out.println("Annotated excepHandler handles: " + e);
- }
- }
3.通過mvn package既可以打包包含Spring Boot功能的自啟動jar包。
4.通過java -jar kclient-processor.jar即可啟動服務。
6.2 后臺監控和管理
kclient模板項目提供了后臺管理接口來監控和管理消息處理服務。
1.歡迎詞 - 用來校驗服務是否啟動成功。
2.服務狀態 - 顯示處理器數量。
curl http://localhost:8080/status
3.重啟服務 - 重新啟動服務。
curl http://localhost:8080/restart
7 性能壓測
Benchmark應該覆蓋推送QPS、接收處理QPS以及單線程、多線程生產者的用例。
用例1: 輕量級服務同步線程模型和異步線程模型的性能對比。
用例2: 重量級服務同步線程模型和異步線程模型的性能對比。
用例3: 重量級服務異步線程模型中所有消費者流共享線程池和每個流獨享線程池的性能對比。
用例4: 重量級服務異步線程模型中每個流獨享線程池的對比的確定數量線程的線程池和線程數量可伸縮的線程池的性能對比。
由于筆者在發文的時候還沒有時間完成前面四種場景的壓測,暫時留給讀者親自動手,作為一個練習。
8 更多思考
盡管本文設計和實現的kclient項目提供了許多高級功能,并且使用起來方便,而且筆者在幾家公司里在線上進行了應用,已經發揮了不少的作用,但是,還有一些細節需要提高。
kclient處理器項目中管理Restful服務暫時只提供了獲得狀態的API,需要進行進一步的豐富,增加對線程池的監控,以及消息處理性能的監控。
當前注解@ErrorHandler里面的exception參數為必選,完全可以使用方法第一參數進行推導,簡化開發人員配置的工作。
模板項目還不完善,需要增加啟動和關閉腳本,這樣讀者可以直接拷貝使用。
盡管線上應用已經證明了kclient沒有性能問題,但是開發一款中間件系統是需要閉環的,需要盡快把性能壓測這塊昨晚并且形成壓測報表。
點擊《簡單易用的消息隊列框架的設計與實現》閱讀原文。
【本文為51CTO專欄作者“李艷鵬”的原創稿件,轉載可通過作者簡書號(李艷鵬)或51CTO專欄獲取聯系】