成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

蘇寧如何解決事務與非事務的數據一致性問題

原創
開發 項目管理 其他數據庫
作為擁有線上線下大數據的智慧零售平臺,蘇寧的系統對于并發和高效要求非常高。針對各種苛刻的場景,蘇寧都有相應的解決方案。

【51CTO.com原創稿件】

1、業務場景

作為擁有線上線下大數據的智慧零售平臺,蘇寧的系統對于并發和高效要求非常高。針對各種苛刻的場景,蘇寧都有相應的解決方案。

蘇寧的售后訂單系統每天要處理大量訂單的創建,修改以及數據分發的操作。

為了保證高效,我們的數據經過分庫分表存儲于數據庫集群中,同時根據一定的算法將部分活躍訂單緩存在Redis,保證訂單處理的效率。分發數據時,我們通過蘇寧自研的MQ消息平臺 (WindQ)向下游系統分發消息,處理效率上可以做到準實時,即消息能夠及時被下游接收,并立即通過反饋接口反饋回來,避免實時接口調用時可能發生因網絡,下游處理效率問題帶來的阻塞。

基于以上的背景,我們遇到了這樣的場景:

1. 在創建訂單的時候,我們要保存訂單到數據庫和緩存。

2. 同時要將下發下游的消息保存到數據庫,并通過WindQ發往下游系統。

3. 下游系統返回接收數據的結果后,需要根據返回的結果,對保存到數據庫的數據進行刪除操作。

圖1:業務模型

雖然邏輯簡單,但由于數據庫操作是在一個事務中,而Redis和發消息隊列的操作則并非能靠事務控制。

  • 如果緩存成功,但是事務失敗,則可能導致我們在系統有了一份異常的訂單緩存。而實際上這個訂單并不存在。
  • 另外當消息發送到下游了以后,如果下游處理速度非常快,處理結果立刻返回,處理返回結果的程序要去刪除一條已經發送成功數據。此時,有可能本地的事務尚未提交,那么,刪除操作就做了無用功。***當事務提交以后,那條應該刪掉的數據,就會一直在待處理表中,變成異常數據。

圖2:異常場景

說明:數據通過消息隊列發送下游系統,同時保存一份到數據庫是為了保證發送隊列異?;蛘呦掠卧诮邮铡⑻幚頃r發生異常的情況,可以通過數據庫保存的數據進行補償處理的一種機制,當下游系統反饋數據接收正常后,將該數據刪除。最終保證上下游數據一致。

業務場景模擬

系統由Business對外提供服務,在此過程中通過OrdeSaver和MessageSender執行具體數據處理功能。

  • Business為業務的入口,所有的業務邏輯由此開始。
  • OrderSaver執行的業務是保存訂單以及緩存訂單到Redis。
  • MessageSender執行的業務是保存下發數據到下發表中,并將數據發送到WindQ消息隊列中。

Business 

  1. /**  
  2.  * 完整的業務,有多個數據庫操作,以及數據庫以外的需要延遲執行的業務邏輯  
  3.  */   
  4. public class Business  {   
  5.     public void saveInfo(Map<String, Object> map){   
  6.         System.out.println("業務開始 事務開啟  保存數據操作開始" );   
  7.         new OrderSaver().saveOrderInfo(map);   
  8.         new MessageSender().saveMessageForSend(map);   
  9.         System.out.println("業務結束 事務提交 保存數據操作結束");   
  10.     }   
  11. }   

OrderSaver 

  1. /**  
  2.   * 保存服務單的業務邏輯  
  3.   */   
  4. public class OrderSaver {   
  5.            
  6. public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){   
  7.        System.out.println("Save order info to datebase");   
  8.     System.out.println("Cache order info into redis");   
  9.   }   
  10. }    

MessageSender 

  1. /**  
  2.  * 下發數據的業務邏輯  
  3.  */   
  4. public class MessageSender {   
  5.  
  6.     public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){   
  7.      //保存數據到數據庫   
  8.         System.out.println("Save create message to datebase.");   
  9.         System.out.println("Send message to windq");   
  10.     }   
  11.  

業務接口調用 

  1. public class Sample {   
  2.     //此處模擬業務接口被調用的情況   
  3.     public static void main(String[] args) {   
  4.     Business business = new Business();   
  5.     Map<String,Object> param = new HashMap<>();   
  6.     business.saveInfo(param);   
  7.    }   
  8.  

輸出結果

業務開始 事務開啟 保存數據操作開始

Save order info to datebase

Cache order info into redis

Save create message to datebase.

Send message to windq

業務結束 事務提交 保存數據操作結束

圖3:常規輸出

以上場景模擬了常規情況下,我們的業務處理流程,此時也就會相應地出現我們上面所描述地異常。而我們所期望的處理結果應該是:

業務開始 事務開啟 保存數據操作開始

Save order info to datebase

Save create message to datebase.

業務結束 事務提交 保存數據操作結束

Cache order info into redis

Send message to windq

圖4:期望的結果

2、解決方案

該問題如何處理呢,這個時候我們就該將緩存到Redis以及下發WindQ的操作延遲到事務提交后處理。這樣在事務沒有提交前Resdis中不會有數據,WindQ也不會將數據下發。假如事務失敗,后續也可以根據異常進行后續處理。即便事務成功緩存Redis或者發送WindQ出錯,也可以根據存入數據庫的數據進行后續的補償處理。

2.1 處理方案一:代碼轉移

我們首先會想到通過代碼轉移的方式,將邏輯代碼移動到事務外面。但這個時候問題來了......

2.1.1 問題一:邏輯割裂

我們為了描述業務模型,將現實場景盡量簡化,從模型上看,是可以將兩段非事務方法挪到事務外面來操作。但是,由于保存訂單和緩存Redis是一套操作,其使用的數據是一致的,保存下發的消息和發送WindQ也是一對呼應的操作。代碼是寫在一起的,符合邏輯和業務要求有改動時也能夠一并處理,拆開的話,對統一數據的處理上給人的感覺就不是一氣呵成了。

某些業務中并非只有一兩個成對的操作,將多個成對操作的事務-非事務關聯型邏輯強行拆開,顯得規模浩大,這種方式變得不可取。

另外原先的處理方案中,保存數據庫、緩存以及發送WindQ處理的數據是一致的,而拆開寫的話,就會導致數據要從前傳到后。要保證數據能從里面傳到外面也將成為一個問題。

2.2 處理方案二:延遲執行的模型

為了解決***方案的問題,我們做了以下的設計。

數據要往數據庫中存的時候,我們可以先把要處理的數據和要做的動作先定義好,放到一個容器中去,在事務提交后,我們再拿到這個容器,統一將之前定義好的操作和數據取出來,按要求執行。

具體怎么做呢?

經過一番思考,我們構建出了一個模型

ExecutorHandler - Executor

  • Executor 可執行對象,用于定義一個需要執行的邏輯。比如將數據通過WindQ發送,或者將訂單刷入緩存。
  • ExecutorHandler容器類,內部保存了一個Executor的列表。

代碼邏輯

  1. 在業務代碼中,我們將需要執行的業務操作封裝到Executor中。
  2. 定義好以后,通過ExecutorHandler的add方法,添加到容器中去。
  3. 在業務邏輯執行的過程中,先進行數據庫操作,而非數據庫操作只是在對應的位置進行定義,在整個事務完成以后,通過ExecutorHandler的handle方法,遍歷所有的Executor對象,執行需要延遲的非事務操作。

圖5:業務模型

Executor

  1. public  interface Executor {   
  2.     void execute();   
  3. }    

ExecutorHandler

  1. public class ExecutorHandler {   
  2.    //需要執行的業務處理對象列表   
  3.    private List<Executor> executors;   
  4.        
  5.    public void handle(){   
  6.      if(!(null == executors || executors.isEmpty())){   
  7.            for(Executor  executor : executors){   
  8.               executor.execute();   
  9.             }   
  10.       }   
  11.    }   
  12.    public void add(Executor executor){   
  13.       if(null == executors){   
  14.            executors = new ArrayList<>();   
  15.       }   
  16.         executors.add(executor);   
  17.     }   
  18.  

業務接口調用

  1. public class Sample {   
  2.     //此處模擬業務接口被調用的情況   
  3.      public static void main(String[] args) {   
  4.         Business business = new Business();   
  5.          ExecutorHandler handler = new ExecutorHandler();   
  6.          Map<String, Object> param  =  new HashMap<String, Object>();   
  7.          //執行業務方法,開啟事務,保存數據   
  8.          business.saveInfo(param, handler);   
  9.          //執行延遲執行的方法   
  10.          handler.handle();   
  11.     }      
  12.  }   

輸出結果

業務開始 事務開啟 保存數據操作開始

Save order info to datebase

Save create message to datebase.

業務結束 事務提交 保存數據操作結束

Cache order info into redis

Send message to windq

Business

  1. /**  
  2.  * 完整的業務,有多個數據庫操作,以及數據庫以外的需要延遲執行的業務邏輯  
  3.  */   
  4. public class Business  {   
  5.     public void saveInfo(Map<String, Object> map,ExecutorHandler executorHandler){   
  6.         System.out.println("業務開始 事務開啟  保存數據操作開始" );   
  7.         new OrderSaver().saveOrderInfo(map,executorHandler);   
  8.         new MessageSender().saveMessageForSend(map,executorHandler);   
  9.         System.out.println("業務結束 事務提交 保存數據操作結束");   
  10.     }   
  11. }   

MessageSender 

  1. /**  
  2.  * 下發數據的業務邏輯  
  3.  */   
  4. public class MessageSender {   
  5.        
  6.     public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){   
  7.         //保存數據到數據庫   
  8.         System.out.println("Save create message to datebase.");   
  9.         //將要延遲執行的業務邏輯定義好,注冊到容器中去   
  10.         executorHandler.add(new Executor() {   
  11.             @Override   
  12.             public void execute() {   
  13.                 System.out.println("Send message to windq");   
  14.             }   
  15.         });   
  16.     }   
  17. }    

OrderSaver 

  1.  /**  
  2.   * 保存服務單的業務邏輯  
  3.    */   
  4.  public class OrderSaver {   
  5.          
  6.      public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){   
  7.          System.out.println("Save order info to datebase");   
  8.           //這就是所謂的回調函數   
  9.          executorHandler.add(new Executor() {   
  10.             @Override   
  11.             public void execute() {   
  12.                 System.out.println("Cache order info into redis");   
  13.             }   
  14.         });   
  15.     }   
  16.  }  

2.2.1 問題二:參數傳遞

以上的方案,解決了延遲執行的問題,但是,此刻我們發現,由于要使用ExecutorHandler,這個時候就需要隨時隨地將該對象傳遞下去,要考慮如何降低該對象的侵略性。

  • 靜態變量:在使用中需要考慮同步和清理的問題,很容易在多線程的情況下使得邏輯變得混亂,不采用。
  • 成員變量:同樣也存在著數據清理的問題,不推薦使用,不采用。

2.2.2 問題二處理方案:使用ThreadLocal參數傳遞

有沒有生命周期是整個線程內的呢?這時我們就需要用到ThreadLocal了。

通過ThreadLocal來獲取ExecutorHandler 可以作為有效的解決方案。

由于ThreadLocal對象最終在使用完的時候需要remove掉,因此,該方法需要集中統一調用。

實現時,我們定義了HandlerThreadLocal類。

HandlerThreadLocal對象負責通過 ThreadLocal的get方法來獲取線程本地的ExecutorHandler對象,并執行其 handle方法(具體實現可以參照后面的代碼)。

執行完業務操作以后,通過調用remove方法將其銷毀。

2.2.3 異常的捕捉和處理DelayedCallHandler

由于ThreadLocal的remove方法是一定需要被執行,因此該方法應該放在一個try- catch - finally 塊的finally段中,保證其不被遺漏。

  1. DelayedCallHandler通過handle()方法調用業務邏輯。
  2. 在調用完業務邏輯后,調用ExecutorHandler的handle()方法,執行已經注冊到延遲調用容器中的業務方法。
  3. ***在finally中將ThreadLocal 對象remove掉。

整個DelayedCallHandler的handle方法就是一個完整的try- catch - finally 塊。

2.2.4 標準化定義:DelayablelService需要延遲調用的業務類

由于DelayedCallHandler已經模塊化,業務方法***也定義成一個具體的方法名(doBusiness),所有的業務處理類,實現DelayedCallHandler接口,在doBusiness方法中調用有事務的業務邏輯。

3、最終實現方案

基于處理方案二的分析,***我們使用ThreadLocal來傳遞業務數據。

我們通過ThreadLocal executorHandler來傳遞數據。

在業務邏輯MessageSender 、OrderSaver中通過executorHandler將需要延遲執行的業務定義好。

在HandlerThreadLocal中,使用 executorHandler處理之前定義好的邏輯。

這樣做將事務和非事務分開,不再以方法參數的方式向下游傳遞數據,使得數據傳遞得以結構,處理起來更加優雅。

示例代碼如下。

業務接口調用 

  1.  public static void main(String[] args) {   
  2.       Business b = new Business();   
  3.       Map<String, Object> map  =  new HashMap<String, Object>();   
  4.       DelayedCallHandler<Map<String, Object>> bu = new DelayedCallHandler<Map<String, Object>>();   
  5.       bu.handle(b,map);   
  6.  }    

輸出結果:

業務開始 事務開啟 保存數據操作開始

Save order info to datebase

Save create message to datebase.

業務結束 事務提交 保存數據操作結束

Cache order info into redis

Send message to windq

HandlerThreadLocal 

  1.   public class HandlerThreadLocal  {   
  2.          
  3.      public static final ThreadLocal<ExecutorHandler> executorHandler = new ThreadLocal<ExecutorHandler>(){   
  4.           protected ExecutorHandler initialValue(){   
  5.              return new ExecutorHandler();   
  6.           }   
  7.       };   
  8.      public static final void handle(){   
  9.              
  10.          executorHandler.get().handle();   
  11.     }   
  12.        
  13.      public static final void remove(){   
  14.          executorHandler.remove();   
  15.      }   
  16. }   

DelayedCallHandler 

  1.  public  class DelayedCallHandler<T> {   
  2.     public void handle(DelayablelService<T> buisnes,T param){             
  3.          try {   
  4.             //先執行業務操作   
  5.              buisnes.doBusiness(param);   
  6.             //執行延遲執行的業務   
  7.             HandlerThreadLocal.handle();   
  8.        } catch (Exception e) {   
  9.             //處理異常   
  10.         }finally {   
  11.            HandlerThreadLocal.remove();   
  12.        }   
  13.     }         
  14.  }  

DelayablelService1.

  1. public interface DelayablelService { 
  2.  
  3.  public void doBusiness(T param); 
  4.  
  5.  

Business 

  1.   /**  
  2.    * 完整的業務,有多個數據庫操作,以及數據庫以外的需要延遲執行的業務邏輯  
  3.    */   
  4.   public class Business implements DelayablelService< Map<String, Object> > {   
  5.          
  6.       @Override   
  7.       public void doBusiness(Map<String, Object> map){   
  8.           saveInfo(map);   
  9.      }   
  10.         
  11.      public void saveInfo(Map<String, Object> map){   
  12.          System.out.println("業務開始 事務開啟  保存數據操作開始" );   
  13.          new OrderSaver().saveOrderInfo();   
  14.         new MessageSender().saveMessageForSend();   
  15.          System.out.println("業務結束 事務提交 保存數據操作結束");   
  16.      }   
  17.  }    

MessageSender 

  1.   /**  
  2.    * 下發數據的業務邏輯  
  3.   */   
  4.   public class MessageSender {   
  5.       public void saveMessageForSend(){   
  6.          ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();   
  7.          System.out.println("Save create message to datebase.");   
  8.          executorHandler.add(new Executor() {   
  9.              @Override   
  10.             public void execute() {   
  11.                 System.out.println("Send message to windq");   
  12.              }   
  13.         });   
  14.      }   
  15. }    

OrderSaver 

  1.   /**  
  2.   * 保存服務單的業務邏輯  
  3.    */   
  4.   public class OrderSaver {   
  5.          
  6.       public void saveOrderInfo(){   
  7.          System.out.println("Save order info to datebase");   
  8.             
  9.          ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();   
  10.         //這就是所謂的回調函數   
  11.          executorHandler.add(new Executor() {   
  12.                 
  13.              @Override   
  14.             public void execute() {   
  15.                 System.out.println("Cache order info into redis");   
  16.              }   
  17.          });   
  18.      }   
  19.  }    

4、總結

使用延遲執行的模型,解決了在一個業務邏輯中既有數據庫事務的操作,又有相關的非事務操作時,事務失敗或未提交而非事務操作成功導致的數據不一致問題。

文中提到的邏輯割裂和參數傳遞的問題,都是在比較復雜的場景下才有的。蘇寧售后訂單業務中此類邏輯常有出現,因此我們就這些問題進行了分析、討論,得出這樣的解決方案。并非所有的系統和業務都需要這樣。任何解決方案都要因情況而定,避免畫蛇添足。

在使用該模型時,有使用到匿名內部類和線程局部變量(ThreadLocal),在使用時,有一定的注意事項,ThreadLocal在使用結束后要通過其remove()方法移除,使用時需要留意。

作者:

王海勇,蘇寧科技集團蘇寧云軟件公司售后研發中心技術經理。從事Java開發多年,擅長業務抽象及業務架構設計,2016年9月加入蘇寧,參與售后服務域訂單平臺、時效平臺等系統平臺的研發工作。在蘇寧巨大業務量的場景下,保證系統穩定、安全、高效地提供服務。

【51CTO原創稿件,合作站點轉載請注明原文作者和出處為51CTO.com】

責任編輯:龐桂玉 來源: 51CTO
相關推薦

2016-11-29 09:00:19

分布式數據一致性CAS

2023-12-01 13:51:21

數據一致性數據庫

2009-06-18 09:18:08

Oracle檢索數據數據一致性事務恢復

2022-08-11 07:55:05

數據庫Mysql

2023-08-01 07:42:33

Redis數據項目

2012-09-24 09:35:42

分布式系統

2024-04-11 13:45:14

Redis數據庫緩存

2022-06-21 21:47:13

數據系統

2022-05-31 08:37:59

RedisMySQL數據一致性

2017-09-04 14:46:10

分布式事務問題

2021-12-26 18:24:51

MySQL InnoDB引擎

2021-03-04 06:49:53

RocketMQ事務

2022-09-06 15:30:20

緩存一致性

2025-03-27 08:20:54

2023-12-19 09:43:43

MongoDB并發

2024-12-26 15:01:29

2023-09-07 08:11:24

Redis管道機制

2023-11-20 09:28:44

2020-09-04 06:32:08

緩存數據庫接口

2024-11-14 07:10:00

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 黄色片网站国产 | 精品在线播放 | 欧美一区二区综合 | 影音先锋中文字幕在线观看 | 欧美国产日韩在线观看 | 亚洲国产成人精品女人久久久 | 日韩免费视频 | 久草热播 | 精品av | 精品一区二区三区四区视频 | 亚洲韩国精品 | 久久久国产精品 | 999精品视频在线观看 | 久久蜜桃资源一区二区老牛 | 久久五月婷 | 久久免费视频观看 | 一区二区久久 | 国产网站在线免费观看 | 精品免费视频 | 黄色大片免费看 | 国产精品视频一二三区 | 国产精品一区二区在线播放 | 日韩精品一二三 | 日韩欧美久久 | 中文字幕在线免费观看 | www.狠狠干| 亚洲精品成人网 | 欧美一级久久 | 在线观看日韩精品视频 | 国产视频中文字幕 | 一区二区三区亚洲 | 91精品一区二区三区久久久久久 | 久草久 | 国产精品福利视频 | 亚洲av一级毛片 | 日日操操| 国产亚洲精品精品国产亚洲综合 | 国产免费av在线 | 欧美日韩一区二区在线观看 | 中文字幕一区二区三区四区五区 | 亚洲aⅴ|