成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

不可思議!億級數據竟然如此輕松同步至ES!

云計算 云原生
文件源指的是數據源來源于文本文件,適合中等數據的同步。ECP和對象存儲進行了對接,用戶可以上傳文件至對象存儲,在任務執行時,ECP會讀取對象存儲中的文本數據。

1 這是一個背景

最近接了一個需求,要提供一個隨意組合多個條件來查詢訂單數據的功能,看著數據庫里過億的訂單量,頭發不爭氣的又脫落了兩根代表這個需求不簡單。

圖片圖片

脫落的兩根頭發,不是技術實現上很難,其實技術實現上清晰明了,就是通過數據異構,將數據同步到ES,利用ES的倒排索引、緩存等能力,提供多條件復雜查詢的能力,而ES集群我們已經有了。

但有些數據,在目前的ES索引中是不存在的,也就是說,我需要將過億的訂單數據從訂單數據庫重新刷一遍到ES中,而這一頓操作下來得需要一周的時間!

什么?你不信,那咱們來捋一捋。

2 捋一捋訂單數據同步到ES中的復雜度

2.1 數據同步ES索引流程

圖片圖片

如上圖所示,就是將數據同步到ES索引的過程。

首先需要從訂單數據庫查詢所有的訂單數據,然后根據訂單數據上保存的用戶ID,商品ID等信息從用戶服務,商品服務查詢相關信息,經過處理與組裝后落到ES集群中。

之所以要查詢用戶信息和商品信息,是因為異構在ES索引中的訂單數據,并不會與mysql中的數據一一對應,有很多根據商品類目,用戶信息等查詢訂單信息的訴求存在,因此在這里就需要查詢很多的上游服務來組裝信息。

2.2 來梳理下是否有難點?

  1. 從數據庫把上億的訂單數據讀取出來。這個操作不能影響到線上業務,因此查詢的訂單數據庫一般是從庫,OK,配置多數據源來讀取數據吧,而且上億的訂單一般采用的都是分庫分表來存儲的,我們是分了16個庫,每個庫16個表,總共256張表,嘿嘿。
  2. 上億的訂單數據不能一次性全部讀取到內存吧,不然內存冒煙都存不下啊。所以得考慮分頁,分頁直接limit也不好,隨著數據量越大,速度越慢,所以得考慮一個游標,嗯,選一個字段當游標吧,游標最好唯一且遞增。
  3. 從多個服務獲取數據,這些數據所在的服務一般都屬于公司的其它部門,讀取數據的時候也不能影響到人家的服務吧,你這里查詢的是嘎嘎猛,一看人家的服務都崩了,這個黑鍋就飛來了。所以這里得考慮限流吧,得考慮隔離吧?不說全鏈路隔離,成本太高,起碼關鍵服務得隔離一下。
  4. 數據同步一段時間,產品來問,同步多久了啊,大概還有多久能完成啊,數據量大概是多少啊,一臉懵,不知道啊。
  5. 如果中途同步失敗了,咋處理啊,是不是得重試,咋重試,重試策略是啥?失敗有沒有報警,能不能及時感知并處理啊?如果同步一段時間中斷了咋整啊?有沒有記錄從哪中斷的?能否從中斷處繼續同步啊,不然從頭開始又得N天,哭了。
  6. 同步了一部分,發現有問題需要暫停一會,咋整?
  7. 如果只想同步部分數據不一致的訂單數據,可能就2,3個訂單,咋整,是不是還得提供按照手動輸入訂單ID同步ES數據的能力?
  8. 同步過程是咋樣的?開始時間?結束時間?共耗時多久?操作人是誰?這些統計數據從哪來?
  9. 想夜深人靜的時候同步數據,這有時候對業務的影響小,定個鬧鐘晚上起?
  10. 現在不單需要同步訂單的數據了,還需要同步商品ES集群的數據,這些邏輯還得重新寫一遍?

啊啊啊啊,想想都頭疼啊!

所以,一些事情看著簡單,其實并沒有那么簡單。

3 神奇的服務

為了讓頭發更有歸屬感,針對上述的難點開發了一款神奇的服務,那就是ECP。它可以將整個流程自動化、可視化的處理,降低數據異構到ES的成本任務界面如下所示:

圖片圖片

3.1 ECP的簡單運行流程

簡單來說,ECP的作用就是將數據從數據源讀取出來,然后推送給ES寫服務。因為數據處理的邏輯因不同的業務而異,ES寫服務由各個對接方來實現,因此一個簡單的流程如下圖:

圖片圖片

這里面涉及到一些技術細節,比如如何進行多數據源數據讀取,數據源配置,sql校驗,動態限流、SPI機制、重試策略與故障感知、探活與故障恢復,環境隔離等等。

下面一一介紹下:

3.2 多數據源數據讀取

ECP支持目前支持三個數據源數據的讀取,分別為ID源,文本源、以及腳本源。

3.2.1 ID源

有個文本框用來輸入ID。這種場景適用于小數據的數據同步,比如發現一些數據庫和ES的數據不一致了,就簡單的刷一下數據。

圖片圖片

3.2.2 文件源

文件源指的是數據源來源于文本文件,適合中等數據的同步。ECP和對象存儲進行了對接,用戶可以上傳文件至對象存儲,在任務執行時,ECP會讀取對象存儲中的文本數據。

這種情況需要注意的是,用戶上傳的文件有可能會比較大,直接都讀取到內存再處理不現實,因此這里采用的是流的方式進行讀取,讀取一批處理一批,再釋放一批,不會造成OOM。

圖片圖片

簡化的處理方式如下:

try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }

  // 以流的方式讀取文件數據
  InputStream inputStream = response.body().byteStream();
  BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

}

3.2.3 腳本源

腳本源適用于大數據量的數據同步。

腳本本質上就是SQL和數據源的結合。

用戶在ECP中配置數據庫的連接信息,然后配置SQL。ECP會執行該SQL,將數據從配置的數據庫中讀取出來,推送到ES寫服務中。

腳本源可以支持上億數據的讀取與推送,如下圖為訂單庫(分庫分表)配置的腳本信息:

圖片圖片

3.2.4 腳本源大數據讀取的實現

將幾億數據讀取到內存中來處理顯然不可能,因此采用局部數據的讀取與處理才是正道。

在業務中,經常使用的是分頁,但分頁如果僅是使用limit offset,size,待offset的值比較大時,性能會急劇下降,形成慢SQL,甚至拖累整個數據庫的性能。

因此在分頁數量比較大時,需要指定一個有索引的字段作為游標,該游標可以提高分頁的性能,如在訂單表中,若在訂單ID是遞增的且有設置了索引,SQL就可以這么寫:select * from t_order where order_id > xxx order by order_id desc limit 10; 利用order_id值的變化就可以起到分頁的效果。

這種方式雖好,但讓用戶選定游標索引無疑增加了使用的門檻,因此ECP沒有采用上述分頁的形式來讀取大數據,而是采用JDBC游標查詢的方式,如下所示:

// 建立連接
       conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
       // 創建查詢
       stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
       stmt.setFetchSize(param.getFetchSize());

游標查詢每次讀取fetchSize大小的數據量,可以很好的避免讀取大數據量導致的OOM問題。

3.3 SQL的解析與校驗

用戶配置SQL腳本,ECP需要對該SQL腳本進行校驗與修改,傳統的字符串處理(比如正則)雖然在一定情況下可以滿足需求,但是容易出錯。因此ECP采用的是Druid的SQL解析工具包,可以將SQL解析成AST語法樹,以便對SQL進行各種處理。如下圖所示:

圖片圖片

ECP提供的數據樣例查詢,會對SQL自動拼接上limit 1。

圖片圖片

圖片圖片

3.4 動態限流的實現

限流分集群限流和單機限流,經過評估,在能簡單就簡單的原則下,我們采用的是單機限流,限流組件使用的是guava的RateLimiter。

圖片圖片

當在頁面上修改QPS的值時,會將該值同步到數據庫中,有個調度任務會不斷地掃描該值的變動,將變動的值同步到RateLimiter組件中。

當然,也可以采用數據監聽的策略(比如廣播MQ),讓變動值同步到RateLimiter更及時,但這種方式還需引入其它組件,復雜度嗷嗷上升,不符合我們簡單實現的策略。

動態限流的實現流程如下:

圖片圖片

如下圖是在不同的時間點修改了限流值后的QPS變化圖:

圖片圖片

3.5 重試策略與故障感知

ES中和DB中的數據要盡可能的保證實時一致性,但最終一致性是必須要保證的,所以數據推送、處理失敗的時候要進行重試,如何重試?

首先需要了解下失敗的類型,制定合適的重試策略,知彼知己,百戰不殆嘛!

一、網絡抖動導致的接口調用超時。在調用微服務RPC接口的時候,由于網絡抖動等情況,會導致接口調用超時,但很快就會恢復,通常情況下也就偶爾一次,下一次調用就會正常。

二、數據處理邏輯異常。這種情況下,異常沒辦法自恢復,只能人工介入。

三、上游服務異常。如上游服務壓力過大導致接口調用失敗,這時候就需要我們緩一緩再繼續處理,不能一個勁的調用導致上游服務崩潰掉。

結合上面的失敗類型的特點,斐波那契數列的重試策略就非常適合 斐波那契數列的特點是:1,1,2,3,5,8,13,21,34,55,89…

當第一次失敗的時候,延時1秒后就重試,如果此時是網絡抖動導致的超時,重試就成功了,不影響數據處理的速度 若失敗的次數越多,重試的間隔時間就會越長,這也會兼顧到上述二、三的失敗類型。

重試組件使用的是Guava Retry,簡單的偽代碼如下:

// 重試組件配置
private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
            // 對中斷類的異常不重試
            .retryIfException(input -> !isPauseException(input))
            // 1,1,2,3,5,8,13,21,33...
            .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
           // 重試次數達到一定的次數后,不再重試
            .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
            .withRetryListener(new RetryListener() {
                @Override
                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.hasException()) {
                        log.error("act=【DataFlushRpcCallRetry】desc=【重試】重試次數=【{}】重試異常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
                        // 重試超過閾值進行報警提醒
                        alarmIfExceedThreshold(attempt);
                    }
                }
            })
            .build();

// 將執行邏輯抽象為Runnable,對外暴露該方法
public void execute(Runnable runnable) {
    innerExecute(runnable,RETRYER);
}


private void innerExecute(Runnable runnable, Retryer<Boolean> retryer) {
    try {
    retryer.call(() -> {
        runnable.run();
        return true;
       });
    } catch (Exception e) {
       log.error("act=【DataFlushRpcCallRetry】desc=【重試異常】error=【{}】", e);
       throw new IllegalStateException(e);
    }
}

若重試到一定次數之后依然是失敗的話,則會將錯誤信息發送到報警群。根據推送的信息,可以明確知道錯誤的類型,重試的次數,以及任務的創建人等等信息,無需查看日志,即可定位大部分的問題。如下圖:

圖片圖片

3.6 將數據推送給哪個服務來處理?-SPI機制

ECP是個通用的服務,因此需要將共性功能收攏在一起做成成品,將非共性的功能抽象一下,交給各個對接方去實現。

從簡單實現的角度來看,若有某個服務想要對接ECP,我們在ECP上開發一下,調用該服務的接口,將數據推送給該服務,思路雖清晰明了,但對接及維護成本極高,且沒有一個統一的規范,因此不可取,其流程如下圖:

圖片圖片

Java上有個很好的思想可以解決這個問題,那就是SPI。因此由ECP提供一個接口,制定一個規范,具體的ES索引數據的組裝邏輯由各個對接方去實現。

這樣,若有一個新的對接方接入,只要實現接口即可,ECP無需做任何改動。

圖片圖片

至于服務發現,ECP采用的配置的方式,也就是在新建任務的時候,選擇數據推送的消費方服務,如下圖:

圖片圖片

對于實現方式,得益于公司內部自研的RPC框架,提供了動態指定調用服務的方式,偽代碼如下:

Reference<IEsIndexFlushAPI> reference = new Reference<>();
// 設置調用的服務名
reference.setServiceName(serviceName);
// 設置接口名
reference.setInterfaceClass(IEsIndexFlushAPI.class);
// 設置上下文
reference.setApplicationConfig(applicationConfig);
// 獲取接口實例
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer());
// 接口調用
log.info("act=【EsIndexFlushApiInvoker】desc=【請求值】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】", serviceName,request.getDataList().size(),request.getIndexNameList(),request.getTag() );
EMApiResult<FlushResponse> result = iEsIndexFlushAPI.flush(request);

3.7 環境隔離

同步數據是個比較重的操作,這個操作不應該影響到線上業務 因此,同步數據的服務應當與線上服務隔離開 ECP整合了架構組提供的標簽路由功能,可以在整個請求鏈路中調用指定標簽的服務,實現環境隔離。

ECP標簽路由配置圖:

圖片圖片

如下圖,若在ECP上配置任務的標簽路由為FLUSH,則在同步任務執行過程中,會自動調用鏈路中綁定了FLUSH標簽的服務分組。

圖片圖片

若某些服務沒有配置為FLUSH標簽的分組,這時就會自動請求該服務的線上正常環境。這樣,就可以做到一定程度上的環境隔離。

圖片圖片

3.8 探活與任務故障恢復機制

在推送數據的過程中,若發生了不可描述的事情導致任務中斷,咋整?

到了需求DeadLine,發現任務在某年某月某日進度為1%的時候停了,哭了。

而且工作時間緊,任務重,總不能一定盯著任務,看有沒有中斷吧?這不適合,也不禮貌。

當然,這種情況在ECP是不會發生的,因為ECP是有“自救包”的。下面聊下ECP的任務探活和中斷恢復機制。

如下圖,在ECP中有探活和任務故障恢復兩大組件 探活組件負責監控當前任務線程的執行狀態,若任務線程正在執行,則對該任務的存活時間進行續期 任務故障恢復組件負責掃描當前未完成的任務,若任務上次存活時間大于指定的閾值時,則拉取該任務恢復執行。

圖片圖片

續期的偽代碼如下:

@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
    public void renewal(){
        futureMap.forEach((taskId,future)->{
            if (!future.isDone()){
                log.info("act=【renewal】desc=【任務續期】taskId=【{}】續期時間=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
                contextService.renewal(taskId);
            }else {
                log.info("act=【renewal】desc=【任務結束】taskId=【{}】",taskId);
                futureMap.remove(taskId);
            }
        });
    }

任務故障恢復的偽代碼如下:

@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
    public void restartTask(){

     // 1.查詢當前未完成的任務
        List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();

        for (TaskFlushExecuteContextPO contextPO : contextPOS) {
            // 2.計算上次存活到當前的時間
            Integer durationMin = calculateTimeSinceLastAlive();

      // 3.若時間大于指定閾值 則對任務重新拉起
            if (durationMin >= MAX_DURATION_MIN){
                log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】",contextPO.getTaskId());
                // 4.更新alive_time進行鎖定 防止并發執行
                int i = contextExtMapper.casUpdateAliveTime();
                if (i >0){
                    // 5.重新拉起任務
                    restart0(contextPO, aliveTime);
                }
            }
        }
    }

3.9 平滑遷移的實現

將數據同步到ES,通常有兩種方式:

  1. 直接把數據同步到原索引上。
  2. 新建一個索引,利用雙寫以及切換別名的方式實現流量的平滑遷移。

對于新建一個索引的場景,往往是索引Mapping的改變,或者是為了不影響原索引,保證操作可回滾。

針對這種場景,ECP分析了歷來大家手動操作刷ES索引的步驟,將流程進行抽象,歸納了以下幾個步驟,如下圖:

圖片圖片

ECP提供了平滑遷移組件,其內部整合了Apollo配置中心實現推送能力,其簡要的實現流程如下圖:

圖片圖片

3.10 優雅的日志記錄

如下圖所示展示了該任務操作的日志,原則上日志記錄為非核心業務,需要與核心業務代碼進行剝離,因此使用注解式流水記錄是個很好的選擇。

圖片圖片

但注解式流水記錄有個問題,就是在很多的場景下,流水里面的值需要動態獲取,利用注解可以實現嗎? 答案是可以的,在上圖所示中,任務ID、數據來源都是動態數據,那如何實現的呢?看下面代碼:

@Flow(subjectIdEp = "#taskPO.id",subjectType = SubjectTypeEnum.TASK,operateFlowType = OperateFlowTypeEnum.CREATE_TASK,content = "'創建任務,任務ID:' + #taskPO.id ")
    public void saveTaskWithUser(TaskPO taskPO) {
        String name = LoginUserContext.get().getName();
        taskPO.setCreator(name);
        taskPO.setModifier(name);
        taskMapper.insertSelective(taskPO);
    }

subjectIdEp為流水主題ID,#taskPo.id為一個表達式,可用動態獲取參數taskPo中的id值,這里利用了springEl表達式的能力。

content = "'創建任務,任務ID:' + #taskPO.id " 為流水信息,同樣利用了springEL表達式,動態獲取請求參數taskPo中的id信息。

但有些信息需要一系列的計算才可以獲取到,而不是單純的從對象中取值,這也是可以實現的。如下:

@Flow(subjectIdEp = "#contextPO.taskId",
            subjectType = SubjectTypeEnum.TASK,
            operateFlowType = OperateFlowTypeEnum.DATA_FLUSH,
            content = "'【數據同步】異常中斷任務恢復執行,中斷時間:' + T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)")
    @Transactional(rollbackFor = Exception.class,isolation = Isolation.REPEATABLE_READ)
    public void restart0(TaskFlushExecuteContextPO contextPO, Date aliveTime) {
        log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】原aliveTime=【{}】", contextPO.getTaskId(), aliveTime);
        dsProcessorExecutor.executeAndKeepAliveMonitor(contextPO.getTaskId());
    }

其中T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime) 代表執行的是DateUtils.dateToStringSimple 方法,也就是說表達式是可以調用方法的,包括從spring容器中獲取對象,調用對象的方法均可。

這種注解式流水的實現原理,就是利用SPEL表達式和Spring Aop的特性,寫一個切面,攔截自定義的flow注解即可,偽代碼如下:

// 定義切面,攔截FLOW注解
@Around("@annotation(com.zhuanzhuan.esmanage.entity.annotation.Flow)")
public Object around(ProceedingJoinPoint point) throws Throwable {

    // 調用目標方法
    Object result = null;
    try {
        result = point.proceed();
        recordFlow(point,result);
        return result;
    } catch (Throwable e) {
        recordException(point,e);
        throw e;
    }
}


// 流水記錄的實現
private void recordFlow(ProceedingJoinPoint point, Object result) {
    // try catch 防止影響主邏輯
    //TODO 看是否需要寫在一個事務中,主要評估流水的重要性
    try {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Flow flowAnnotation = getFlowAnnotation(signature);

        // 組裝參數上下文
        EvaluationContext evaluationContext = buildContext(point, signature);

        evaluationContext.setVariable("result",result);

        // ID表達式
        String subjectIdEp = flowAnnotation.subjectIdEp();

        // content表達式
        String content = getContent(flowAnnotation, evaluationContext);

    // SPEL解析表達式
        Expression expression = PARSER.parseExpression(subjectIdEp);
        Integer subjectId = (Integer)expression.getValue(evaluationContext);
        record(flowAnnotation, subjectId, content);
    } catch (Exception e) {
        log.error("記錄操作流水失敗", e);
    }
}

4 總結

總得來說,ECP的實現中有很多的技術細節需要考慮,技術難度一般。 

實際上,在我們大部分的項目中,考驗的就是對細節的把控。

ps:感謝ChatGPT對本文名稱的大力支持

關于作者

閆展,轉轉交易中臺研發工程師

責任編輯:武曉燕 來源: 轉轉技術
相關推薦

2013-10-10 13:07:25

方物

2021-11-10 06:38:01

Python鏈式操作

2023-04-06 09:44:00

ChatGPT行業質量

2017-03-21 08:52:20

神經網絡聲譽

2011-02-23 08:50:22

C#.NETdynamic

2020-07-02 15:40:11

Spring BootJar包Java

2010-07-15 16:21:03

不可思議的服務器

2022-01-24 15:57:34

Python返回功能代碼

2011-07-18 13:35:14

HTML 5

2012-02-13 11:01:27

N9Android 4.0

2014-07-26 22:18:51

2013-07-31 15:06:58

未來的WebWebGLWeb

2014-01-14 10:33:42

開源硬件開源

2017-11-08 14:07:45

數據庫MySQL慢查分析

2024-08-22 14:16:08

2019-05-27 09:56:00

數據庫高可用架構

2016-07-06 11:56:52

思科漢堡光纖骨干網

2021-03-03 07:12:47

Windows10操作系統微軟

2012-05-16 17:28:32

智能手機

2018-01-18 13:12:02

數據安全存儲
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 香蕉大人久久国产成人av | 超碰免费在线观看 | 日本xx视频免费观看 | 91精品国产91久久久久久不卞 | 精品国产伦一区二区三区观看体验 | 黄色大片视频 | 欧美区日韩区 | 狠狠ri| 日韩www | 日韩av成人在线 | a在线观看 | 蜜桃传媒av| 最新中文字幕 | 国产精品成人在线播放 | 国产精品永久 | 精品粉嫩超白一线天av | 澳门永久av免费网站 | 国产区在线观看 | 久久91 | 日韩欧美国产一区二区三区 | 欧美视频二区 | 日本三级全黄三级a | 日韩精品一区二区三区中文在线 | 91精品久久久久久久 | 精产国产伦理一二三区 | 亚洲一区免费在线 | 久久lu| 成人欧美一区二区 | 精品久久久久一区二区国产 | 成人免费视频网站在线观看 | 国产一区二区自拍 | 国产亚洲成av人在线观看导航 | 国产精品久久久久久婷婷天堂 | 九九久久精品 | 国产精品av久久久久久毛片 | 久久999| 国产欧美日韩精品一区二区三区 | jlzzjlzz国产精品久久 | 精品少妇v888av | 在线视频一区二区 | 99视频免费|