不可思議!億級數據竟然如此輕松同步至ES!
1 這是一個背景
最近接了一個需求,要提供一個隨意組合多個條件來查詢訂單數據的功能,看著數據庫里過億的訂單量,頭發不爭氣的又脫落了兩根代表這個需求不簡單。
圖片
脫落的兩根頭發,不是技術實現上很難,其實技術實現上清晰明了,就是通過數據異構,將數據同步到ES,利用ES的倒排索引、緩存等能力,提供多條件復雜查詢的能力,而ES集群我們已經有了。
但有些數據,在目前的ES索引中是不存在的,也就是說,我需要將過億的訂單數據從訂單數據庫重新刷一遍到ES中,而這一頓操作下來得需要一周的時間!
什么?你不信,那咱們來捋一捋。
2 捋一捋訂單數據同步到ES中的復雜度
2.1 數據同步ES索引流程
圖片
如上圖所示,就是將數據同步到ES索引的過程。
首先需要從訂單數據庫查詢所有的訂單數據,然后根據訂單數據上保存的用戶ID,商品ID等信息從用戶服務,商品服務查詢相關信息,經過處理與組裝后落到ES集群中。
之所以要查詢用戶信息和商品信息,是因為異構在ES索引中的訂單數據,并不會與mysql中的數據一一對應,有很多根據商品類目,用戶信息等查詢訂單信息的訴求存在,因此在這里就需要查詢很多的上游服務來組裝信息。
2.2 來梳理下是否有難點?
- 從數據庫把上億的訂單數據讀取出來。這個操作不能影響到線上業務,因此查詢的訂單數據庫一般是從庫,OK,配置多數據源來讀取數據吧,而且上億的訂單一般采用的都是分庫分表來存儲的,我們是分了16個庫,每個庫16個表,總共256張表,嘿嘿。
- 上億的訂單數據不能一次性全部讀取到內存吧,不然內存冒煙都存不下啊。所以得考慮分頁,分頁直接limit也不好,隨著數據量越大,速度越慢,所以得考慮一個游標,嗯,選一個字段當游標吧,游標最好唯一且遞增。
- 從多個服務獲取數據,這些數據所在的服務一般都屬于公司的其它部門,讀取數據的時候也不能影響到人家的服務吧,你這里查詢的是嘎嘎猛,一看人家的服務都崩了,這個黑鍋就飛來了。所以這里得考慮限流吧,得考慮隔離吧?不說全鏈路隔離,成本太高,起碼關鍵服務得隔離一下。
- 數據同步一段時間,產品來問,同步多久了啊,大概還有多久能完成啊,數據量大概是多少啊,一臉懵,不知道啊。
- 如果中途同步失敗了,咋處理啊,是不是得重試,咋重試,重試策略是啥?失敗有沒有報警,能不能及時感知并處理啊?如果同步一段時間中斷了咋整啊?有沒有記錄從哪中斷的?能否從中斷處繼續同步啊,不然從頭開始又得N天,哭了。
- 同步了一部分,發現有問題需要暫停一會,咋整?
- 如果只想同步部分數據不一致的訂單數據,可能就2,3個訂單,咋整,是不是還得提供按照手動輸入訂單ID同步ES數據的能力?
- 同步過程是咋樣的?開始時間?結束時間?共耗時多久?操作人是誰?這些統計數據從哪來?
- 想夜深人靜的時候同步數據,這有時候對業務的影響小,定個鬧鐘晚上起?
- 現在不單需要同步訂單的數據了,還需要同步商品ES集群的數據,這些邏輯還得重新寫一遍?
啊啊啊啊,想想都頭疼啊!
所以,一些事情看著簡單,其實并沒有那么簡單。
3 神奇的服務
為了讓頭發更有歸屬感,針對上述的難點開發了一款神奇的服務,那就是ECP。它可以將整個流程自動化、可視化的處理,降低數據異構到ES的成本任務界面如下所示:
圖片
3.1 ECP的簡單運行流程
簡單來說,ECP的作用就是將數據從數據源讀取出來,然后推送給ES寫服務。因為數據處理的邏輯因不同的業務而異,ES寫服務由各個對接方來實現,因此一個簡單的流程如下圖:
圖片
這里面涉及到一些技術細節,比如如何進行多數據源數據讀取,數據源配置,sql校驗,動態限流、SPI機制、重試策略與故障感知、探活與故障恢復,環境隔離等等。
下面一一介紹下:
3.2 多數據源數據讀取
ECP支持目前支持三個數據源數據的讀取,分別為ID源,文本源、以及腳本源。
3.2.1 ID源
有個文本框用來輸入ID。這種場景適用于小數據的數據同步,比如發現一些數據庫和ES的數據不一致了,就簡單的刷一下數據。
圖片
3.2.2 文件源
文件源指的是數據源來源于文本文件,適合中等數據的同步。ECP和對象存儲進行了對接,用戶可以上傳文件至對象存儲,在任務執行時,ECP會讀取對象存儲中的文本數據。
這種情況需要注意的是,用戶上傳的文件有可能會比較大,直接都讀取到內存再處理不現實,因此這里采用的是流的方式進行讀取,讀取一批處理一批,再釋放一批,不會造成OOM。
圖片
簡化的處理方式如下:
try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
// 以流的方式讀取文件數據
InputStream inputStream = response.body().byteStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
}
3.2.3 腳本源
腳本源適用于大數據量的數據同步。
腳本本質上就是SQL和數據源的結合。
用戶在ECP中配置數據庫的連接信息,然后配置SQL。ECP會執行該SQL,將數據從配置的數據庫中讀取出來,推送到ES寫服務中。
腳本源可以支持上億數據的讀取與推送,如下圖為訂單庫(分庫分表)配置的腳本信息:
圖片
3.2.4 腳本源大數據讀取的實現
將幾億數據讀取到內存中來處理顯然不可能,因此采用局部數據的讀取與處理才是正道。
在業務中,經常使用的是分頁,但分頁如果僅是使用limit offset,size,待offset的值比較大時,性能會急劇下降,形成慢SQL,甚至拖累整個數據庫的性能。
因此在分頁數量比較大時,需要指定一個有索引的字段作為游標,該游標可以提高分頁的性能,如在訂單表中,若在訂單ID是遞增的且有設置了索引,SQL就可以這么寫:select * from t_order where order_id > xxx order by order_id desc limit 10; 利用order_id值的變化就可以起到分頁的效果。
這種方式雖好,但讓用戶選定游標索引無疑增加了使用的門檻,因此ECP沒有采用上述分頁的形式來讀取大數據,而是采用JDBC游標查詢的方式,如下所示:
// 建立連接
conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
// 創建查詢
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(param.getFetchSize());
游標查詢每次讀取fetchSize大小的數據量,可以很好的避免讀取大數據量導致的OOM問題。
3.3 SQL的解析與校驗
用戶配置SQL腳本,ECP需要對該SQL腳本進行校驗與修改,傳統的字符串處理(比如正則)雖然在一定情況下可以滿足需求,但是容易出錯。因此ECP采用的是Druid的SQL解析工具包,可以將SQL解析成AST語法樹,以便對SQL進行各種處理。如下圖所示:
圖片
ECP提供的數據樣例查詢,會對SQL自動拼接上limit 1。
圖片
圖片
3.4 動態限流的實現
限流分集群限流和單機限流,經過評估,在能簡單就簡單的原則下,我們采用的是單機限流,限流組件使用的是guava的RateLimiter。
圖片
當在頁面上修改QPS的值時,會將該值同步到數據庫中,有個調度任務會不斷地掃描該值的變動,將變動的值同步到RateLimiter組件中。
當然,也可以采用數據監聽的策略(比如廣播MQ),讓變動值同步到RateLimiter更及時,但這種方式還需引入其它組件,復雜度嗷嗷上升,不符合我們簡單實現的策略。
動態限流的實現流程如下:
圖片
如下圖是在不同的時間點修改了限流值后的QPS變化圖:
圖片
3.5 重試策略與故障感知
ES中和DB中的數據要盡可能的保證實時一致性,但最終一致性是必須要保證的,所以數據推送、處理失敗的時候要進行重試,如何重試?
首先需要了解下失敗的類型,制定合適的重試策略,知彼知己,百戰不殆嘛!
一、網絡抖動導致的接口調用超時。在調用微服務RPC接口的時候,由于網絡抖動等情況,會導致接口調用超時,但很快就會恢復,通常情況下也就偶爾一次,下一次調用就會正常。
二、數據處理邏輯異常。這種情況下,異常沒辦法自恢復,只能人工介入。
三、上游服務異常。如上游服務壓力過大導致接口調用失敗,這時候就需要我們緩一緩再繼續處理,不能一個勁的調用導致上游服務崩潰掉。
結合上面的失敗類型的特點,斐波那契數列的重試策略就非常適合 斐波那契數列的特點是:1,1,2,3,5,8,13,21,34,55,89…
當第一次失敗的時候,延時1秒后就重試,如果此時是網絡抖動導致的超時,重試就成功了,不影響數據處理的速度 若失敗的次數越多,重試的間隔時間就會越長,這也會兼顧到上述二、三的失敗類型。
重試組件使用的是Guava Retry,簡單的偽代碼如下:
// 重試組件配置
private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
// 對中斷類的異常不重試
.retryIfException(input -> !isPauseException(input))
// 1,1,2,3,5,8,13,21,33...
.withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
// 重試次數達到一定的次數后,不再重試
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
log.error("act=【DataFlushRpcCallRetry】desc=【重試】重試次數=【{}】重試異常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
// 重試超過閾值進行報警提醒
alarmIfExceedThreshold(attempt);
}
}
})
.build();
// 將執行邏輯抽象為Runnable,對外暴露該方法
public void execute(Runnable runnable) {
innerExecute(runnable,RETRYER);
}
private void innerExecute(Runnable runnable, Retryer<Boolean> retryer) {
try {
retryer.call(() -> {
runnable.run();
return true;
});
} catch (Exception e) {
log.error("act=【DataFlushRpcCallRetry】desc=【重試異常】error=【{}】", e);
throw new IllegalStateException(e);
}
}
若重試到一定次數之后依然是失敗的話,則會將錯誤信息發送到報警群。根據推送的信息,可以明確知道錯誤的類型,重試的次數,以及任務的創建人等等信息,無需查看日志,即可定位大部分的問題。如下圖:
圖片
3.6 將數據推送給哪個服務來處理?-SPI機制
ECP是個通用的服務,因此需要將共性功能收攏在一起做成成品,將非共性的功能抽象一下,交給各個對接方去實現。
從簡單實現的角度來看,若有某個服務想要對接ECP,我們在ECP上開發一下,調用該服務的接口,將數據推送給該服務,思路雖清晰明了,但對接及維護成本極高,且沒有一個統一的規范,因此不可取,其流程如下圖:
圖片
Java上有個很好的思想可以解決這個問題,那就是SPI。因此由ECP提供一個接口,制定一個規范,具體的ES索引數據的組裝邏輯由各個對接方去實現。
這樣,若有一個新的對接方接入,只要實現接口即可,ECP無需做任何改動。
圖片
至于服務發現,ECP采用的配置的方式,也就是在新建任務的時候,選擇數據推送的消費方服務,如下圖:
圖片
對于實現方式,得益于公司內部自研的RPC框架,提供了動態指定調用服務的方式,偽代碼如下:
Reference<IEsIndexFlushAPI> reference = new Reference<>();
// 設置調用的服務名
reference.setServiceName(serviceName);
// 設置接口名
reference.setInterfaceClass(IEsIndexFlushAPI.class);
// 設置上下文
reference.setApplicationConfig(applicationConfig);
// 獲取接口實例
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer());
// 接口調用
log.info("act=【EsIndexFlushApiInvoker】desc=【請求值】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】", serviceName,request.getDataList().size(),request.getIndexNameList(),request.getTag() );
EMApiResult<FlushResponse> result = iEsIndexFlushAPI.flush(request);
3.7 環境隔離
同步數據是個比較重的操作,這個操作不應該影響到線上業務 因此,同步數據的服務應當與線上服務隔離開 ECP整合了架構組提供的標簽路由功能,可以在整個請求鏈路中調用指定標簽的服務,實現環境隔離。
ECP標簽路由配置圖:
圖片
如下圖,若在ECP上配置任務的標簽路由為FLUSH,則在同步任務執行過程中,會自動調用鏈路中綁定了FLUSH標簽的服務分組。
圖片
若某些服務沒有配置為FLUSH標簽的分組,這時就會自動請求該服務的線上正常環境。這樣,就可以做到一定程度上的環境隔離。
圖片
3.8 探活與任務故障恢復機制
在推送數據的過程中,若發生了不可描述的事情導致任務中斷,咋整?
到了需求DeadLine,發現任務在某年某月某日進度為1%的時候停了,哭了。
而且工作時間緊,任務重,總不能一定盯著任務,看有沒有中斷吧?這不適合,也不禮貌。
當然,這種情況在ECP是不會發生的,因為ECP是有“自救包”的。下面聊下ECP的任務探活和中斷恢復機制。
如下圖,在ECP中有探活和任務故障恢復兩大組件 探活組件負責監控當前任務線程的執行狀態,若任務線程正在執行,則對該任務的存活時間進行續期 任務故障恢復組件負責掃描當前未完成的任務,若任務上次存活時間大于指定的閾值時,則拉取該任務恢復執行。
圖片
續期的偽代碼如下:
@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal(){
futureMap.forEach((taskId,future)->{
if (!future.isDone()){
log.info("act=【renewal】desc=【任務續期】taskId=【{}】續期時間=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
contextService.renewal(taskId);
}else {
log.info("act=【renewal】desc=【任務結束】taskId=【{}】",taskId);
futureMap.remove(taskId);
}
});
}
任務故障恢復的偽代碼如下:
@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask(){
// 1.查詢當前未完成的任務
List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();
for (TaskFlushExecuteContextPO contextPO : contextPOS) {
// 2.計算上次存活到當前的時間
Integer durationMin = calculateTimeSinceLastAlive();
// 3.若時間大于指定閾值 則對任務重新拉起
if (durationMin >= MAX_DURATION_MIN){
log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】",contextPO.getTaskId());
// 4.更新alive_time進行鎖定 防止并發執行
int i = contextExtMapper.casUpdateAliveTime();
if (i >0){
// 5.重新拉起任務
restart0(contextPO, aliveTime);
}
}
}
}
3.9 平滑遷移的實現
將數據同步到ES,通常有兩種方式:
- 直接把數據同步到原索引上。
- 新建一個索引,利用雙寫以及切換別名的方式實現流量的平滑遷移。
對于新建一個索引的場景,往往是索引Mapping的改變,或者是為了不影響原索引,保證操作可回滾。
針對這種場景,ECP分析了歷來大家手動操作刷ES索引的步驟,將流程進行抽象,歸納了以下幾個步驟,如下圖:
圖片
ECP提供了平滑遷移組件,其內部整合了Apollo配置中心實現推送能力,其簡要的實現流程如下圖:
圖片
3.10 優雅的日志記錄
如下圖所示展示了該任務操作的日志,原則上日志記錄為非核心業務,需要與核心業務代碼進行剝離,因此使用注解式流水記錄是個很好的選擇。
圖片
但注解式流水記錄有個問題,就是在很多的場景下,流水里面的值需要動態獲取,利用注解可以實現嗎? 答案是可以的,在上圖所示中,任務ID、數據來源都是動態數據,那如何實現的呢?看下面代碼:
@Flow(subjectIdEp = "#taskPO.id",subjectType = SubjectTypeEnum.TASK,operateFlowType = OperateFlowTypeEnum.CREATE_TASK,content = "'創建任務,任務ID:' + #taskPO.id ")
public void saveTaskWithUser(TaskPO taskPO) {
String name = LoginUserContext.get().getName();
taskPO.setCreator(name);
taskPO.setModifier(name);
taskMapper.insertSelective(taskPO);
}
subjectIdEp為流水主題ID,#taskPo.id為一個表達式,可用動態獲取參數taskPo中的id值,這里利用了springEl表達式的能力。
content = "'創建任務,任務ID:' + #taskPO.id " 為流水信息,同樣利用了springEL表達式,動態獲取請求參數taskPo中的id信息。
但有些信息需要一系列的計算才可以獲取到,而不是單純的從對象中取值,這也是可以實現的。如下:
@Flow(subjectIdEp = "#contextPO.taskId",
subjectType = SubjectTypeEnum.TASK,
operateFlowType = OperateFlowTypeEnum.DATA_FLUSH,
content = "'【數據同步】異常中斷任務恢復執行,中斷時間:' + T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)")
@Transactional(rollbackFor = Exception.class,isolation = Isolation.REPEATABLE_READ)
public void restart0(TaskFlushExecuteContextPO contextPO, Date aliveTime) {
log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】原aliveTime=【{}】", contextPO.getTaskId(), aliveTime);
dsProcessorExecutor.executeAndKeepAliveMonitor(contextPO.getTaskId());
}
其中T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime) 代表執行的是DateUtils.dateToStringSimple 方法,也就是說表達式是可以調用方法的,包括從spring容器中獲取對象,調用對象的方法均可。
這種注解式流水的實現原理,就是利用SPEL表達式和Spring Aop的特性,寫一個切面,攔截自定義的flow注解即可,偽代碼如下:
// 定義切面,攔截FLOW注解
@Around("@annotation(com.zhuanzhuan.esmanage.entity.annotation.Flow)")
public Object around(ProceedingJoinPoint point) throws Throwable {
// 調用目標方法
Object result = null;
try {
result = point.proceed();
recordFlow(point,result);
return result;
} catch (Throwable e) {
recordException(point,e);
throw e;
}
}
// 流水記錄的實現
private void recordFlow(ProceedingJoinPoint point, Object result) {
// try catch 防止影響主邏輯
//TODO 看是否需要寫在一個事務中,主要評估流水的重要性
try {
MethodSignature signature = (MethodSignature) point.getSignature();
Flow flowAnnotation = getFlowAnnotation(signature);
// 組裝參數上下文
EvaluationContext evaluationContext = buildContext(point, signature);
evaluationContext.setVariable("result",result);
// ID表達式
String subjectIdEp = flowAnnotation.subjectIdEp();
// content表達式
String content = getContent(flowAnnotation, evaluationContext);
// SPEL解析表達式
Expression expression = PARSER.parseExpression(subjectIdEp);
Integer subjectId = (Integer)expression.getValue(evaluationContext);
record(flowAnnotation, subjectId, content);
} catch (Exception e) {
log.error("記錄操作流水失敗", e);
}
}
4 總結
總得來說,ECP的實現中有很多的技術細節需要考慮,技術難度一般。
實際上,在我們大部分的項目中,考驗的就是對細節的把控。
ps:感謝ChatGPT對本文名稱的大力支持
關于作者
閆展,轉轉交易中臺研發工程師