SpringCloud整合MQ實現消息總線服務,實戰講解!
一、背景介紹
在之前的文章中,我們介紹了服務配置中心高可用的玩法。了解到,每當修改配置文件內容,如果需要客戶端也同步更新,就需要手動調用/refresh接口,以便客戶端能獲取到最新的配置內容。
當客戶端越來越多的時候,通過人工進行處理顯然非常雞肋。有沒有一種更加高效的辦法,當手動調用其中一個客戶端的/refresh接口,其它的客戶端也自動更新?
答案是肯定的!
在 Spring Cloud 體系里,有一個叫做 Spring Cloud Bus 模塊,也被業界稱為消息總線。它可以將分布式系統內的節點以消息代理方式連接起來,開發者可以通過消息代理服務向其它節點傳輸數據的變更,例如配置文件的更改,也可以用于收集節點監控數據。其中常用的消息代理服務有 RabbitMQ 和 Kafka。
換言之,我們可以借助 Spring Cloud Bus 模塊來實現上文介紹的訴求,引入 Spring Cloud Bus 模塊后,客戶端獲取遠程配置文件的方式,可以用如下流程圖來描述。
圖片
熟悉 MQ 服務的同學可能一眼就看出來了,其原理就是借助 MQ 服務的發布與訂閱功能向其它節點進行廣播數據,從而實現客戶端自動刷新配置功能。
其交互流程可以用如下內容來概括。
- 1.當外部請求調用客戶端 A 的/refresh接口后,除了主動刷新配置意外,還會通過 Spring Cloud Bus 模塊,將刷新配置接口的指令數據發送到 MQ 服務器
- 2.MQ 服務器會將指令數據通過 Spring Cloud Bus 模塊推送給其它客戶端
- 3.客戶端 B、C 接收到最新的消息指令后,主動調用刷新配置服務,獲取最新的配置內容
下面我們通過具體的例子,結合之前介紹的知識,看看如何利用 Spring Cloud Bus 實現客戶端配置文件自動刷新的效果。
三、方案實踐
在此,我們采用 RabbitMQ 服務器來搭建消息總線,因此需要事先準備一臺可用的 RabbitMQ 服務器,具體的安裝教程之前有所介紹,大家也可以百度搜索一下,具體的安裝過程就不再重復撰述。
3.1、添加依賴
根據eureka-config-client復制一個服務消費者工程,命名為eureka-config-client-bus,并在pom.xml中引入spring-cloud-starter-bus-amqp依賴包,示例如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
3.2、添加配置文件
接著,在bootstrap.properties配置文件中添加消息代理相關的屬性信息,示例如下:
# 配置rabbitmq服務器地址
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3.3、服務測試
然后,依次將eureka-server、eureka-config-server、eureka-config-client-bus (分別在不同的端口上,比如9022、9023) 服務啟動起來。訪問 eureka 可視化頁面,如果服務都正常,可以看到類似于如下的界面。
圖片
將遠程倉庫配置文件中的blog.name=hahaha修改成blog.name=hahaha123456,以便測試客戶端配置文件是否能自動更新。
接著,向其中一個客戶端發送一個/bus/refresh的 POST 請求。
**需要注意的是,這里的路徑是/bus/refresh,而不是/refresh**!
圖片
最后,在瀏覽器中重新訪問另一個客戶端讀取配置文件的接口,不意外的話,客戶端獲取的是最新的配置信息。
圖片
說明客戶端已經成功讀取到最新的配置內容。
查看客戶端的日志,也會看到類似于如下的信息。
Received remote refresh request. Keys refreshed [config.client.version, blog.name]
3.4、WebHook
可能有的同學會發出一個疑問,不可能每次修改倉庫的配置文件,自己都需要手動調用/bus/refresh接口吧。
實際上,GIT 里面有個 WebHooks 功能,每次 push 代碼后,我們可以利用它給遠程 HTTP URL 發送一個 POST 請求,以此省去手動調用的工作。
圖片
操作非常簡單,只需要將其中一個客戶端/bus/refresh接口地址添加進去即可。需要注意的是,這里的 HTTP URL 必須是一個能請求通過的公網地址哈!
四、升級版
在以上的方案中,雖然我們利用消息總線實現了手動刷新一個客戶端的配置文件更新,其它客戶端也同步跟著一起刷新的目的,但在實際的實踐過程中,發現還是有一些不便的地方。
例如客戶端因為業務的快速迭代會頻繁的發布服務,同時也會根據服務的并發量適度的增減服務實例數量,這種情況下,客戶端的 IP 和端口會經常發生變動,每次人工運維起來會很繁瑣。
因此,我們可以將上面的交互流程改變一下,由服務配置中心通過 Spring Cloud Bus 模塊向客戶端發送重刷配置文件的指令。
整個流程,可以用如下圖來描述。
圖片
因為服務配置中心基本上很少會去迭代,客戶端的 IP 和端口發生變動的可能性較小,由它向客戶端推送消息,運維的工作量可以顯著的下降。
改造的流程也很簡單,只需要兩步即可!
3.1、添加依賴
與上文類似,根據eureka-config-server復制一個服務消費者工程,命名為eureka-config-server-bus,并在pom.xml中引入spring-cloud-starter-bus-amqp依賴包,示例如下:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
</dependencies>
4.2、添加配置文件
接著,在application.properties中添加消息代理服務相關的屬性,示例如下:
spring.application.name=eureka-config-server
server.port=9020
# 配置git倉庫地址
spring.cloud.config.server.git.uri=https://gitee.com/pzblogs/config-demo
spring.cloud.config.server.git.username=
spring.cloud.config.server.git.password=
# 設置與Eureka Server交互的地址,多個地址可使用【,】分隔
eureka.client.serviceUrl.defaultZnotallow=http://localhost:8001/eureka/
# 關閉安全認證
management.security.enabled=false
# 配置rabbitmq服務器地址
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
4.3、服務測試
然后,依次將eureka-server、eureka-config-server-bus、eureka-config-client-bus服務啟動起來。
接著,修改倉庫中的配置文件內容,完成之后,以 POST 方式調用服務配置中心的刷新配置接口,例如http://localhost:9020/bus/refresh。
最后,在瀏覽器訪問客戶端http://localhost:9023/hello,不意外的話,能看到最新的信息。
圖片
五、小結
最后總結一下,當我們手動更新某個倉庫配置文件的時候,想要實現所有客戶端同時也自動更新配置,可以利用消息總線來實現節點之間數據的同步變更操作。
如果想要用 Kafka 來做消息代理服務,實現思路也類似,將bus-amqp換成bus-kafka,示例子如下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
最后在配置文件中添加 Kafka 相關服務地址配置參數即可。