商家下載中心設(shè)計演進之路
一、背景
在電商平臺上,二八定律尤為明顯,20%的高價值商家往往創(chuàng)造了80%以上的銷售額。而這些商家通常擁有大量的訂單、商品、出價等管理需求,推動了他們對批量操作功能的迫切需求。批量操作能夠幫助這些商家高效地處理商品信息、庫存和訂單管理,顯著提升運營效率。
通過批量操作,商戶可以在短時間內(nèi)對多個產(chǎn)品進行修改,如統(tǒng)一調(diào)價、調(diào)整促銷策略等,從而快速響應市場變化,優(yōu)化用戶體驗。此外,批量操作還降低了人工出錯的風險,確保了數(shù)據(jù)的一致性,讓商家能夠更加專注于戰(zhàn)略規(guī)劃和客戶關(guān)系管理??傊?,對于這些商戶而言,批量操作不僅是提升管理效率的關(guān)鍵工具,也是實現(xiàn)業(yè)務增長的重要保障。
在得物的商家后臺中,商家的所有批量操作都承載在批處理系統(tǒng)(批處理中心),商家可以通過在功能頁面操作批量導入或是批量導出來完成批量操作。操作后的文件將展示在下載中心。
此外,批處理中心還維護了交易后臺、客服、匯金、門店等多個域的批量操作任務。截止目前,批處理中心維護了十個域的上千種批量任務,日均處理數(shù)萬個相關(guān)任務,數(shù)億條相關(guān)數(shù)據(jù)。
隨著得物體量的不斷上升,批處理系統(tǒng)也在不斷演進。簡單來說,批處理系統(tǒng)經(jīng)歷了從分散到耦合、再到集中與隔離的多個發(fā)展階段。接下來,我們以批處理的開發(fā)者小王的視角,介紹批處理系統(tǒng)的這三種設(shè)計,并探討它們各自的特點與適用場景。
二、集中式:流程擴展
假設(shè)小王接到了一個批量操作的需求,要求在商家后臺能進行批量出價。需求很簡單,小王僅用時兩天半就完成了基本流程的搭建。
圖片
業(yè)務上線后,商家反饋非常好,產(chǎn)品要求立刻上線一個批量修改出價的需求。于是小王照葫蘆畫瓢寫又寫了一條流程。
圖片
兩條幾乎一樣的流程,有代碼潔癖的小王表示無法接受。經(jīng)過分析后,小王發(fā)現(xiàn),不管是什么導入流程,有些步驟總是固定的,因此決定代碼復用。
圖片
代碼復用后,出價和修改之間只有格式校驗和業(yè)務邏輯不同。其余的文件下載、內(nèi)容解析、結(jié)果保存和上傳均使用相同的節(jié)點。既然各個業(yè)務之間的差異主要集中在數(shù)據(jù)處理,小王決定直接將其開成擴展點。不同的業(yè)務場景只需要實現(xiàn)各自的數(shù)據(jù)處理擴展,就能無縫接入批處理流程。業(yè)務擴展的示意圖如下:
圖片
在具體實現(xiàn)的時候,小王在代碼里面通過業(yè)務身份來進行擴展點的選擇,可以建立一個相關(guān)的工廠類進行。
@Component
public class BpcProcessHandlerFactory {
@Autowired
private ApplicationContext applicationContext;
private static ConcurrentHashMap<String, BpcProcessDefine> templateMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
Map<String, ImportService> importServiceMap = applicationContext.getBeansOfType(ImportService.class);
for (ImportService importService : importServiceMap.values()) {
initImportService(importService);
}
}
private void initImportService(ImportService importService) {
// ...
}
public BpcProcessHandler getBpcProcessHandler(String templateCode) {
if (StringUtils.isBlank(templateCode)) {
return null;
}
if(!templateMap.containsKey(templateCode)) {
return null;
}
return templateMap.get(templateCode).newProcessHandler();
}
}
對于導入的任務處理,簡化的代碼流程如下:
@Service
public class BpcProcessService {
@Autowired
private BpcProcessHandlerFactory bpcProcessHandlerFactory;
public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
// 獲取擴展點
BpcProcessHandler bpcProcessHandler = bpcProcessHandlerFactory.getBpcProcessHandler(req.getTaskTemplateCode());
if (bpcProcessHandler == null) {
throw new BpcProcessException("找不到模版定義");
}
// 1. 創(chuàng)建任務
createTask();
// 2. 文件下載 && 文件保存
downloadFromOss();
// 3. 數(shù)據(jù)解析
int loopCnt = 0;
int maxLoopCnt = bpcProcessHandler.getMaxLoopCnt();
while(loopCnt++ < maxLoopCnt) {
// 調(diào)用擴展點處理
bpcProcessHandler.process();
// 更新任務
updateTaskProcess();
}
// 更新任務
updateTaskStatus();
return taskId;
}
}
在完成了流程擴展點后,小王心想,這下可算是高枕無憂了。后續(xù)有新的導入場景,只需要實現(xiàn)自己的校驗邏輯和處理邏輯即可。
但是好景不長,隨著商家體量的增長,小王發(fā)現(xiàn)對接的業(yè)務越來越多了;先是出價、再是商品然后是其他逆向、服務費的批量服務,小王一個人實在是寫不過來了,只能讓各個業(yè)務的開發(fā)到批處理系統(tǒng)開發(fā)自己的業(yè)務。各個人的編碼習慣不一樣,批處理系統(tǒng)對接的Jar也越來越多,系統(tǒng)已經(jīng)變成了一個大雜燴。
怎么才能改變這個現(xiàn)狀呢?
三、平臺化:配置注冊
在集中式架構(gòu)中:所有的業(yè)務處理流程是共用的,不同的業(yè)務通過實現(xiàn)各自的擴展點來完成各個業(yè)務的邏輯。這帶來了一個最明顯的問題,即系統(tǒng)的邊界模糊,業(yè)務耦合重。
這個擴展點能不能寫在外部呢?
小王靈光一現(xiàn):SPI不就可以嗎。Java的SPI機制能幫助我們獲取各個業(yè)務的實現(xiàn),因此批處理系統(tǒng)只需要基于SPI抽象出一套核心的導入/導出流程即可。由于各個業(yè)務要能準確找到SPI,還需要加入一定業(yè)務配置能力。
圖片
和集中式架構(gòu)對比,配置化方案的可擴展性更強,但是也不可避免的帶來了一個缺點:開發(fā)人員需要去創(chuàng)建配置。
而批處理配置至少需要包含以下內(nèi)容:
- Excel格式。
- 流程調(diào)用的SPI信息。
- 數(shù)據(jù)對象和Excel字段之間的映射關(guān)系。
其中字段的映射關(guān)系和SPI等信息的維護成本較高,為了減輕開發(fā)人員的工作量,小王還維護了一個IDEA插件。用于一鍵上傳配置。
后端開發(fā)人員可以僅通過注解的方法一鍵上報自身的配置,大大減輕了業(yè)務的配置上傳的工作量。
同步執(zhí)行-通用配置處理
在創(chuàng)建完配置后,可以利用dubbo的泛化調(diào)用來執(zhí)行各個SPI的實現(xiàn):
@Override
public String invoke(ServiceDefinition serviceDefinition, Object inputParam) {
GenericService genericService = DubboConfig.buildService(serviceDefinition.getInterfaceName(), serviceDefinition.getTimeout());
//參數(shù)list轉(zhuǎn)換處理,由請求參數(shù)key轉(zhuǎn)換成內(nèi)部參數(shù)
String[] parameterTypes = new String[] {serviceDefinition.getRequestType().getClassName()};
Object[] args = new Object[] {inputParam};
long startTime = System.currentTimeMillis();
Object result;
try {
log.info("invoke service={}#{} with request={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(args));
result = genericService.$invoke(serviceDefinition.getMethod(), parameterTypes, args);
long endTime = System.currentTimeMillis();
digestLog(serviceDefinition, true, endTime - startTime);
log.info("invoke service={}#{} with result={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(result));
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
digestLog(serviceDefinition, false, endTime - startTime);
log.info("failed to dubbo invoke:" + serviceDefinition.getInterfaceName() + "#" +serviceDefinition.getMethod() + " with error " + ex.getMessage());
throw new DependencyException(ErrorCodeEnum.DEFAULT_DEPENDENCY_ERROR.getCode(), ex.getMessage(), ex);
}
if (result == null) {
throw new DependencyException(ErrorCodeEnum.DEFAULT_BIZ_ERROR.getCode(), "the result is null");
}
Map resultMap = JSON.parseObject(JSON.toJSONString(result), Map.class);
processError(resultMap);
Object data = resultMap.get("data");
return JSON.toJSONString(data);
}
簡化版的執(zhí)行流程如下所示:
@Service
public class BpcProcessService {
@Autowired
private BpcProcessHandlerFactory bpcProcessHandlerFactory;
public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
// 1. 獲取配置
TaskTemplate template = getTemplate();
// 2. 創(chuàng)建任務
Task task = createTask();
// 3. 文件下載 && 文件保存
downloadFromOss();
// 4. 數(shù)據(jù)解析
int loopCnt = 0;
int maxLoopCnt = template.getMaxLoopCnt();
while(loopCnt++ < maxLoopCnt) {
// 調(diào)用SPI處理
invoke(template, task)
// 更新任務
updateTaskProcess();
}
// 更新任務
updateTaskStatus();
return taskId;
}
}
可以看到配置化后的執(zhí)行策略和之前流程擴展的執(zhí)行策略是類似的,主要的變化就是從調(diào)用本地擴展點,切換成了調(diào)用配置后的SPI。
調(diào)度執(zhí)行-業(yè)務針對調(diào)整
配置化完成之后,小王松了一口氣,這下系統(tǒng)總算是干凈了。業(yè)務的歸業(yè)務,流程的歸流程,兩者互不打擾。然而凡事總不順利、沒過多久批處理系統(tǒng)就出了一次冒煙。簡單來說,這次冒煙是由于批處理系統(tǒng)同時處理了大量任務導致的內(nèi)存溢出。
針對這次冒煙,小王仔細分析系統(tǒng)數(shù)據(jù)后發(fā)現(xiàn),商家下載中心的業(yè)務有著自己的業(yè)務特點:
- 不同任務之間的數(shù)量差異巨大(如,運營任務和商家任務的差距);
- 商家操作的流量時間上分布不均,大部分商家操作集中在剛上班(10點左右)和快下班(17點左右);
- 任務流量在商家上分布不均,重點商家會創(chuàng)建大量任務。
以下是小王分析的部分數(shù)據(jù)來源圖:
- 任務流量分布不均,下面是各個任務類型的執(zhí)行統(tǒng)計,其中不同顏色代表不同類型的任務。
圖片
- 時間流量分布不均,下面是導入導出任務流量的時間分布
圖片
- 商家流量分布不均
圖片
這些特點在批處理系統(tǒng)中表現(xiàn)為:
- 系統(tǒng)穩(wěn)定性風險高,出現(xiàn)過一次線上冒煙。因為系統(tǒng)資源是有限的,高峰期的大流量任務可能會占用過多系統(tǒng)內(nèi)存,導致OOM。
- 商家體驗得不到保證,運營操作可能會導致商家長時間等待。
不就是資源導致的風險嗎,小王覺得這是小case 了,加個限流就搞定了,然后就對創(chuàng)建任務加上了限流。結(jié)果上線后情況不僅沒有好轉(zhuǎn),還因為限流“誤殺”了好多比較重要的導入任務,經(jīng)過分析后小王終于找到了原因。在商家下載中心的業(yè)務中,限流并不能滿足資源保護訴求。這實際上是由系統(tǒng)本身的內(nèi)部架構(gòu)決定的、因為批處理在大部分情況下是一個低CPU高內(nèi)存占用的系統(tǒng)。如果對任務的提交進行限流,一方面容易誤傷核心的訂單/出價任務,另一個方面忽略了高耗時任務的影響。如下圖所示:
圖片
- 運營任務"恰好"占用了流控的窗口,導致后續(xù)提交的商家任務都被限流。
- 長耗時任務會跨越多個時間窗口,導致限流不生效。
不能限流,那只能自己來了。只要把一切都拿到手里,任務啥時候執(zhí)行不就是自己說了算了嘛。于是小王打算轉(zhuǎn)變身份,從被動式執(zhí)行到主動式調(diào)度。換言之,就是從同步流程切換成異步調(diào)度流程,由系統(tǒng)自己來解決資源的分配,并對業(yè)務進行隔離。小王很快畫好了自己的核心流程。
圖片
流程很簡單,創(chuàng)建任務的時候不再直接執(zhí)行,而是等待系統(tǒng)調(diào)度后執(zhí)行。然而小王在調(diào)度和隔離這里又犯了難,這倆該怎么做呢?
業(yè)務隔離
隔離主要分為兩大類,物理隔離和邏輯隔離。
物理隔離:不同的機器執(zhí)行不同業(yè)務的調(diào)度。
- 集群隔離:類似于應用發(fā)布時的藍綠集群,我們可以把集群分為核心集群+非核心集群,用核心集群來保障商家訂單,出價等相關(guān)動作的穩(wěn)定性,用非核心集群來保障其他鏈路;
- 機器隔離:機器隔離相較于集群隔離,其粒度更小。通過指定IP來控制不同業(yè)務之間的調(diào)度;
邏輯隔離:通過使用不同線程池的方式來完成業(yè)務的隔離。
凡事先易后難,小王決定先采用簡單的方式來對業(yè)務進行隔離,線程池的方式已經(jīng)能分離開可能造成資損的任務和不會造成資損的任務了。在調(diào)度方面,小王列舉了業(yè)內(nèi)常見的帶優(yōu)先級的調(diào)度方法
任務調(diào)度
1.優(yōu)先隊列:利用線程池的等待隊列來完成優(yōu)先級的調(diào)度。
圖片
優(yōu)點:代碼簡單,易維護,只需要維護一個優(yōu)先級隊列即可。
缺點:需要額外增加一個狀態(tài)來代表等待調(diào)度,有饑餓問題,存在一定穩(wěn)定性風險,因為對線程池的等待隊列缺少管控手段。
2.老化策略:利用老化策略,動態(tài)提升任務優(yōu)先級。
圖片
優(yōu)點:較大程度上避免饑餓問題,優(yōu)先級的可擴展性高,對任務的管控能力強,狀態(tài)機侵入少。
缺點:需要考慮并發(fā)問題,代碼較復雜。
3.多級隊列:利用多級隊列來完成任務優(yōu)先級。
圖片
優(yōu)點:較大程度上避免饑餓問題,代碼較為簡潔,任務管控能力強,狀態(tài)機改動少。
缺點:任務優(yōu)先級可擴展性較差,如果新增一個優(yōu)先級需要改動調(diào)度代碼,沒有高優(yōu)任務時,系統(tǒng)吞吐性較差。
綜合以上各種方案后,小王最終采用了多級隊列 + 線程隔離的方式來進行任務的調(diào)度。在調(diào)度的具體實現(xiàn)上,采用定時任務來進行流程的觸發(fā)。
此外,為了支持大任務量場景臨時增加系統(tǒng)吞吐,小王還增加了分片的能力,通過接受分片參數(shù),每臺機器只取自己的分片。簡化版本的代碼如下:
@Service
@Slf4j
public class TaskScheduleServiceImpl implements TaskScheduleService {
@Override
@LogAnnotation
public void schedule(int shared, int all) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 丟線程池執(zhí)行
List<Long> highTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.HIGH, all * arkConfig.highSize);
highTaskIds = highTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
log.info("優(yōu)先級調(diào)度任務,待執(zhí)行高優(yōu)任務 Ids = {}", highTaskIds);
process(highTaskIds, (id) -> taskThreadPool.executeHigh(() -> process(id)));
// 丟線程池執(zhí)行
List<Long> mediumTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.MEDIUM, all * arkConfig.mediumSize);
mediumTaskIds = mediumTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
log.info("優(yōu)先級調(diào)度任務,待執(zhí)行中優(yōu)任務 Ids = {}", mediumTaskIds);
process(mediumTaskIds, (id) -> taskThreadPool.executeMedium(() -> process(id)));
// 丟線程池執(zhí)行
List<Long> lowTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.LOW, all * arkConfig.lowSize);
lowTaskIds = lowTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
log.info("優(yōu)先級調(diào)度任務,待執(zhí)行低優(yōu)任務 Ids = {}", lowTaskIds);
process(lowTaskIds, (id) -> taskThreadPool.executeLow(() -> process(id)));
log.info("優(yōu)先級調(diào)度任務,執(zhí)行完畢, cost = {}", stopWatch.getTime());
}
private void process(List<Long> idList, Consumer<Long> consumer) {
if (CollectionUtils.isEmpty(idList)) {
return;
}
for (Long id : idList) {
consumer.accept(id);
}
}
private void process(Long id) {
// 任務處理邏輯。。。
}
}
干完了這些事后,小王突然想起來,測試環(huán)境還需要走染色呢。
于是又在調(diào)度上增加了染色環(huán)境的路由。
這回總算是徹底解決了系統(tǒng)的穩(wěn)定性問題了,以后系統(tǒng)存在吞吐風險時,只需要動態(tài)調(diào)整召回數(shù)量就好了。
四、本地化:任務上報
作為上面的一切后,小王打開了APM的監(jiān)控,發(fā)現(xiàn)系統(tǒng)的內(nèi)存占用還是很高。明明復雜的業(yè)務流程都放到外面了,為啥性能還是一般呢?
小王思考了現(xiàn)在批處理存在的缺點:
- 配置維護成本高、業(yè)務需要上報SPI信息和映射關(guān)系,且配置完成后更改風險高。
- 全局資源利用率低,一份業(yè)務數(shù)據(jù)在多個系統(tǒng)都需要占用內(nèi)存。
- 調(diào)度的隔離是基于線程池or物理機器,粒度較粗,無法完全避免業(yè)務之間的互相干擾。
此外,還有一個令他最難受的問題,就是業(yè)務咨詢很多。很多業(yè)務雖然對接了他的系統(tǒng),但是在執(zhí)行失敗時他們經(jīng)常找不到錯誤原因,需要小王配合排查。如何解決這幾個問題呢?
小王決定返璞歸真,回歸本源。批處理中心是為了解決商家批量導入導出的問題而生的,其產(chǎn)生的主要目的在于幫助業(yè)務平臺減少文件解析、文件生成、文件上傳、頁面展示的成本。
這些問題一定需要一個系統(tǒng)來支持嗎?文件解析和生成實際上是用EasyExcel的SDK完成的,文件上傳是用Oss的SDK完成的,還有一個頁面展示的功能是一個非常輕量的邏輯。換言之,完全可以在業(yè)務系統(tǒng)把前幾件事都做了。構(gòu)建一個批處理插件來完成批處理中心的大部分能力,批處理系統(tǒng)僅作為展示使用。小王產(chǎn)生了一個新的想法:把邏輯放到批處理SDK中去,批處理僅維護一兩臺機器用于承載展示邏輯即可。
整體的架構(gòu)設(shè)計如下圖所示:
圖片
在本地化的思路下:批處理中心類似于一個中心節(jié)點,各個業(yè)務系統(tǒng)作為其的葉子節(jié)點,只需要定時上報任務相關(guān)情況即可。批處理系統(tǒng)只負責頁面的展示,和業(yè)務完全解耦。
本地化帶來了以下幾個明顯的好處:
- 效率高,不再需要跨系統(tǒng)之間的邏輯調(diào)用,既能節(jié)約系統(tǒng)資源,又能減少網(wǎng)絡(luò)傳輸時間。
- 維護成本低,業(yè)務方可隨時調(diào)整業(yè)務映射,批處理只需要維護極小的配置(模版和對應的展示名稱、展示地方)。
- 迭代升級容易,平臺化的改造由于影響面比較大,風險高。而SDK的升級是單應用升級的,因此影響小,風險可控。
- 流程擴展相對簡單,SDK可以提供相對較多的鉤子函數(shù)。
當然,凡事沒有銀彈。本地化也不可避免帶來了一些缺點:
- 業(yè)務需要維護部分配置,這其中主要是一些oss相關(guān)的配置。
本地化后,批處理中心不需要維護業(yè)務邏輯、也不需要任務調(diào)度、任務的隔離粒度最細。小王總算是能安心睡個好覺了。
五、總結(jié)
上面我們以一個批處理系統(tǒng)普通開發(fā)者的視角,回歸了商家批處理系統(tǒng)發(fā)展的三個階段(本地化正在進行中)。這三個階段,體現(xiàn)了從厚到薄、從業(yè)務耦合到業(yè)務隔離的演進過程。從本地到平臺再到本地,頗有種天下大勢,分久必合合久必分的感覺。這三種方式并沒有絕對的優(yōu)劣之分,而是隨著業(yè)務需求的變化而逐步演化的。
在初始階段,系統(tǒng)功能較少,通常只有一兩個簡單的導入導出功能,此時使用流程擴展是最輕量、最靈活的選擇,能夠快速滿足商家的基本需求。隨著業(yè)務量的增長,系統(tǒng)之間的隔離性變得越來越重要,這時引入配置注冊成為必要措施,以確保不同模塊之間的自主性和穩(wěn)定性。
進一步發(fā)展后,平臺化改造的初步實現(xiàn)通常采用同步調(diào)用方式,但隨之而來的穩(wěn)定性要求推動了異步調(diào)度的引入。然而,到了后期,即使是異步調(diào)度也可能面臨系統(tǒng)吞吐量不足的問題。因此,業(yè)務系統(tǒng)本地執(zhí)行狀態(tài)上報的模式逐漸成為更優(yōu)的選擇,能夠有效提升系統(tǒng)的響應速度和處理能力。
隨著業(yè)務的不斷發(fā)展,商家的批處理系統(tǒng)必然會進行更新與迭代,以適應新的需求和挑戰(zhàn)。系統(tǒng)設(shè)計沒有銀彈,所有設(shè)計的迭代實際上就是開發(fā)人員遇見問題、解決問題的能力體現(xiàn)。