討論一個(gè)技術(shù)問(wèn)題,大模型流式返回 原創(chuàng)
?“ 技術(shù)上最容易犯的錯(cuò)就是經(jīng)驗(yàn)主義,以及拿來(lái)主義”
最近在對(duì)接GPT實(shí)現(xiàn)一個(gè)功能,具體功能就不說(shuō)了;主要是這個(gè)功能需要流式返回,因此踩了一些坑;所以就在此記錄一下。至于什么是流式返回,不清楚的可以自己?jiǎn)柖饶铩?/p>
大模型流式返回帶來(lái)的問(wèn)題
自chatGPT推出以來(lái),其一個(gè)字一個(gè)字的出現(xiàn),就像一個(gè)打字機(jī);這效果驚艷了很多人,因此在很多場(chǎng)景下很多人都會(huì)選擇打字機(jī)的效果。而打字機(jī)效果背后的實(shí)現(xiàn)就是流式返回。
對(duì)技術(shù)有過(guò)了解的人應(yīng)該都知道,正常情況下接口是在所有業(yè)務(wù)處理完成之后一起返回;但流式返回是分多批次返回。簡(jiǎn)單來(lái)說(shuō)就是處理了一部分就返回一部分,不用等全部完成之后再返回。
如下圖所示就是一個(gè)典型的流式返回:
那目前流式返回所遇到的問(wèn)題是什么呢?
其實(shí)從后端的角度來(lái)說(shuō),流式返回沒(méi)有任何問(wèn)題;不論是使用大模型官方提供的SDK亦或者是調(diào)用他們的接口,都是正常的流式返回。但問(wèn)題是,調(diào)用第三方接口的目的是為了完成業(yè)務(wù)功能,因此怎么把這個(gè)流式返回也用流式返回給前端就是一個(gè)需要思考的問(wèn)題了。
從web開(kāi)發(fā)的角度來(lái)說(shuō),現(xiàn)在前后端交互主要使用的是http協(xié)議;但http協(xié)議是前端向后端發(fā)起請(qǐng)求,而不能從后端向前端發(fā)起請(qǐng)求;為了解決這個(gè)問(wèn)題,因此就有了websocket和SSE協(xié)議。
這兩個(gè)協(xié)議的區(qū)別是websocket是全雙工的,而SSE是半雙工的;意思就是說(shuō),websocket建立連接之后,前端可以主動(dòng)向后端發(fā)消息,后端也可以主動(dòng)向前端發(fā)消息;而SSE是只能后端向前端發(fā)消息。
但不論是websocket還是SSE協(xié)議,本質(zhì)上只是一種通訊協(xié)議,和業(yè)務(wù)沒(méi)什么具體的關(guān)系;這就類(lèi)似于,搞貨運(yùn)的目的是把貨物安全的送到目的地,至于你是用汽車(chē)運(yùn),還是用火車(chē)運(yùn)都可以。
那問(wèn)題出在哪里呢?
剛開(kāi)始我們使用的是websocket作為流式返回的通訊工具;但再實(shí)際使用中才發(fā)現(xiàn)一個(gè)很大的問(wèn)題,那就是websocket無(wú)法在短時(shí)間內(nèi)接受大量的網(wǎng)絡(luò)傳輸需求;一旦過(guò)量就會(huì)導(dǎo)致websocket緩沖區(qū)溢出,也就是TEXT_FULL_WRITRING異常;簡(jiǎn)單來(lái)說(shuō)就是,websocket為了減輕網(wǎng)絡(luò)壓力,每次發(fā)送消息都會(huì)先把緩沖區(qū)寫(xiě)滿;然后再一次性發(fā)送。
但由于流式返回速度較快,有時(shí)候websocket上一條消息還沒(méi)發(fā)送出去,下一條新的數(shù)據(jù)又進(jìn)來(lái)了;因此就會(huì)導(dǎo)致websocket報(bào)錯(cuò),即使使用的是異步發(fā)送也會(huì)報(bào)錯(cuò)。
public void sendText(String text) {
for(Session session : sessions.values()){
if (session.isOpen()) {
try {
//異步發(fā)送
session.getAsyncRemote().sendText(text);
} catch (Exception e) {
log.error("發(fā)送會(huì)話異常");
}
}else{
log.error("socket 在不可發(fā)送狀態(tài)");
}
}
}
為了解決這個(gè)問(wèn)題, 因此就在網(wǎng)上查了一下發(fā)現(xiàn);類(lèi)似于這種流式返回,大部分人的處理方式都是用SSE協(xié)議;因?yàn)镾SE協(xié)議相對(duì)websocket更簡(jiǎn)單,效率更高。而在java語(yǔ)言中,使用SSE有兩種方式,第一種就是自己手動(dòng)創(chuàng)建SSE對(duì)象,使用SseEmitter 對(duì)象來(lái)實(shí)現(xiàn)。
但這種原生的實(shí)現(xiàn)方式存在很多問(wèn)題,比如需要自己去控制sse與用戶的關(guān)聯(lián)關(guān)系,sse的狀態(tài)判斷,自動(dòng)重連等等。
因此,springboot就提供了另一種方式,那就是Flux流式處理。
OpenAIClient client = new OpenAIClientBuilder().credential(new AzureKeyCredential(key)).endpoint(endPoint).buildClient();
IterableStream<ChatCompletions> stream = client.getChatCompletionsStream(modelName, new ChatCompletionsOptions(messages));
StringBuffer stringBuffer = new StringBuffer();
return Flux.<String>create(sink -> {
stream.iterator().forEachRemaining(
chatCompletions -> {
if (chatCompletions.getChoices() != null && chatCompletions.getChoices().size() > 0) {
if (chatCompletions.getChoices().get(0).getDelta() != null) {
String content = chatCompletions.getChoices().get(0).getDelta().getContent();
log.info(content);
if (content != null) {
stringBuffer.append(content);
sink.next(content);
}
}
}
}
);
sink.complete();
}).map(data -> ServerSentEvent.<String>builder().data(data).build())
// 每隔一段時(shí)間發(fā)送一個(gè)字符
.delayElements(Duration.ofMillis(10))
// 停止
.takeWhile(i -> !redisUtil.hasKey(stopKey))
// 最后執(zhí)行
.doOnComplete(() -> {
//傳輸完成 業(yè)務(wù)處理
});
如上所示,F(xiàn)lux通過(guò)sink封裝大模型的流式返回,然后調(diào)用next方法主動(dòng)把數(shù)據(jù)返回給前端,以此達(dá)到流式效果。
雖然從操作上來(lái)說(shuō),各種技術(shù)已經(jīng)逐漸成熟,我們都可以直接拿來(lái)主義,拿過(guò)來(lái)用就好了;但實(shí)際上存在的一個(gè)問(wèn)題就是,當(dāng)你不知道其原理,又沒(méi)有經(jīng)驗(yàn)時(shí),你還是會(huì)踩很多坑。
?
本文轉(zhuǎn)載自公眾號(hào)AI探索時(shí)代 作者:DFires
