成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

哪種分布式事務處理方案效率最高?必然是...

開發 架構 分布式
在微服務系統中,服務之間的互相調用,我們可以使用 HTTP 的方式,例如 OpenFeign,也可以使用 RPC 的方式,例如 Dubbo,除了這些方案之外,我們也可以使用消息驅動,這是一種典型的響應式系統設計方案。

[[421456]]

前面幾篇文章松哥和大家介紹了 Seata 中四種分布式事務處理方案,相信經過前面的幾篇文章的學習,大家對于 Seata 中的分布式事務已經非常了解了。還沒看過前面文章的小伙伴,可以先看一下:

  • 五分鐘帶你體驗一把分布式事務!so easy!
  • 看了那么多博客,還是不懂 TCC,不妨看看這個案例!
  • XA 事務水很深,小伙子我怕你把握不住!
  • 你這 Saga 事務保“隔離性”嗎?

不過很多小伙伴看完后感覺 Seata 對于分布式事務的處理,代碼雖然簡單,但是內部花費在網絡上的時間消耗太多了,在高并發場景下,這似乎并不是一種很好的解決方案。

要說哪種分布式事務處理方案效率高,必然繞不開消息中間件!基于消息中間件的兩階段提交方案,通常用在高并發場景下。這種方式通過犧牲數據的強一致性換取性能的大幅提升,不過實現這種方式的成本和復雜度是比較高的,使用時還要看實際業務情況。

今天松哥想通過一個簡單的案例,來和大家聊一聊如何通過消息中間件來處理分布式事務。

1. 思路分析

先來說說整體思路。

有一個名詞叫做消息驅動的微服務,相信很多小伙伴都聽說過。怎么理解呢?

在微服務系統中,服務之間的互相調用,我們可以使用 HTTP 的方式,例如 OpenFeign,也可以使用 RPC 的方式,例如 Dubbo,除了這些方案之外,我們也可以使用消息驅動,這是一種典型的響應式系統設計方案。

在消息驅動的微服務中,服務之間不再互相直接調用,當服務之間需要通信時,就把通信內容發送到消息中間件上,另一個服務則通過監聽消息中間件中的消息隊列,來完成相應的業務邏輯調用,過程就是這么個過程,并不難,具體怎么玩,我們繼續往下看。

2. 業務分析

折騰了半天,后來松哥在網上找到了一個別人寫好的例子,我覺得用來演示這個問題特別合適,所以我就沒有自己寫案例了,直接用別人的代碼,我們來逐個分析,跟前面講分布式事務 Seata 的方式一致。

首先我們來看如下一張流程圖,這是一個用戶購票的案例:

當用戶想要購買一張票時:

  1. 向新訂單隊列中寫入一條數據。
  2. Order Service 負責消費這個隊列中的消息,完成訂單的創建,然后再向新訂單繳費隊列中寫入一條消息。
  3. User Service 負責消費新訂單繳費隊列中的消息,在 User Service 中完成對用戶賬戶余額的劃扣,然后向新訂單轉移票隊列中寫入一條消息。
  4. Ticket Service 負責消費新訂單轉移票隊列,在 Ticket Service 中完成票的轉移,然后發送一條消息給訂單完成隊列。
  5. 最后 Order Service 中負責監聽訂單完成隊列,處理完成后的訂單。

這就是一個典型的消息驅動微服務,也是一個典型的響應式系統。在這個系統中,一共有三個服務,分別是:

  • Order Service
  • User Service
  • Ticket Service

這三個服務之間不會進行任何形式的直接調用,大家有事都是直接發送到消息中間件,其他服務則從消息中間件中獲取自己想要的消息然后進行處理。

具體到我們的實踐中,則多了一個檢查票是否夠用的流程,如下圖:

創建訂單時,先由 Ticket 服務檢查票是否夠用,沒問題的話再繼續發起訂單的創建。其他過程我就不說了。

另外還需要注意,在售票系統中,由于每張票都不同,例如每張票可能有座位啥的,因此一張票在數據庫中往往是被設計成一條記錄。

3. 實踐

流程我已經說明白了,接下來我們就來看看具體的代碼實踐。

3.1 準備數據庫

首先我們準備三個數據庫,分別是:

  • javaboy_order:訂單庫,用戶創建訂單等操作,在這個數據庫中完成。
  • javaboy_ticket:票務庫,這個庫中保存著所有的票據信息,每一張票都是一條記錄,都保存在這個庫中。
  • javaboy_user:用戶庫,這里保存著用戶的賬戶余額以及付款記錄等信息。

每個庫中都有各自對應的表,為了操作方便,這些表不用自己創建,將來等項目啟動了,利用 JPA 自動創建即可。

3.2 項目概覽

我們先來整體上看下這個項目,公眾號后臺回復 mq_tran 可以下載完整代碼:

一共有五個服務:

  • eureka:注冊中心
  • order:訂單服務
  • service:公共模塊
  • ticket:票務服務
  • user:用戶服務

下面分別來說。

3.3 注冊中心

有人說,都消息驅動了,還要注冊中心干嘛?

消息驅動沒錯,消息驅動微服務之后每個服務只管把消息往消息中間件上扔,每個服務又只管消費消息中間件上的消息,這個時候對于服務注冊中心似乎不是那么強需要。不過在我們這個案例中,消息驅動主要用來處理事務問題,其他常規需求我們還是用 OpenFeign 來處理,所以這里我們依然需要一個注冊中心。

這里的注冊中心我就選擇常見的 Eureka,省事一些。由于本文主要是和大家聊分布式事務,所以涉及到微服務的東西我就簡單介紹下,不會占用過多篇幅,如果大家還不熟悉 Spring Cloud 的用法,可以在公眾號后臺回復 vhr 有一套視頻介紹。

服務注冊中心的創建記得加上 Spring Security,將自己的服務注冊中心保護起來。

這塊有一個小小的細節和大家多說兩句。

Eureka 用 Spring Security 保護起來之后,以后其他服務注冊都是通過 Http Basic 來認證,所以我們要在代碼中開啟 Http Basic 認證,如下(以前舊版本不需要下面這段代碼,但是新版本需要):

  1. @Configuration 
  2. public class SecurityConfig extends WebSecurityConfigurerAdapter { 
  3.     @Override 
  4.     protected void configure(HttpSecurity http) throws Exception { 
  5.         http.authorizeRequests() 
  6.                 .anyRequest().authenticated() 
  7.                 .and() 
  8.                 .httpBasic() 
  9.                 .and().formLogin().and().csrf().disable(); 
  10.     } 

3.4 購票服務

接下來我們就來看看購票服務。

購票是從下訂單開始,所以我們就先從訂單服務 order 開始整個流程的分析。

3.4.1 新訂單處理(order)

當用戶發起一個購票請求后,這個請求發送到 order 服務上,order 服務首先會向 order:new 隊列發送一條消息,開啟一個訂單的處理流程。代碼如下:

  1. @Transactional 
  2. @PostMapping(""
  3. public void create(@RequestBody OrderDTO dto) { 
  4.     dto.setUuid(UUID.randomUUID().toString()); 
  5.     rabbitTemplate.convertAndSend("order:new", dto); 

上面設置的 UUID 是整個訂單在處理過程中的一個唯一標志符,也算是一條主線。

order:new 隊列中的消息將被 ticket 服務消費,ticket 服務消費 order:new 中的消息,并進行鎖票操作(鎖票的目的防止有兩個消費同時購買同一張票),鎖票成功后,ticket 服務將向 order:locked 隊列發送一條消息,表示鎖票成功;否則向 order:fail 隊列發送一條消息表示鎖票失敗。

這里的 OrderDTO 對象將貫穿整個購票過程。

3.4.2 鎖票(ticket)

鎖票操作是在 ticket 服務中完成的,代碼如下:

  1. @Transactional 
  2. @RabbitListener(queues = "order:new"
  3. public void handleTicketLock(OrderDTO msg) { 
  4.     LOG.info("Get new order for ticket lock:{}", msg); 
  5.     int lockCount = ticketRepository.lockTicket(msg.getCustomerId(), msg.getTicketNum()); 
  6.     if (lockCount == 0) { 
  7.         msg.setStatus("TICKET_LOCK_FAIL"); 
  8.         rabbitTemplate.convertAndSend("order:fail", msg); 
  9.     } else { 
  10.         msg.setStatus("TICKET_LOCKED"); 
  11.         rabbitTemplate.convertAndSend("order:locked", msg); 
  12.     } 

先調用 lockTicket 方法去數據庫中鎖票,所謂的鎖票就是將要購買的票的 lock_user 字段設置為 customer_id(購買者的 id)。

如果鎖票成功(即數據庫修改成功),設置 msg 的狀態為 TICKET_LOCKED,同時發送消息到 order:locked 隊列,表示鎖票成功。

如果鎖票失敗(即數據庫修改失敗),設置 msg 的狀態為 TICKET_LOCK_FAIL,同時發送消息到 order:fail 隊列,表示鎖票失敗。

3.4.2 鎖票成功(order)

接下來,由 order 服務消費 order:locked 隊列中的消息,也就是鎖票成功后接下來的操作。

  1. @Transactional 
  2. @RabbitListener(queues = "order:locked"
  3. public void handle(OrderDTO msg) { 
  4.     LOG.info("Get new order to create:{}", msg); 
  5.     if (orderRepository.findOneByUuid(msg.getUuid()) != null) { 
  6.         LOG.info("Msg already processed:{}", msg); 
  7.     } else { 
  8.         Order order = newOrder(msg); 
  9.         orderRepository.save(order); 
  10.         msg.setId(order.getId()); 
  11.     } 
  12.     msg.setStatus("NEW"); 
  13.     rabbitTemplate.convertAndSend("order:pay", msg); 

鎖票成功后,先根據訂單的 UUID 去訂單數據庫查詢,是否已經有訂單記錄了,如果有,說明這條消息已經被處理了,可以防止訂單的重復處理(這塊主要是解決冪等性問題)。

如果訂單還沒有被處理,則創建一個新的訂單對象,并保存到數據庫中,創建新訂單對象的時候,需要設置訂單的 status 為 NEW。

最后設置 msg 的 status 為 NEW,然后向 order:pay 隊列發送一條消息開啟付款流程,付款是由 user 服務提供的。user 服務中會檢查用戶的賬戶余額是否夠用,如果不夠用,就會發送消息到 order:ticket_error 隊列,表示訂票失敗;如果余額夠用,則進行正常的付款操作,并在付款成功后發送消息到 order:ticket_move 隊列,開啟票的轉移。

3.4.3 繳費(user)

鎖票成功后,接下來就是付費了,付費服務由 user 提供。

  1. @Transactional 
  2. @RabbitListener(queues = "order:pay"
  3. public void handle(OrderDTO msg) { 
  4.     LOG.info("Get new order to pay:{}", msg); 
  5.     // 先檢查payInfo判斷重復消息。 
  6.     PayInfo pay = payInfoRepository.findOneByOrderId(msg.getId()); 
  7.     if (pay != null) { 
  8.         LOG.warn("Order already paid, duplicated message."); 
  9.         return
  10.     } 
  11.     Customer customer = customerRepository.getById(msg.getCustomerId()); 
  12.     if (customer.getDeposit() < msg.getAmount()) { 
  13.         LOG.info("No enough deposit, need amount:{}", msg.getAmount()); 
  14.         msg.setStatus("NOT_ENOUGH_DEPOSIT"); 
  15.         rabbitTemplate.convertAndSend("order:ticket_error", msg); 
  16.         return
  17.     } 
  18.     pay = new PayInfo(); 
  19.     pay.setOrderId(msg.getId()); 
  20.     pay.setAmount(msg.getAmount()); 
  21.     pay.setStatus("PAID"); 
  22.     payInfoRepository.save(pay); 
  23.     customerRepository.charge(msg.getCustomerId(), msg.getAmount()); 
  24.     msg.setStatus("PAID"); 
  25.     rabbitTemplate.convertAndSend("order:ticket_move", msg); 

這里的執行步驟如下:

  1. 首先根據訂單 id 去查找付款信息,檢查當前訂單是否已經完成付款,如果已經完成服務,則直接 return,這一步也是為了處理冪等性問題。
  2. 根據顧客的 id,查找到顧客的完整信息,包括顧客的賬戶余額。
  3. 檢查顧客的賬戶余額是否足夠支付票價,如果不夠,則設置 msg 的 status 為 NOT_ENOUGH_DEPOSIT,同時向 order:ticket_error 隊列發送消息,表示訂票失敗。
  4. 如果顧客賬戶余額足夠支付票價,則創建一個 PayInfo 對象,設置相關的支付信息,并存入 pay_info 表中。
  5. 調用 charge 方法完成顧客賬戶余額的扣款。
  6. 發送消息到 order:ticket_move 隊列中,開啟交票操作。

3.4.4 交票(ticket)

  1. @Transactional 
  2. @RabbitListener(queues = "order:ticket_move"
  3. public void handleTicketMove(OrderDTO msg) { 
  4.     LOG.info("Get new order for ticket move:{}", msg); 
  5.     int moveCount = ticketRepository.moveTicket(msg.getCustomerId(), msg.getTicketNum()); 
  6.     if (moveCount == 0) { 
  7.         LOG.info("Ticket already transferred."); 
  8.     } 
  9.     msg.setStatus("TICKET_MOVED"); 
  10.     rabbitTemplate.convertAndSend("order:finish", msg); 

調用 moveTicket 方法完成交票操作,也就是設置 ticket 表中票的 owner 為 customerId。

交票成功后,發送消息到 order:finish 隊列,表示交票完成。

3.4.5 訂單完成(order)

  1. @Transactional 
  2. @RabbitListener(queues = "order:finish"
  3. public void handleFinish(OrderDTO msg) { 
  4.     LOG.info("Get finished order:{}", msg); 
  5.     Order order = orderRepository.getById(msg.getId()); 
  6.     order.setStatus("FINISH"); 
  7.     orderRepository.save(order); 

這里的處理就比較簡單,訂單完成后,就設置訂單的狀態為 FINISH 即可。

上面介紹的是一條主線,順利的話,消息順著這條線走一遍,一個訂單就處理完成了。

不順利的話,就有各種幺蛾子,我們分別來看。

3.4.6 鎖票失敗(order)

鎖票是在 ticket 服務中完成的,如果鎖票失敗,就會直接向 order:fail 隊列發送消息,該隊列的消息由 order 服務負責消費。

3.4.7 扣款失敗(ticket)

扣款操作是在 user 中完成的,扣款失敗就會向 order:ticket_error 隊列中發送消息,該隊列的消息由 ticket 服務負責消費。

  1. @Transactional 
  2. @RabbitListener(queues = "order:ticket_error"
  3. public void handleError(OrderDTO msg) { 
  4.     LOG.info("Get order error for ticket unlock:{}", msg); 
  5.     int count = ticketRepository.unMoveTicket(msg.getCustomerId(), msg.getTicketNum()); 
  6.     if (count == 0) { 
  7.         LOG.info("Ticket already unlocked:", msg); 
  8.     } 
  9.     count = ticketRepository.unLockTicket(msg.getCustomerId(), msg.getTicketNum()); 
  10.     if (count == 0) { 
  11.         LOG.info("Ticket already unmoved, or not moved:", msg); 
  12.     } 
  13.     rabbitTemplate.convertAndSend("order:fail", msg); 

當扣款失敗的時候,做三件事:

  1. 撤銷票的轉移,也就是把票的 owner 字段重新置為 null。
  2. 撤銷鎖票,也就是把票的 lock_user 字段重新置為 null。
  3. 向 order:fail 隊列發送訂單失敗的消息。

3.4.8 下單失敗(order)

下單失敗的處理在 order 服務中,有三種情況會向 order:fail 隊列發送消息:

  1. 鎖票失敗
  2. 扣款失敗(客戶賬戶余額不足)
  3. 訂單超時
  1. @Transactional 
  2. @RabbitListener(queues = "order:fail"
  3. public void handleFailed(OrderDTO msg) { 
  4.     LOG.info("Get failed order:{}", msg); 
  5.     Order order
  6.     if (msg.getId() == null) { 
  7.         order = newOrder(msg); 
  8.         order.setReason("TICKET_LOCK_FAIL"); 
  9.     } else { 
  10.         order = orderRepository.getById(msg.getId()); 
  11.         if (msg.getStatus().equals("NOT_ENOUGH_DEPOSIT")) { 
  12.             order.setReason("NOT_ENOUGH_DEPOSIT"); 
  13.         } 
  14.     } 
  15.     order.setStatus("FAIL"); 
  16.     orderRepository.save(order); 

該方法的具體處理邏輯如下:

  1. 首先查看是否有訂單 id,如果連訂單 id 都沒有,就說明是鎖票失敗,給訂單設置 reason 屬性的值為TICKET_LOCK_FAIL。
  2. 如果有訂單 id,則根據 id 查詢訂單信息,并判斷訂單狀態是否為 NOT_ENOUGH_DEPOSIT,這個表示扣款失敗,如果訂單狀態是 NOT_ENOUGH_DEPOSIT,則設置失敗的 reason 也為此。
  3. 最后設置訂單狀態為 FAIL,然后更新數據庫中的訂單信息即可。

3.4.9 訂單超時(order)

order 服務中還有一個定時任務,定時去數據庫中撈取那些處理失敗的訂單,如下:

  1. @Scheduled(fixedDelay = 10000L) 
  2. public void checkInvalidOrder() { 
  3.     ZonedDateTime checkTime = ZonedDateTime.now().minusMinutes(1L); 
  4.     List<Order> orders = orderRepository.findAllByStatusAndCreatedDateBefore("NEW", checkTime); 
  5.     orders.stream().forEach(order -> { 
  6.         LOG.error("Order timeout:{}"order); 
  7.         OrderDTO dto = new OrderDTO(); 
  8.         dto.setId(order.getId()); 
  9.         dto.setTicketNum(order.getTicketNum()); 
  10.         dto.setUuid(order.getUuid()); 
  11.         dto.setAmount(order.getAmount()); 
  12.         dto.setTitle(order.getTitle()); 
  13.         dto.setCustomerId(order.getCustomerId()); 
  14.         dto.setStatus("TIMEOUT"); 
  15.         rabbitTemplate.convertAndSend("order:ticket_error", dto); 
  16.     }); 

可以看到,這里是去數據庫中撈取那些狀態為 NEW 并且是 1 分鐘之前的訂單,根據前面的分析,當鎖票成功后,就會將訂單的狀態設置為 NEW 并且存入數據庫中。換言之,當鎖票成功一分鐘之后,這張票還沒有賣掉,就設置訂單超時,同時向 order:ticket_error 隊列發送一條消息,這條消息在 ticket 服務中被消費,最終完成撤銷交票、撤銷鎖票等操作。

這就是大致的代碼處理流程。

再來回顧一下前面那張圖:

結合著代碼來看這張圖是不是就很容易懂了。

3.5 測試

接下來我們來進行一個簡單的測試。

先來一個訂票失敗的測試,如下:

由于用戶只有 1000 塊錢,這張票要 10000,所以購票必然失敗。請求執行成功后,我們查看 order 表,多了如下一條記錄:

可以看到,訂單失敗的理由就是賬戶余額不足。此時查看 ticket 和 user 表,發現都完好如初(如果需要,則已經反向補償了)。

接下來我們手動給 ticket 表中 lock_user 字段設置一個值,如下:

這個表示這張票已經被人鎖定了。

然后我們發起一次購票請求(這次可以把金額設置到合理范圍,其實不設置也行,反正這次失敗還沒走到付款這一步):

請求發送成功后,接下來我們去查看 order 表,多了如下一條記錄:

可以看到,這次下單失敗的理由是鎖票失敗。此時查看 ticket 和 user 表,發現都完好如初(如果需要,則已經反向補償了)。

最后再來一次成功測試,先把 ticket 表中的 lock_user 字段置空,然后發送如下請求:

這次購票成功,查看 ticket 表,發票已經票有所屬:

查看訂單表:

可以多了一條成功的購票記錄。

查看用戶表:

用戶賬戶已扣款。

查看支付記錄表:

可以看到已經有了支付記錄。

4. 總結

整體上來說,上面這個案例,技術上并沒有什么難的,復雜之處在于設計。一開始要設計好消息的處理流程以及消息處理失敗后如何進行補償,這個是比較考驗大家技術的。

另外上面案例中,消息的發送和消費都用到了 RabbitMQ 中的事務機制(確保消息消費成功)以及 Spring 中的事務機制(確保消息發送和數據保存同時成功),這些我就不再贅述了。

總之,通過消息中間件處理分布式事務,這種方式通過犧牲數據的強一致性換取性能的大幅提升,但是實現這種方式的成本和復雜度是比較高的,使用時還要看實際業務情況。

本文轉載自微信公眾號「江南一點雨」,可以通過以下二維碼關注。轉載本文請聯系江南一點雨公眾號。

 

責任編輯:武曉燕 來源: 江南一點雨
相關推薦

2022-06-13 10:42:21

分布式事務數據庫

2014-01-22 13:37:53

2014-02-11 09:07:31

2009-02-05 11:39:41

Oracle甲骨文Tuxedo

2015-03-18 09:33:41

大數據分布式系統事務處理

2023-12-29 08:14:41

BASE事務ServiceB

2019-07-30 07:26:26

技術分布式指標

2015-03-16 14:38:16

大數據存儲分布式系統事務處理

2019-11-18 10:19:02

分布式系統事務模型

2023-08-16 11:43:57

數據引擎

2023-11-01 10:11:00

Java分布式

2023-12-07 08:37:49

TCC模式

2009-07-15 17:41:55

iBATIS事務處理

2011-04-27 15:55:16

2009-07-09 18:15:42

JDBC事務處理

2009-09-14 19:55:03

LINQ事務處理

2010-01-04 13:06:50

ADO.NET事務

2009-11-13 17:01:07

ADO.NET事務處理

2010-04-13 15:44:00

Oracle與SqlS

2025-04-29 04:00:00

分布式事務事務消息
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕国产精品视频 | av一级| 国产一级免费视频 | 中文在线一区二区 | 久久久久久久久久一区 | 狠狠干狠狠操 | 亚洲导航深夜福利涩涩屋 | 国产成人网 | 人人玩人人添人人澡欧美 | 欧美中文字幕 | 一区二区三区亚洲精品国 | 欧美视频二区 | 日韩国产三区 | 国产精品视频网 | 草久久 | av中文在线观看 | 日日天天 | 欧美成人精品一区二区三区 | 精品一区电影 | 亚洲一区二区网站 | 在线播放一区二区三区 | 一区久久 | 午夜视频导航 | 91伊人 | 日本黄视频在线观看 | 欧美精品一区二区三区在线播放 | 色在线免费视频 | 久久久久久亚洲 | 欧美一级免费看 | 久久久久国产精品一区二区 | av在线视| 久久亚洲一区二区三区四区 | 午夜视频在线播放 | 亚洲交性| 一区二区三区在线电影 | 天天操天天射综合 | 中国一级毛片免费 | 欧美专区日韩 | 在线色网| 国产91一区 | 国产欧美日韩一区二区三区在线 |