蘇寧如何解決事務與非事務的數據一致性問題
原創【51CTO.com原創稿件】
1、業務場景
作為擁有線上線下大數據的智慧零售平臺,蘇寧的系統對于并發和高效要求非常高。針對各種苛刻的場景,蘇寧都有相應的解決方案。
蘇寧的售后訂單系統每天要處理大量訂單的創建,修改以及數據分發的操作。
為了保證高效,我們的數據經過分庫分表存儲于數據庫集群中,同時根據一定的算法將部分活躍訂單緩存在Redis,保證訂單處理的效率。分發數據時,我們通過蘇寧自研的MQ消息平臺 (WindQ)向下游系統分發消息,處理效率上可以做到準實時,即消息能夠及時被下游接收,并立即通過反饋接口反饋回來,避免實時接口調用時可能發生因網絡,下游處理效率問題帶來的阻塞。
基于以上的背景,我們遇到了這樣的場景:
1. 在創建訂單的時候,我們要保存訂單到數據庫和緩存。
2. 同時要將下發下游的消息保存到數據庫,并通過WindQ發往下游系統。
3. 下游系統返回接收數據的結果后,需要根據返回的結果,對保存到數據庫的數據進行刪除操作。
圖1:業務模型
雖然邏輯簡單,但由于數據庫操作是在一個事務中,而Redis和發消息隊列的操作則并非能靠事務控制。
- 如果緩存成功,但是事務失敗,則可能導致我們在系統有了一份異常的訂單緩存。而實際上這個訂單并不存在。
- 另外當消息發送到下游了以后,如果下游處理速度非常快,處理結果立刻返回,處理返回結果的程序要去刪除一條已經發送成功數據。此時,有可能本地的事務尚未提交,那么,刪除操作就做了無用功。***當事務提交以后,那條應該刪掉的數據,就會一直在待處理表中,變成異常數據。
圖2:異常場景
說明:數據通過消息隊列發送下游系統,同時保存一份到數據庫是為了保證發送隊列異?;蛘呦掠卧诮邮铡⑻幚頃r發生異常的情況,可以通過數據庫保存的數據進行補償處理的一種機制,當下游系統反饋數據接收正常后,將該數據刪除。最終保證上下游數據一致。
業務場景模擬
系統由Business對外提供服務,在此過程中通過OrdeSaver和MessageSender執行具體數據處理功能。
- Business為業務的入口,所有的業務邏輯由此開始。
- OrderSaver執行的業務是保存訂單以及緩存訂單到Redis。
- MessageSender執行的業務是保存下發數據到下發表中,并將數據發送到WindQ消息隊列中。
Business
- /**
- * 完整的業務,有多個數據庫操作,以及數據庫以外的需要延遲執行的業務邏輯
- */
- public class Business {
- public void saveInfo(Map<String, Object> map){
- System.out.println("業務開始 事務開啟 保存數據操作開始" );
- new OrderSaver().saveOrderInfo(map);
- new MessageSender().saveMessageForSend(map);
- System.out.println("業務結束 事務提交 保存數據操作結束");
- }
- }
OrderSaver
- /**
- * 保存服務單的業務邏輯
- */
- public class OrderSaver {
- public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){
- System.out.println("Save order info to datebase");
- System.out.println("Cache order info into redis");
- }
- }
MessageSender
- /**
- * 下發數據的業務邏輯
- */
- public class MessageSender {
- public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){
- //保存數據到數據庫
- System.out.println("Save create message to datebase.");
- System.out.println("Send message to windq");
- }
- }
業務接口調用
- public class Sample {
- //此處模擬業務接口被調用的情況
- public static void main(String[] args) {
- Business business = new Business();
- Map<String,Object> param = new HashMap<>();
- business.saveInfo(param);
- }
- }
輸出結果
業務開始 事務開啟 保存數據操作開始
Save order info to datebase
Cache order info into redis
Save create message to datebase.
Send message to windq
業務結束 事務提交 保存數據操作結束
圖3:常規輸出
以上場景模擬了常規情況下,我們的業務處理流程,此時也就會相應地出現我們上面所描述地異常。而我們所期望的處理結果應該是:
業務開始 事務開啟 保存數據操作開始
Save order info to datebase
Save create message to datebase.
業務結束 事務提交 保存數據操作結束
Cache order info into redis
Send message to windq
圖4:期望的結果
2、解決方案
該問題如何處理呢,這個時候我們就該將緩存到Redis以及下發WindQ的操作延遲到事務提交后處理。這樣在事務沒有提交前Resdis中不會有數據,WindQ也不會將數據下發。假如事務失敗,后續也可以根據異常進行后續處理。即便事務成功緩存Redis或者發送WindQ出錯,也可以根據存入數據庫的數據進行后續的補償處理。
2.1 處理方案一:代碼轉移
我們首先會想到通過代碼轉移的方式,將邏輯代碼移動到事務外面。但這個時候問題來了......
2.1.1 問題一:邏輯割裂
我們為了描述業務模型,將現實場景盡量簡化,從模型上看,是可以將兩段非事務方法挪到事務外面來操作。但是,由于保存訂單和緩存Redis是一套操作,其使用的數據是一致的,保存下發的消息和發送WindQ也是一對呼應的操作。代碼是寫在一起的,符合邏輯和業務要求有改動時也能夠一并處理,拆開的話,對統一數據的處理上給人的感覺就不是一氣呵成了。
某些業務中并非只有一兩個成對的操作,將多個成對操作的事務-非事務關聯型邏輯強行拆開,顯得規模浩大,這種方式變得不可取。
另外原先的處理方案中,保存數據庫、緩存以及發送WindQ處理的數據是一致的,而拆開寫的話,就會導致數據要從前傳到后。要保證數據能從里面傳到外面也將成為一個問題。
2.2 處理方案二:延遲執行的模型
為了解決***方案的問題,我們做了以下的設計。
數據要往數據庫中存的時候,我們可以先把要處理的數據和要做的動作先定義好,放到一個容器中去,在事務提交后,我們再拿到這個容器,統一將之前定義好的操作和數據取出來,按要求執行。
具體怎么做呢?
經過一番思考,我們構建出了一個模型
ExecutorHandler - Executor
- Executor 可執行對象,用于定義一個需要執行的邏輯。比如將數據通過WindQ發送,或者將訂單刷入緩存。
- ExecutorHandler容器類,內部保存了一個Executor的列表。
代碼邏輯
- 在業務代碼中,我們將需要執行的業務操作封裝到Executor中。
- 定義好以后,通過ExecutorHandler的add方法,添加到容器中去。
- 在業務邏輯執行的過程中,先進行數據庫操作,而非數據庫操作只是在對應的位置進行定義,在整個事務完成以后,通過ExecutorHandler的handle方法,遍歷所有的Executor對象,執行需要延遲的非事務操作。
圖5:業務模型
Executor
- public interface Executor {
- void execute();
- }
ExecutorHandler
- public class ExecutorHandler {
- //需要執行的業務處理對象列表
- private List<Executor> executors;
- public void handle(){
- if(!(null == executors || executors.isEmpty())){
- for(Executor executor : executors){
- executor.execute();
- }
- }
- }
- public void add(Executor executor){
- if(null == executors){
- executors = new ArrayList<>();
- }
- executors.add(executor);
- }
- }
業務接口調用
- public class Sample {
- //此處模擬業務接口被調用的情況
- public static void main(String[] args) {
- Business business = new Business();
- ExecutorHandler handler = new ExecutorHandler();
- Map<String, Object> param = new HashMap<String, Object>();
- //執行業務方法,開啟事務,保存數據
- business.saveInfo(param, handler);
- //執行延遲執行的方法
- handler.handle();
- }
- }
輸出結果
業務開始 事務開啟 保存數據操作開始
Save order info to datebase
Save create message to datebase.
業務結束 事務提交 保存數據操作結束
Cache order info into redis
Send message to windq
Business
- /**
- * 完整的業務,有多個數據庫操作,以及數據庫以外的需要延遲執行的業務邏輯
- */
- public class Business {
- public void saveInfo(Map<String, Object> map,ExecutorHandler executorHandler){
- System.out.println("業務開始 事務開啟 保存數據操作開始" );
- new OrderSaver().saveOrderInfo(map,executorHandler);
- new MessageSender().saveMessageForSend(map,executorHandler);
- System.out.println("業務結束 事務提交 保存數據操作結束");
- }
- }
MessageSender
- /**
- * 下發數據的業務邏輯
- */
- public class MessageSender {
- public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){
- //保存數據到數據庫
- System.out.println("Save create message to datebase.");
- //將要延遲執行的業務邏輯定義好,注冊到容器中去
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Send message to windq");
- }
- });
- }
- }
OrderSaver
- /**
- * 保存服務單的業務邏輯
- */
- public class OrderSaver {
- public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){
- System.out.println("Save order info to datebase");
- //這就是所謂的回調函數
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Cache order info into redis");
- }
- });
- }
- }
2.2.1 問題二:參數傳遞
以上的方案,解決了延遲執行的問題,但是,此刻我們發現,由于要使用ExecutorHandler,這個時候就需要隨時隨地將該對象傳遞下去,要考慮如何降低該對象的侵略性。
- 靜態變量:在使用中需要考慮同步和清理的問題,很容易在多線程的情況下使得邏輯變得混亂,不采用。
- 成員變量:同樣也存在著數據清理的問題,不推薦使用,不采用。
2.2.2 問題二處理方案:使用ThreadLocal參數傳遞
有沒有生命周期是整個線程內的呢?這時我們就需要用到ThreadLocal了。
通過ThreadLocal來獲取ExecutorHandler 可以作為有效的解決方案。
由于ThreadLocal對象最終在使用完的時候需要remove掉,因此,該方法需要集中統一調用。
實現時,我們定義了HandlerThreadLocal類。
HandlerThreadLocal對象負責通過 ThreadLocal的get方法來獲取線程本地的ExecutorHandler對象,并執行其 handle方法(具體實現可以參照后面的代碼)。
執行完業務操作以后,通過調用remove方法將其銷毀。
2.2.3 異常的捕捉和處理DelayedCallHandler
由于ThreadLocal的remove方法是一定需要被執行,因此該方法應該放在一個try- catch - finally 塊的finally段中,保證其不被遺漏。
- DelayedCallHandler通過handle()方法調用業務邏輯。
- 在調用完業務邏輯后,調用ExecutorHandler的handle()方法,執行已經注冊到延遲調用容器中的業務方法。
- ***在finally中將ThreadLocal 對象remove掉。
整個DelayedCallHandler的handle方法就是一個完整的try- catch - finally 塊。
2.2.4 標準化定義:DelayablelService需要延遲調用的業務類
由于DelayedCallHandler已經模塊化,業務方法***也定義成一個具體的方法名(doBusiness),所有的業務處理類,實現DelayedCallHandler接口,在doBusiness方法中調用有事務的業務邏輯。
3、最終實現方案
基于處理方案二的分析,***我們使用ThreadLocal來傳遞業務數據。
我們通過ThreadLocal
在業務邏輯MessageSender 、OrderSaver中通過executorHandler將需要延遲執行的業務定義好。
在HandlerThreadLocal中,使用 executorHandler處理之前定義好的邏輯。
這樣做將事務和非事務分開,不再以方法參數的方式向下游傳遞數據,使得數據傳遞得以結構,處理起來更加優雅。
示例代碼如下。
業務接口調用
- public static void main(String[] args) {
- Business b = new Business();
- Map<String, Object> map = new HashMap<String, Object>();
- DelayedCallHandler<Map<String, Object>> bu = new DelayedCallHandler<Map<String, Object>>();
- bu.handle(b,map);
- }
輸出結果:
業務開始 事務開啟 保存數據操作開始
Save order info to datebase
Save create message to datebase.
業務結束 事務提交 保存數據操作結束
Cache order info into redis
Send message to windq
HandlerThreadLocal
- public class HandlerThreadLocal {
- public static final ThreadLocal<ExecutorHandler> executorHandler = new ThreadLocal<ExecutorHandler>(){
- protected ExecutorHandler initialValue(){
- return new ExecutorHandler();
- }
- };
- public static final void handle(){
- executorHandler.get().handle();
- }
- public static final void remove(){
- executorHandler.remove();
- }
- }
DelayedCallHandler
- public class DelayedCallHandler<T> {
- public void handle(DelayablelService<T> buisnes,T param){
- try {
- //先執行業務操作
- buisnes.doBusiness(param);
- //執行延遲執行的業務
- HandlerThreadLocal.handle();
- } catch (Exception e) {
- //處理異常
- }finally {
- HandlerThreadLocal.remove();
- }
- }
- }
DelayablelService1.
- public interface DelayablelService {
- public void doBusiness(T param);
- }
Business
- /**
- * 完整的業務,有多個數據庫操作,以及數據庫以外的需要延遲執行的業務邏輯
- */
- public class Business implements DelayablelService< Map<String, Object> > {
- @Override
- public void doBusiness(Map<String, Object> map){
- saveInfo(map);
- }
- public void saveInfo(Map<String, Object> map){
- System.out.println("業務開始 事務開啟 保存數據操作開始" );
- new OrderSaver().saveOrderInfo();
- new MessageSender().saveMessageForSend();
- System.out.println("業務結束 事務提交 保存數據操作結束");
- }
- }
MessageSender
- /**
- * 下發數據的業務邏輯
- */
- public class MessageSender {
- public void saveMessageForSend(){
- ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();
- System.out.println("Save create message to datebase.");
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Send message to windq");
- }
- });
- }
- }
OrderSaver
- /**
- * 保存服務單的業務邏輯
- */
- public class OrderSaver {
- public void saveOrderInfo(){
- System.out.println("Save order info to datebase");
- ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();
- //這就是所謂的回調函數
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Cache order info into redis");
- }
- });
- }
- }
4、總結
使用延遲執行的模型,解決了在一個業務邏輯中既有數據庫事務的操作,又有相關的非事務操作時,事務失敗或未提交而非事務操作成功導致的數據不一致問題。
文中提到的邏輯割裂和參數傳遞的問題,都是在比較復雜的場景下才有的。蘇寧售后訂單業務中此類邏輯常有出現,因此我們就這些問題進行了分析、討論,得出這樣的解決方案。并非所有的系統和業務都需要這樣。任何解決方案都要因情況而定,避免畫蛇添足。
在使用該模型時,有使用到匿名內部類和線程局部變量(ThreadLocal),在使用時,有一定的注意事項,ThreadLocal在使用結束后要通過其remove()方法移除,使用時需要留意。
作者:
王海勇,蘇寧科技集團蘇寧云軟件公司售后研發中心技術經理。從事Java開發多年,擅長業務抽象及業務架構設計,2016年9月加入蘇寧,參與售后服務域訂單平臺、時效平臺等系統平臺的研發工作。在蘇寧巨大業務量的場景下,保證系統穩定、安全、高效地提供服務。
【51CTO原創稿件,合作站點轉載請注明原文作者和出處為51CTO.com】