微服務(wù)架構(gòu):利用事件驅(qū)動(dòng)實(shí)現(xiàn)最終一致性
事務(wù)一致性
首先,我們來(lái)回顧一下ACID原則:
- Atomicity:原子性,改變數(shù)據(jù)狀態(tài)要么是一起完成,要么一起失敗
- Consistency:一致性,數(shù)據(jù)的狀態(tài)是完整一致的
- Isolation:隔離線,即使有并發(fā)事務(wù),互相之間也不影響
- Durability:持久性, 一旦事務(wù)提交,不可撤銷
在單體應(yīng)用中,我們可以利用關(guān)系型數(shù)據(jù)庫(kù)的特性去完成事務(wù)一致性,但是一旦應(yīng)用往微服務(wù)發(fā)展,根據(jù)業(yè)務(wù)拆分成不用的模塊,而且每個(gè)模塊的數(shù)據(jù)庫(kù)已經(jīng)分離開(kāi)了,這時(shí)候,我們要面對(duì)的就是分布式事務(wù)了,需要自己在代碼里頭完成ACID了。比較流行的解決方案有:兩階段提交、補(bǔ)償機(jī)制、本地消息表(利用本地事務(wù)和MQ)、MQ的事務(wù)消息(RocketMQ)。
CAP定理
1998年,加州大學(xué)的計(jì)算機(jī)科學(xué)家 Eric Brewer 提出,分布式系統(tǒng)有三個(gè)指標(biāo)。
- Consistency:一致性
- Availability:可用性
- Partition tolerance:分區(qū)容錯(cuò)
Eric Brewer 說(shuō),這三個(gè)指標(biāo)不可能同時(shí)做到。這個(gè)結(jié)論就叫做 CAP 定理。
微服務(wù)中,不同模塊之間使用的數(shù)據(jù)庫(kù)是不同的,不同模塊之間部署的服務(wù)去也有可能是不用的,那么分區(qū)容錯(cuò)是無(wú)法避免的,因?yàn)榉?wù)之間的調(diào)用不能保證百分百的沒(méi)問(wèn)題,所以系統(tǒng)設(shè)計(jì)必須考慮這種情況。因此,我們可以認(rèn)為CAP的P總是成立的,剩下的C和A無(wú)法同時(shí)做到。
實(shí)際上根據(jù)分布式系統(tǒng)中CAP原則,當(dāng)P(分區(qū)容忍)發(fā)生的時(shí)候,強(qiáng)行追求C(一致性),會(huì)導(dǎo)致(A)可用性、吞吐量下降,此時(shí)我們一般用最終一致性來(lái)保證我們系統(tǒng)的AP能力。當(dāng)然不是放棄C,而是放棄強(qiáng)一致性,而且在一般情況下CAP都能保證,只是在發(fā)生分區(qū)容錯(cuò)的情況下,我們可以通過(guò)最終一致性來(lái)保證數(shù)據(jù)一致。
事件驅(qū)動(dòng)實(shí)現(xiàn)最終一致性
事件驅(qū)動(dòng)架構(gòu)在領(lǐng)域?qū)ο笾g通過(guò)異步的消息來(lái)同步狀態(tài),有些消息也可以同時(shí)發(fā)布給多個(gè)服務(wù),在消息引起了一個(gè)服務(wù)的同步后可能會(huì)引起另外消息,事件會(huì)擴(kuò)散開(kāi)。嚴(yán)格意義上的事件驅(qū)動(dòng)是沒(méi)有同步調(diào)用的。
例子:
在電商里面,用戶下單必須根據(jù)庫(kù)存來(lái)確定訂單是否成交。
項(xiàng)目架構(gòu):SpringBoot2+Mybatis+tk-Mybatis+ActiveMQ【因?yàn)樾±樱蛔龀蒘pring Cloud架構(gòu)】
首先,我們來(lái)看看正常的服務(wù)之間調(diào)用:

代碼:
- @Override
- @Transactional(rollbackFor = Exception.class)
- public Result placeOrder(OrderQuery query) {
- Result result = new Result();
- // 先遠(yuǎn)程調(diào)用Stock-Service去減少庫(kù)存
- RestTemplate restTemplate = new RestTemplate();
- //請(qǐng)求頭
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- //封裝成一個(gè)請(qǐng)求對(duì)象
- HttpEntity entity = new HttpEntity(query, headers);
- // 同步調(diào)用庫(kù)存服務(wù)的接口
- Result stockResult = restTemplate.postForObject("http://127.0.0.1:8081/stock/reduceStock",entity,Result.class);
- if (stockResult.getCode() == Result.ResultConstants.SUCCESS){
- Order order = new Order();
- BeanUtils.copyProperties(query,order);
- order.setOrderStatus(1);
- Integer insertCount = orderMapper.insertSelective(order);
- if (insertCount == 1){
- result.setMsg("下單成功");
- }else {
- result.setMsg("下單失敗");
- }
- }else {
- result.setCode(Result.ResultConstants.FAIL);
- result.setMsg("下單失敗:"+stockResult.getMsg());
- }
- return result;
- }
我們可以看到,這樣的服務(wù)調(diào)用的弊端多多:
1、訂單服務(wù)需同步等待庫(kù)存服務(wù)的返回結(jié)果,接口結(jié)果返回延誤。2、訂單服務(wù)直接依賴于庫(kù)存服務(wù),只要庫(kù)存服務(wù)崩了,訂單服務(wù)不能再正常運(yùn)行。3、訂單服務(wù)需考慮并發(fā)問(wèn)題,庫(kù)存最后可能為負(fù)。
下面開(kāi)始利用事件驅(qū)動(dòng)實(shí)現(xiàn)最終一致性
1、在訂單服務(wù)新增訂單后,訂單的狀態(tài)是“已開(kāi)啟”,然后發(fā)布一個(gè)Order Created事件到消息隊(duì)列上

代碼:
- @Transactional(rollbackFor = Exception.class)
- public Result placeOrderByMQ(OrderQuery query) {
- Result result = new Result();
- // 先創(chuàng)建訂單,狀態(tài)為下單0
- Order order = new Order();
- BeanUtils.copyProperties(query,order);
- order.setOrderStatus(0);
- Integer insertCount = orderMapper.insertSelective(order);
- if (insertCount == 1){
- // 發(fā)送 訂單消息
- MqOrderMsg mqOrderMsg = new MqOrderMsg();
- mqOrderMsg.setId(order.getId());
- mqOrderMsg.setGoodCount(query.getGoodCount());
- mqOrderMsg.setGoodName(query.getGoodName());
- mqOrderMsg.setStockId(query.getStockId());
- jmsProducer.sendOrderCreatedMsg(mqOrderMsg);
- // 此時(shí)的訂單只是開(kāi)啟狀態(tài)
- result.setMsg("下單成功");
- }
- return result;
- }
2、庫(kù)存服務(wù)在監(jiān)聽(tīng)到消息隊(duì)列OrderCreated中的消息,將庫(kù)存表中商品的庫(kù)存減去下單數(shù)量,然后再發(fā)送一個(gè)Stock Locked事件給消息隊(duì)列。

代碼:
- /**
- * 接收下單消息
- * @param message 接收到的消息
- * @param session 上下文
- */
- @JmsListener(destination = ORDER_CREATE,containerFactory = "myListenerContainerFactory")
- @Transactional(rollbackFor = Exception.class)
- public void receiveOrderCreatedMsg(Message message, Session session){
- try {
- if (message instanceof ActiveMQObjectMessage){
- MqStockMsg result = new MqStockMsg();
- ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
- MqOrderMsg msg = (MqOrderMsg)objectMessage.getObject();
- Integer updateCount = stockMapper.updateNumByStockId(msg.getStockId(),msg.getGoodCount());
- if (updateCount >= 1){
- result.setSuccess(true);
- result.setOrderId(msg.getId());
- }else {
- result.setSuccess(false);
- }
- // 手動(dòng)ack,使消息出隊(duì)列,不然會(huì)不斷消費(fèi)
- message.acknowledge();
- // 發(fā)送庫(kù)存鎖定消息到MQ
- jmsProducer.sendStockLockedMsg(result);
- }
- } catch (JMSException e) {
- log.error("接收訂單創(chuàng)建消息報(bào)錯(cuò):"+e.getMessage());
- }
- }
仔細(xì)的朋友可能會(huì)看到:message.acknowledge(),即手動(dòng)確認(rèn)消息。因?yàn)樵诒WC庫(kù)存服務(wù)的邏輯能正常執(zhí)行后再確認(rèn)消息已消費(fèi),可以保證消息的投遞可靠性,萬(wàn)一在庫(kù)存服務(wù)執(zhí)行時(shí)報(bào)出異常,我們可以做到重新消費(fèi)該下單消息。
3、訂單服務(wù)接收到Stock Locked事件,將訂單的狀態(tài)改為“已確認(rèn)”

代碼:
- /**
- * 判斷是否還有庫(kù)存,有庫(kù)存更新訂單狀態(tài)為1,無(wú)庫(kù)存更新訂單狀態(tài)為2,并且通知用戶(WebSocket)
- * @param message
- */
- @JmsListener(destination = STOCK_LOCKED,containerFactory = "myListenerContainerFactory")
- @Transactional(rollbackFor = Exception.class)
- public void receiveStockLockedMsg(Message message, Session session){
- try {
- if (message instanceof ActiveMQObjectMessage){
- ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
- MqStockMsg msg = (MqStockMsg)objectMessage.getObject();
- if (msg.isSuccess()){
- Order updateOrder = new Order();
- updateOrder.setId(msg.getOrderId());
- updateOrder.setOrderStatus(1);
- orderMapper.updateByPrimaryKeySelective(updateOrder);
- log.info("訂單【"+msg.getOrderId()+"】下單成功");
- }else {
- Order updateOrder = new Order();
- updateOrder.setId(msg.getOrderId());
- updateOrder.setOrderStatus(2);
- orderMapper.updateByPrimaryKeySelective(updateOrder);
- // 通知用戶庫(kù)存不足,訂單被取消
- log.error("訂單【"+msg.getOrderId()+"】因庫(kù)存不足被取消");
- }
- // 手動(dòng)ack,使消息出隊(duì)列,不然會(huì)不斷消費(fèi)
- message.acknowledge();
- }
- } catch (JMSException e) {
- log.error("接收庫(kù)存鎖定消息報(bào)錯(cuò):"+e.getMessage());
- }
- }
同樣,這里我們也是會(huì)利用手動(dòng)確認(rèn)消息來(lái)保證消息的投遞可靠性。
至此,已經(jīng)全部搞定了。我們看一下和正常的服務(wù)調(diào)用對(duì)比如何:
1、訂單服務(wù)不再直接依賴于庫(kù)存服務(wù),而是將下單事件發(fā)送到MQ中,讓庫(kù)存監(jiān)聽(tīng)。
2、訂單服務(wù)能真正的作為一個(gè)模塊獨(dú)立運(yùn)行。
3、解決了并發(fā)問(wèn)題,而且MQ的隊(duì)列處理效率非常的高。
但是也存在下面的問(wèn)題:
1、用戶體驗(yàn)改變了:因?yàn)槭褂檬录C(jī)制,訂單是立即生成的,可是很有可能過(guò)一會(huì),系統(tǒng)會(huì)提醒你沒(méi)貨了。。這就像是排隊(duì)搶購(gòu)一樣,排著排著就被通知沒(méi)貨了,不用再排隊(duì)了。
2、數(shù)據(jù)庫(kù)可能會(huì)存在很對(duì)沒(méi)有完成下單的訂單。
最后,如果真的要考慮用戶體驗(yàn),并且不想數(shù)據(jù)庫(kù)存在很多不必要的數(shù)據(jù),該怎么辦?
那就把訂單服務(wù)和庫(kù)存服務(wù)聚合在一起吧。解決當(dāng)前的問(wèn)題應(yīng)當(dāng)是首先要考慮的,我們?cè)O(shè)計(jì)微服務(wù)的目的是本想是解決業(yè)務(wù)并發(fā)量。而現(xiàn)在面臨的卻是用戶體驗(yàn)的問(wèn)題,所以架構(gòu)設(shè)計(jì)也是需要妥協(xié)的。
最主要是,我們是經(jīng)過(guò)思考和分析的,每個(gè)方案能做到哪種程度,能應(yīng)用到哪種場(chǎng)景。正所謂,技術(shù)要和實(shí)際場(chǎng)景結(jié)合,我們不能為了追求新技術(shù)而生搬硬套。