本地消息表這個方案最初是ebay提出的,核心就是將需要分布式處理的任務通過本地消息日志存儲的方式來異步執行。該方案可以存到本地文本,數據庫或消息隊列,再通過異步線程或者自動job發起重試。

1、背景介紹
供應鏈倉儲域子域繁多,例如庫存域,lpn域等,平時開發的過程中涉及很多分布式事務的場景,例如收貨加庫存,發貨扣庫存,揀貨入箱,發貨出箱等一些分布式事務場景,所以迫切需要出一套分布式事務處理方案,在調研了市場上的分布式事務解決方案,結合wms自身業務域不是強一致性的特色,選擇了最終一致性,且使用本地消息表去實現它。
本地消息表這個方案最初是ebay提出的,核心就是將需要分布式處理的任務通過本地消息日志存儲的方式來異步執行。該方案可以存到本地文本,數據庫或消息隊列,再通過異步線程或者自動job發起重試。
2、設計前的思考
在操作本地事務的同時,需要額外寫入一張需要最終一致性業務記錄的表,即本地消息表,且該業務記錄是有狀態的,在本地事務提交后,再執行需要最終一致性的方法,成功后更新記錄狀態為成功,如果失敗了,還要引入兜底重試機制,下圖能簡單介紹它的功能:

為了實現以上最終一致性的功能。我們同樣還需要做到以下幾點:
- 無侵入:這個好理解,對于需要最終一致性的場景,可以很簡單的實現
- 策略化:包括重試次數,重試的間隔時間,是否使用異步方式等
- 通用性:最好是無改動(或者很小改動)的支持絕大部分的場景,拿過來直接可用
- 復用性:對于異常場景存在需要重試場景,同時希望把正常邏輯和重試邏輯復用
3、架構設計
調研了大家對一致性框架的訴求,最終定義了如,入參自定義序列化,環境隔離,同步異步執行切換,注解支持,自定義執行攔截器,以及適配得物倉儲業務的業務上下文訂制以及持久化等一系列的核心能力,底層依賴了Spring的生態,在數據存儲依賴了Mysql,Mongodb,緩存分布式鎖上依賴了Redis等一系列主流的中間件,最終以jar包形式實現,盡可能做到即拿即用。

4、詳細設計
基于在以上的架構設計后,做了以下設計
4.1 注解支持:@EnableConsistency
為了讓用戶更快,更方便的接入一致性框架,我們在早期的抽象類繼承的方案上做了一版本升級,使用注解,使得使用方式跟@Transactional注解一樣,只要加上@EnableConsistency就支持最終一致性的支持,非常方便。
4.2 自動重試&重試等待策略可配:
最終一致性有個天然的組成部分就是需要重試,一致性框架也不例外,引用了Spring的
ScheduledTask實現定時重試那些運行失敗的記錄,另外重試等待策略同樣可配置:
4.2.1 重試等待策略
固定時間重試
支持配置固定時間間隔重試
延遲指數重試
底層采用Math.pow函數在重試次數越多次,執行間隔時間呈指數級增長

4.2.2 自定義重試次數
注解式支持重試次數的定義
4.3 自定義攔截器
在執行需要最終一致性方法的時候,我們同樣提供了如Spring AOP一樣被代理方法的前置,成功,異常后需要做的一些切面功能,非常方便的滿足使用者的擴展,解耦了實現與擴展。

/**
* 在記錄重試次數失敗后 執行
* @param context
* @param lastException
*/
void close(ConsistencyContext context, Throwable lastException);
/**
* 開始執行重試前 攔截器
* 如果返回false 則 執行期不繼續進行
* @param context
* @return
*/
boolean open(ConsistencyContext context);
/**
* 發生異常攔截器
* @param context
* @param throwable
*/
void onError(ConsistencyContext context, Throwable throwable);
4.4 業務上下文&持久化
在業務系統的開發中,存在一個ThreadLocal的上下文,記錄了用戶的組織架構,簽到等一系列用戶信息。在設計一致性框架的時候我們考慮到用戶上下文的存在,暴露了業務上下文擴展,以及存儲業務上下文供重試時使用的能力。

public interface ContextListener {
/**
* 獲取上下文內容
* @return
*/
Map<String,String> getContext();
/**
* 設置用戶上下文
* @param contextMap
*/
void setContext(Map<String, String> contextMap);
/**
* 清除用戶上下文
* @param contextMap
*/
void clear(Map<String, String> contextMap);
}
4.5 環境隔離
由于得物的預發環境與生產環境采用的同一批數據庫,故也將一致性框架記錄采用了spring.profile.active的值做環境隔離,確保重試時不會預發的跑到生產的數據。

4.6 入參自定義序列化
由于需要在本地消息表中記錄需要重試的方法的入參,故就涉及到入參序列化的問題,在思考良久之后,只提供默認的Json方式的序列化與反序列化,如果用戶需要額外的序列化與反序列化方法,我們也支持,提供了暴露序列化與反序列化的口子供用戶實現。
public interface SerializerListener{
/**
* 進行序列化
* @param params
* @return
*/
String serializer(Object[] params);
/**
* 反序列化
* @param str
* @return
*/
Object[] deserializer(String str);
}
4.7 執行模式可配置
一般用本地消息執行最終一致性的部分,開始的設計是異步化執行,后續收到使用者用戶的反饋,也有部分同步執行的場景,故增加了同步異步執行開關,開發者自行選擇。
4.8 數據模型&狀態機

4.9 核心代碼展示
4.9.1ExecutorAutoConfiguration框架初始化
/**
* 加載為切面增強提供織入接口advice,和注入advice的pointcut
*/
@PostConstruct
public void init() {
Set<Class<? extends Annotation>> eventualConsistencyAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
eventualConsistencyAnnotationTypes.add(EventualConsistency.class);
this.pointcut = buildPointcut(eventualConsistencyAnnotationTypes);
this.advice = buildAdvice();
buildExecutorManager();
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
4.9.2 核心注解@EventualConsistency
使用最終一致性方法的核心注解:
public @interface EventualConsistency {
/**
* 第一次執行是否異步執行
* @return
*/
boolean async() default true;
/**
* 最大重試次數
* @return
*/
int maxRetryTimes() default 6;
/**
* 延遲時間
* @return
*/
Delay delay() default @Delay;
/**
* Bean names 攔截器
* @return retry listeners bean names
*/
String[] listeners() default {};
@Deprecated
String label() default "";
String beanName() default "";
/**
* Bean names 攔截器 用來進行序列化和反序列化
* @return retry listeners bean names
*/
String serializerListener() default "";
String referenceNo() default "";
@AliasFor(annotation = Transactional.class)
Class<? extends Throwable>[] rollbackFor() default Exception.class;
@AliasFor(annotation = Transactional.class)
Class<? extends Throwable>[] noRollbackFor() default {};
boolean manageContext() default true;
}
4.9.3 核心實現Advice,MethodInterceptor的AnnotationAwareRetryOperationsInterceptor

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// 獲取最終執行攔截的委托
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
return delegate.invoke(invocation);
} else {
return invocation.proceed();
}
}
4.9.4 延遲執行策略
public interface DelayPolicy {
/**
* 獲取下次執行時間
* @return
*/
Long nextTime();
}
/**
* 延遲指數
*/
public class ExponentialDelayPolicy implements DelayPolicy {
public static final long DEFAULT_INITIAL_INTERVAL = 2L;
public static final int DEFAULT_MULTIPLIER = 3;
/**
* 默認重試最大延遲時間 (24小時)
*/
public static final long DEFAULT_MAX_INTERVAL = 86400L;
/**
* 初始間隔
*/
private volatile long initialInterval = DEFAULT_INITIAL_INTERVAL;
private volatile int multiplier = DEFAULT_MULTIPLIER;
/**
* 最大延遲時間
*/
private volatile long maxInterval = DEFAULT_MAX_INTERVAL;
@Override
public Long nextTime() {
ConsistencyContext context = ConsistencyContextHolder.getContext();
Double pow = Math.pow(initialInterval + context.getRetryCounts(), multiplier);
if(pow > maxInterval){
return maxInterval;
}
return pow.longValue();
}
}
/**
* 固定時間
*/
public class FixedDelayPolicy implements DelayPolicy {
private static final long DEFAULT_DELAY_PERIOD = 10L;
private volatile long delayPeriod = DEFAULT_DELAY_PERIOD;
@Override
public Long nextTime() {
return this.delayPeriod;
}
}
4.9.5 AsyncConsistencyExecutor異步最終一致性執行

5、未來展望
(1)后臺管理頁面設計,支持報表查詢,以及錯誤異常處理
(2)trace監控接入,方便定位問題
(3)適配業務支持類型DB
(4)自定義歸檔策略
最終一致性框架是由wms全組同學一起設計和開發完成,并且陪伴了得物快速發展的兩年多,經歷了2個618以及3個雙十一,若干個情人節,圣誕節的考驗。系統運行健康,無性能瓶頸,提升了很多場景下最終一致性的開發速度。目前仍在安全穩健的保障著倉儲域服務的正常運轉。