響應式異步非阻塞編程在服務端的應用
作者 | 搜狐視頻 趙文浩
1、前言
對于服務端的開發者而言,我們總有一個共同的目標,那就是如何用更少的資源獲得足夠的性能來支持我們的服務!,我們不是在性能優化中,就是在性能優化的路上。作為Javaer我們,服務性能優化的武器庫中,異步和并發是永遠不會過時的兩個。
然而理想很美好,現實很骨感:
- 異步編程的思維方式同大腦的命令式思維是背道而馳的。在Java的世界中,直到目前Jdk17,也沒有async/await來幫我們解決回調地獄的問題,強行大量異步,將深陷回調地獄而不能解脫...
- 并發調用方面,大量編排異步線程任務,不僅會造成資源的快速消耗,也會導致業務流程的實現難以理解,正所謂:寫這段代碼時能理解它的只有我和God,一個月后能理解它的就只有God了...。
在服務端引入響應式編程,是解決如上問題的一個好的思路。
下面,我以搜狐視頻服務端PUGC團隊在PUGC視頻對象創建接口的重構工作的實踐為背景,介紹響應式(基于RxJava)異步非阻塞編程在服務端的應用在服務端的應用。
2、問題概述
PUGC視頻對象創建接口,從業務角度看:用于為用戶上傳視頻數據前在服務端為其分配一個視頻對象記錄,該視頻對象記錄被用于描述當前視頻的完整生命周期,從技術角度看:它是一個聚合接口,通過組合多個上游接口數據,主業務過程涉及:帳號、內容審核、視頻對象存儲、轉碼、CDN調度等,實現業務過程。
該接口代碼年代久遠,從提交記錄中可查到的最早的歷史在2013年,隨著業務的變化,開發人員的變更,代碼中充斥著各種各樣的味道。不論是從業務角度、性能角度亦或是日常維護角度看,都難以滿足需要。
- 業務流處理過程邏輯冗長,邏輯復雜,可讀性差,難于維護。
- 并發任務較多,但缺少合理的編排措施,是否需要異步或并發控制完全隨意。
- 幾乎沒有正確的異常處理。
- 充滿Ctrl+C/V的味道
- ......
為了解決以上的諸多問題,我們開始了重構(重寫)之路。
3、讀懂業務流
重構的原則是保證接口實現的業務規則的一致性,通過仔細研讀代碼,整理出接口中實現的諸多特性和業務規則。
視頻對象創建主流程如下圖所示:
注:時序圖中描述的是主流程中的關鍵點,因篇幅所限并未列出每個調用的具體細節
從用戶請求到達服務端開始劃分業務執行階段:
- 數據校驗階段:帳號狀態/入參合規/內容重復性
- 對象創建階段: 視頻對象存儲
- 關聯數據對象創建階段:
- 3.1 必要內容:自媒體業務/視頻對象擴展
- 3.2 可選內容:視頻元數據/轉碼/其它
- 4 上傳準備:從CDN調度視頻內容存儲結點
- 5 結束
這其中每個階段的內容都需要通過若干個上游接口調用協作來完成,因篇幅所限,并未完全描述每個階段的具體實現細節。
4、選擇合適的架構
通過分析業務流的特點:
- 業務流程鏈路較長
- 需要協作的上游接口較多
- IO操作為主
結合接口重構核心目標:
- 代碼應該準確描述業務流
- 合理的并發任務編排
- 準確異常處理
- 降低資源占用,提升接口性能
我們決定基于響應式異步非阻塞架構對接口進行重構:
- 響應式編程是面向數據流的,業務流的特點可知,它有明確的階段性,數據在每個階段變換和流動
- 整個過程涉及較多的接口調用和異步任務編排,基于異步非阻塞可顯著減少異步線程的依賴
- 響應式編程中提供了完善的異常處理機制,特別對異步環境下的異常處理非常友好
涉及的基礎組件
- 核心響應式編程框架: ReactiveX/RxJava
- Http客戶端 Vertx WebClient
- CacheCloud響應式客戶端 sohutv-basic / cachecloud-client
- Dubbo響應式客戶端sohutv-basic / dubbo-reactive-consumer
同步阻塞模式適配響應式異步非阻塞
因后續章節代碼示例中主要以dubbo服務接口調用為主,所以我以視頻服務端團隊基于dubbo-2.6.5版本實現的dubbo響應式客戶端dubbo-reactive-consumer為例,介紹將傳統的同步阻塞模式適配響應式異步非阻塞的思路。
Dubbo是目前視頻服務端用使用的較多的RPC框架,目前最新的版本是Dubbo3.x(注:由于歷史原因,目前團隊使用的版本還停留在2.6.5版本)。在Spring環境中,比較簡便的注入dubbo服務接口代理對象(簡稱接口對象)的方式是通過@com.alibaba.dubbo.config.annotation.Reference注解自動注入,示例如下:
@Component
public class DubboSyncExample {
@Reference
private VideoInfoService videoInfoService;
@Nullable
public VideoInfo getVideoInfo(long vid) throws Exception {
return videoInfoService.getVideoInfo(vid);
}
}
默認情況下,這種方式注入的是基于同步阻塞模式接口對象,由于dubbo客戶端基于Netty實現,所以它天生是是異步的。一般情況下,如果希望通過異步方式使用dubbo客戶端可以按如下方式操作:
@Component
public class DubboAsyncExample {
/**
* 標記為異步調用
*/
@Reference(async = true)
private VideoInfoService videoInfoService;
@NotNull
public Future<VideoInfo> getVideoInfo(long vid) throws Exception {
// 提交異步請求
videoInfoService.getVideoInfo(vid);
// 通過ThreadLocal保存了對當前請求上下文的引用RpcContext
RpcContext rpcContext = RpcContext.getContext();
// 從RpcContext中獲取當前請求的Future<T>
Future<VideoInfo> future = rpcContext.getFuture();
return future;
}
}
為支持響應式框架,同時保持通過@com.alibaba.dubbo.config.annotation.Reference注解自動注入接口對象的方式,我們的實現過程如下:
將接口對象包裝成響應式引用類型:ReactiveReference<Service>
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.Optional;
public interface ReactiveReference<Service> {
@NotNull
<T> Maybe<T> maybe(@NotNull Function<Service, T> f);
@NotNull
<T> Single<Optional<T>> single(@NotNull Function<Service, T> f);
}
接口中提供了兩個方法用到了整個單值類型的響應式流:
- Maybe<T>: 當向下游Observer發射的對象為null時,會結束整個流的生命周期
- Single<T>: 在整個流的生命周期中不允許發射null對象
實現響應式引用實現ReactiveReferenceImpl<Service>
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;
import java.util.Optional;
final class RxJavaReactiveReferenceImpl<Service> implements ReactiveReference<Service> {
@NotNull
final Service service;
RxJavaReactiveReferenceImpl(@NotNull Service service) {
this.service = service;
}
/**
* maybe
*/
@Override
@NotNull
public <T> Maybe<T> maybe(@NotNull Function<Service, T> f) {
// 將Single<Optional<T>>解包成Maybe<T>
return this.single(f).mapOptional($ -> $);
}
/**
* single
* @param actual 實際的調用
*/
@Override
@NotNull
public <T> Single<Optional<T>> single(@NotNull Function<Service, T> actual) {
// 延遲創建一個Single流
return Single.defer(() ->
Single.create(
emitter -> {
DubboSubscription<T> subscription =
new DubboSubscription<>(
() -> actual.apply(service), // 將實際調用包裝成 java.util.concurrent.Callable<T>
emitter::onSuccess, // 執行成功向下游發射結果
emitter::onError // 執行異常,向下游發射異常信息
);
emitter.setCancellable(subscription::cancel); // 支持流取消時的回調
subscription.request(1L);
}
)
);
}
}
DubboSubscription<T>用于同Dubbo框架橋接,實現如下:
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public class DubboSubscription<T> implements Subscription {
/**
* dubbo rpc調用的結果Future引用,用于維護生命周期
* 當響應式流被取消時需要取消這個Future
*/
@NotNull
private final AtomicReference<Future<T>> futureRef = new AtomicReference<>();
/**
* 實際的接口調用
*/
@NotNull
private final Callable<T> actual;
/**
* 當rpc調用成功時執行
*/
@NotNull
private final Consumer<Optional<T>> onNext;
/**
* 當rpc調用異常時執行
*/
@NotNull
private final Consumer<Throwable> onError;
public DubboSubscription(@NotNull Callable<T> actual, @NotNull Consumer<Optional<T>> onNext, @NotNull Consumer<Throwable> onError) {
this.actual = actual;
this.onNext = onNext;
this.onError = onError;
}
@Override
public void request(long n) {
try {
RpcContext rpcContext = RpcContext.getContext();
// 異步執行dubbo rpc請求
this.actual.call();
// 取到當前請求的Future<T>
Future<T> future = rpcContext.getFuture();
if (!(future instanceof FutureAdapter)) {
// Future<T>類型驗證,要求是 com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter,
// 因為FutureAdapter包裝了 com.alibaba.dubbo.remoting.exchange.ResponseFuture對象,它支持設置結果回調
throw new UnsupportedOperationException(String.format("The future is not a instance of [%s]", FutureAdapter.class.getName()));
}
futureRef.set(future); // 設置Future引用
FutureAdapter<T> futureAdapter = (FutureAdapter<T>) future;
futureAdapter
.getFuture()
.setCallback(
new ResponseCallback() {
@SuppressWarnings("unchecked")
@Override
public void done(Object response) {
try {
T t = (T) ((Result) response).recreate(); // 結果對象反序列化
DubboSubscription.this.onNext.accept(Optional.ofNullable(t)); // dubbo rpc調用成功
} catch (Throwable e) {
DubboSubscription.this.onError(e); // 一般為反序列化操作引起的異常
}
}
@Override
public void caught(Throwable e) {
// // dubbo rpc調用異常
DubboSubscription.this.onError(e);
}
}
);
} catch (Throwable e) {
// 其它異常
onError(e);
}
}
private void onError(Throwable e) {
// 執行onError回調
DubboSubscription.this.onError.accept(e);
}
/**
* 響應式流取消時回調
*/
@Override
public void cancel() {
Future<T> future = futureRef.get();
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
}
于是基于我們的響應式dubbo客戶端,服務接口的調用方式為:
@Component
public class DubboReactiveExample {
@Reference
private ReactiveReference<VideoInfoService> videoInfoService;
/**
* 響應式請求
*/
@NotNull
public Maybe<VideoInfo> maybeVideoInfo(long vid) {
// 提交響應式請求
return videoInfoService.maybe(service -> service.getVideoInfo(vid));
}
/**
* 響應式請求
*/
@NotNull
public Single<Optional<VideoInfo>> singleVideoInfo(long vid) {
// 提交響應式請求
return videoInfoService.single(service -> service.getVideoInfo(vid));
}
}
注:此處的實現為常規的Spring生命周期處理,與本文無關,具體實現細節不再贅述
擴展com.alibaba.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor使dubbo自動注入框架支持ReactiveReference<Service>類型,自動設置為異步模式,且向目標Component中注入的實際類型為ReactiveReferenceImpl<Service>
5、實踐過程
我們在讀懂業務流一節中對視頻對象創建主流程做了說明,然后將主流程分成了5個階段,現在我們使用RxJava代碼來描述這5個階段(注:后續代碼僅用于演示主流程的實現過程,并不是完整的實現過程)。
輸入參數Args
- 為保證每個流處理結點的無狀態性,Args被設計為不可變
/**
* 入參包裝
* 為保證每個流處理結點的無狀態性,Args被設計為不可變
*/
@Builder
@Getter
public class Args {
@NotNull
private final long accountId;
private final String title;
private final String coverImg;
private final String other;
/**
* 視頻內容校驗值
*/
private final String signature;
... // other fields
}
過程結果PassObj
- 為保證每個流處理結點的無狀態性,PassObj被設計為不可變
/**
* 響應式流處理過程對象包裝
* 為保證每個流處理結點的無狀態性,PassObj被設計為不可變
*/
@Builder(toBuilder = true)
@Getter
public class PassObj {
private final Args args;
private final Account account;
private final boolean argsAccepted;
private final VideoInfo videoInfo;
private final VideoInfoExtend videoInfoExtend;
private final PugcInfo pugcInfo;
private final CDNNodeInfo cdnNodeInfo;
... // other fields
}
響應式流主干
- 清晰地表達了業務流的執行過程
@NotNull
public Single<PassObj> create(@NotNull Args inputArgs) {
return Single.fromCallable(() -> PassObj.builder().args(inputArgs).build())
.flatMap(this::checkStage) // 1 數據校驗階段:帳號狀態/入參合規/內容重復性
.flatMap(this::createVideoObjectStage)// 2 對象創建階段: 視頻對象存儲
.flatMap(this::createCorrelateObjectStage)// 3 關聯數據對象創建階段
.flatMap(this::cdnDispatchStage)// 4 上傳準備:從CDN調度視頻內容存儲結點\
.doOnSuccess(passObj -> { // 5 結束
// 視頻對象創建成功
})
.doOnError(e -> log.error(e.getMessage(), e))
;
}
每個階段的詳細實現:
數據校驗階段:帳號狀態/入參合規/內容重復性
/**
* 1 數據校驗階段:帳號狀態/入參合規/內容重復性
*/
@NotNull
public Single<PassObj> checkStage(@NotNull PassObj passObj) {
return this.checkAndGetAccount(passObj) // 驗證并獲取account對象
.map(account -> passObj.toBuilder().account(account).build())// 帳號狀態校驗通過, 重建passObj,并裝配account
.flatMap(_passObj ->
this.checkInputArgs(_passObj) // 入參合規性校驗
.map(argsAccepted -> _passObj.toBuilder().argsAccepted(argsAccepted).build())// 入參合規性校驗通過, 重建passObj,并裝配校驗結果
)
.flatMap(_passObj -> // 視頻元數據信息校驗
this.videoMetaService.single(s -> s.getVideoMetaBySignature(passObj.getArgs().getSignature()))
.map($videoMeta -> {
if ($videoMeta.isPresent()) {
// 視頻元數據信息存在,說明視頻上傳是重復的,執行相關的異常處理
throw new VideoDuplicatedException();
}
// 視頻沒有重復,可以創建,向后傳遞passObj
return _passObj;
})
)
;
}
- 驗證并獲取Account對象
/**
* 驗證并獲取Account對象
*/
@NotNull
private Single<Account> checkAndGetAccount(@NotNull PassObj passObj) {
return Maybe.fromCallable(() -> passObj.getArgs().getAccountId())// 取accountId
.filter(accountId -> accountId > 0L) // 取基本有效的值
.flatMap(accountId -> this.accountService.maybe(s -> s.checkAccount(accountId)))// 驗證帳號狀態
.defaultIfEmpty(false)
.doOnSuccess(accountAccepted -> {
if (!accountAccepted) {
// 帳號狀態校驗失敗
// 通過明確的異常來處理失敗過程
throw new AccountRejectedException();
}
})
.flatMapMaybe(_accountAccepted -> this.accountService.maybe(s -> s.getAccount(passObj.getArgs().getAccountId())))
.switchIfEmpty(Single.error(AccountObjectNullPointerException::new))
;
}
- 入參合規性校驗
/**
* 入參合規性校驗
*/
private Single<Boolean> checkInputArgs(@NotNull PassObj passObj) { // 入參合規性校驗可并發執行
return Single.zip(
this.contentAuditService.maybe(s -> s.titleCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(TitleRejectedException::new)), // title
this.contentAuditService.maybe(s -> s.coverImgCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(CoverImgRejectedException::new)), // coverImage
this.contentAuditService.maybe(s -> s.otherCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(OtherRejectedException::new)), // others
(titleAccepted, coverImgAccepted, otherAccepted) -> titleAccepted && coverImgAccepted && otherAccepted
);
}
對象創建階段: 視頻對象存儲
/**
* 2 對象創建階段: 視頻對象存儲
*/
@NotNull
public Single<PassObj> createVideoObjectStage(@NotNull PassObj passObj) {
return Single.fromCallable(() ->
// 包裝VideoInfo對象
VideoInfo.builder()
.userId(passObj.getAccount().getId())
.title(passObj.getArgs().getTitle())
.coverImg(passObj.getArgs().getCoverImg())
.other(passObj.getArgs().getOther())
.build()
)
.flatMapMaybe(videoInfo ->
this.videoInfoService.maybe(s -> s.createVideoInfo(videoInfo)) // 保存視頻對象
.filter(createdVideoInfo -> createdVideoInfo.getId() > 0L) // 驗證保存結果
)
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoCreateException::new)) // 異常處理
.map(createdVideoInfo -> passObj.toBuilder().videoInfo(createdVideoInfo).build()) // 將保存結果裝配到passObj中
;
}
關聯數據對象創建階段
/**
* 3 關聯數據對象創建階段
*/
@NotNull
public Single<PassObj> createCorrelateObjectStage(@NotNull PassObj passObj) {
return Single.zip( // 必要內容:自媒體業務/視頻對象擴展 可并發執行
Single.fromCallable(() -> // 視頻對象擴展
VideoInfoExtend.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(videoInfoExtend ->
this.videoInfoService.maybe(s -> s.createVideoInfoExtend(videoInfoExtend))
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoExtendCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoExtendCreateException::new)) // 異常處理
),
Single.fromCallable(() -> // 自媒體業務
PugcInfo.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(pugcInfo ->
this.pugcService.maybe(s -> s.createPugcInfo(pugcInfo))
.onErrorResumeNext(e -> Maybe.error(() -> new PugcInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(PugcInfoCreateException::new)) // 異常處理
),
(createdVideoInfoExtend, createdPugcInfo) ->
passObj.toBuilder()
.videoInfoExtend(createdVideoInfoExtend)
.pugcInfo(createdPugcInfo)
.build()
)
.doOnSuccess(updatedPassObj -> { // 可選內容:視頻元數據/轉碼/其它 不關注結果,異步執行
Maybe.fromCallable(() ->
VideoMeta.builder()
.signature(updatedPassObj.getArgs().getSignature())
.build()
)
.flatMap(videoMeta -> this.videoMetaService.maybe(s -> s.createVideoMeta(videoMeta)))
.doOnError(e -> log.error(e.getMessage(), e))
.subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoKeyFrame(updatedPassObj.getVideoInfo())).subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoResolution(updatedPassObj.getVideoInfo())).subscribe();
this.otherOptionalService.maybe(s -> s.doSomething(updatedPassObj.getVideoInfo())).subscribe();
})
;
}
上傳準備:從CDN調度視頻內容存儲結點
/**
* 上傳準備:從CDN調度視頻內容存儲結點
*/
@NotNull
public Single<PassObj> cdnDispatchStage(@NotNull PassObj passObj) {
return cdnDispatchService.maybe(CDNDispatchService::dispatch) // 調度一個CDN上傳節點
.onErrorResumeNext(e -> Maybe.error(() -> new CDNDispatchException(e))) // 調用異常
.switchIfEmpty(Single.error(CDNDispatchException::new)) // 沒有取到節點, 異常處理
.map(cdnNodeInfo ->
passObj.toBuilder()
.cdnNodeInfo(cdnNodeInfo)
.build()
)
;
}
6、總結
以上內容是搜狐視頻服務端PUGC團隊,首次在核心業務接口中應用響應式異步非阻塞架構的思考和實施過程,文中主要闡述了兩個內容:
- Dubbo響應式客戶端的實現過程
- 視頻對象創建接口的業務分析和實現過程
通過本次重構,我們獲得了如下收益:
- 重新梳理業務流,將業務流同響應式流整合,基于響應式編程規范業務流的實現過程
- 視頻對象創建接口平均響應時間從256ms降低到146ms,性能提供顯著
結束,感謝閱讀!