WebSocket 的六種集成方式
由于前段時間我實現了一個庫【Spring Cloud】一個配置注解實現 WebSocket 集群方案
以至于我對WebSocket的各種集成方式做了一些研究,目前我所了解到的就是下面這些了(就一個破ws都有這么多花里胡哨的集成方式了?)
- Javax
- WebMVC
- WebFlux
- Java-WebSocket
- SocketIO
- Netty
今天主要介紹一下前3種方式,畢竟現在的主流框架還是Spring Boot
而后3種其實和Spring Boot并不強行綁定,基于Java就可以支持,不過我也會對后3種做個簡單的介紹,大家先混個眼熟就行了
那么接下來我們就來講講前3種方式(Javax,WebMVC,WebFlux)在Spring Boot中的服務端和客戶端配置(客戶端配置也超重要的有木有,平時用不到,用到了卻基本找不到文檔,這也太絕望了)
Javax
在java的擴展包javax.websocket中就定義了一套WebSocket的接口規范
服務端
一般使用注解的方式來進行配置
第一步
@Component
@ServerEndpoint("/websocket/{type}")
publicclass JavaxWebSocketServerEndpoint {
@OnOpen
public void onOpen(Session session, EndpointConfig config,
@PathParam(value = "type") String type) {
//連接建立
}
@OnClose
public void onClose(Session session, CloseReason reason) {
//連接關閉
}
@OnMessage
public void onMessage(Session session, String message) {
//接收文本信息
}
@OnMessage
public void onMessage(Session session, PongMessage message) {
//接收pong信息
}
@OnMessage
public void onMessage(Session session, ByteBuffer message) {
//接收二進制信息,也可以用byte[]接收
}
@OnError
public void onError(Session session, Throwable e) {
//異常處理
}
}
我們在類上添加@ServerEndpoint注解來表示這是一個服務端點,同時可以在注解中配置路徑,這個路徑可以配置成動態的,使用{}包起來就可以了
- @OnOpen用來標記對應的方法作為客戶端連接上來之后的回調,Session就相當于和客戶端的連接啦,我們可以把它緩存起來用于發送消息;通過@PathParam注解就可以獲得動態路徑中對應值了
- @OnClose用來標記對應的方法作為客戶端斷開連接之后的回調,我們可以在這個方法中移除對應Session的緩存,同時可以接受一個CloseReason的參數用于獲取關閉原因
- @OnMessage用來標記對應的方法作為接收到消息之后的回調,我們可以接受文本消息,二進制消息和pong消息
- @OnError用來標記對應的方法作為拋出異常之后的回調,可以獲得對應的Session和異常對象
第二步
implementation 'org.springframework.boot:spring-boot-starter-websocket'
@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
依賴Spring的WebSocket模塊,手動注入ServerEndpointExporter就可以了
需要注意ServerEndpointExporter是Spring中的類,算是Spring為了支持javax.websocket的原生用法所提供的支持類
冷知識
javax.websocket庫中定義了PongMessage而沒有PingMessage
通過我的測試發現基本上所有的WebSocket包括前端js自帶的,都實現了自動回復;也就是說當接收到一個ping消息之后,是會自動回應一個pong消息,所以沒有必要再自己接受ping消息來處理了,即我們不會接受到ping消息;關注工眾號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優手冊!
當然我上面講的ping和pong都是需要使用框架提供的api,如果是我們自己通過Message來自定義心跳數據的話是沒有任何的處理的,下面是對應的api
//發送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);
//發送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
然后我又發現js自帶的WebSocket是沒有發送ping的api的,所以是不是可以猜想當初就是約定服務端發送ping,客戶端回復pong
客戶端
客戶端也是使用注解配置
第一步
@ClientEndpoint
publicclass JavaxWebSocketClientEndpoint {
@OnOpen
public void onOpen(Session session) {
//連接建立
}
@OnClose
public void onClose(Session session, CloseReason reason) {
//連接關閉
}
@OnMessage
public void onMessage(Session session, String message) {
//接收文本消息
}
@OnMessage
public void onMessage(Session session, PongMessage message) {
//接收pong消息
}
@OnMessage
public void onMessage(Session session, ByteBuffer message) {
//接收二進制消息
}
@OnError
public void onError(Session session, Throwable e) {
//異常處理
}
}
客戶端使用@ClientEndpoint來標記,其他的@OnOpen,@OnClose,@OnMessage,@OnError和服務端一模一樣
第二步
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
我們可以通過ContainerProvider來獲得一個WebSocketContainer,然后調用connectToServer方法將我們的客戶端類和連接的uri傳入就行了
冷知識
通過ContainerProvider#getWebSocketContainer獲得WebSocketContainer其實是基于SPI實現的
在Spring的環境中我更推薦大家使用ServletContextAware來獲得,代碼如下
@Component
publicclass JavaxWebSocketContainer implements ServletContextAware {
privatevolatile WebSocketContainer container;
public WebSocketContainer getContainer() {
if (container == null) {
synchronized (this) {
if (container == null) {
container = ContainerProvider.getWebSocketContainer();
}
}
}
return container;
}
@Override
public void setServletContext(@NonNull ServletContext servletContext) {
if (container == null) {
container = (WebSocketContainer) servletContext
.getAttribute("javax.websocket.server.ServerContainer");
}
}
}
發消息
Session session = ...
//發送文本消息
session.getAsyncRemote().sendText(String message);
//發送二進制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);
//發送對象消息,會嘗試使用Encoder編碼
session.getAsyncRemote().sendObject(Object message);
//發送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);
//發送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
WebMVC
依賴肯定是必不可少的
implementation 'org.springframework.boot:spring-boot-starter-websocket'
服務端
第一步
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
publicclass ServletWebSocketServerHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
//連接建立
}
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
//接收消息
}
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
//異常處理
}
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
//連接關閉
}
@Override
public boolean supportsPartialMessages() {
//是否支持接收不完整的消息
returnfalse;
}
}
我們實現一個WebSocketHandler來處理WebSocket的連接,關閉,消息和異常
第二步
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
registry
//添加處理器到對應的路徑
.addHandler(new ServletWebSocketServerHandler(), "/websocket")
.setAllowedOrigins("*");
}
}
首先需要添加@EnableWebSocket來啟用WebSocket
然后實現WebSocketConfigurer來注冊WebSocket路徑以及對應的WebSocketHandler
握手攔截
提供了HandshakeInterceptor來攔截握手
@Configuration
@EnableWebSocket
publicclass ServletWebSocketServerConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
registry
//添加處理器到對應的路徑
.addHandler(new ServletWebSocketServerHandler(), "/websocket")
//添加握手攔截器
.addInterceptors(new ServletWebSocketHandshakeInterceptor())
.setAllowedOrigins("*");
}
publicstaticclass ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//握手之前
//繼續握手返回true, 中斷握手返回false
returnfalse;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
//握手之后
}
}
}
冷知識
我在集成的時候發現這種方式沒辦法動態匹配路徑,它的路徑就是固定的,沒辦法使用如/websocket/**這樣的通配符
我在研究了一下之后發現可以在UrlPathHelper上做點文章
@Configuration
@EnableWebSocket
publicclass ServletWebSocketServerConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
if (registry instanceof ServletWebSocketHandlerRegistry) {
//替換UrlPathHelper
((ServletWebSocketHandlerRegistry) registry)
.setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));
}
registry
//添加處理器到對應的路徑
.addHandler(new ServletWebSocketServerHandler(), "/websocket/**")
.setAllowedOrigins("*");
}
publicclass PrefixUrlPathHelper extends UrlPathHelper {
private String prefix;
@Override
public@NonNullString resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {
//獲得原本的Path
String path = super.resolveAndCacheLookupPath(request);
//如果是指定前綴就返回對應的通配路徑
if (path.startsWith(prefix)) {
return prefix + "/**";
}
return path;
}
}
}
因為它內部實際上就是用一個Map<String, WebSocketHandler>
來存的,所以沒有辦法用通配符
主要是有現成的AntPathMatcher實現通配應該不麻煩才對啊
客戶端
第一步
public class ServletWebSocketClientHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
//連接建立
}
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
//接收消息
}
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
//異常處理
}
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
//連接關閉
}
@Override
public boolean supportsPartialMessages() {
//是否支持接收不完整的消息
returnfalse;
}
}
和服務端一樣我們需要先實現一個WebSocketHandler來處理WebSocket的連接,關閉,消息和異常
第二步
WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();
首先我們需要先new一個StandardWebSocketClient,可以傳入一個WebSocketContainer參數,獲得該對象的方式我之前已經介紹過了,這邊就先略過
然后new一個WebSocketConnectionManager傳入WebSocketClient,WebSocketHandler還有路徑uri
最后調用一下WebSocketConnectionManager的start方法就可以啦
冷知識
這里如果大家去看WebSocketClient的實現類就會發現有StandardWebSocketClient還有JettyWebSocketClient等等,所以大家可以根據自身項目所使用的容器來選擇不同的WebSocketClient實現類
這里給大家貼一小段Spring適配不同容器WebSocket的代碼
public abstractclass AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {
privatestaticfinalboolean tomcatWsPresent;
privatestaticfinalboolean jettyWsPresent;
privatestaticfinalboolean jetty10WsPresent;
privatestaticfinalboolean undertowWsPresent;
privatestaticfinalboolean glassfishWsPresent;
privatestaticfinalboolean weblogicWsPresent;
privatestaticfinalboolean websphereWsPresent;
static {
ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
tomcatWsPresent = ClassUtils.isPresent(
"org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
jetty10WsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
jettyWsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
undertowWsPresent = ClassUtils.isPresent(
"io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
glassfishWsPresent = ClassUtils.isPresent(
"org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);
weblogicWsPresent = ClassUtils.isPresent(
"weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
websphereWsPresent = ClassUtils.isPresent(
"com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
}
}
發消息
import org.springframework.web.socket.*;
WebSocketSession session = ...
//發送文本消息
session.sendMessage(new TextMessage(CharSequence message);
//發送二進制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));
//發送ping
session.sendMessage(new PingMessage(ByteBuffer message));
//發送pong
session.sendMessage(new PongMessage(ByteBuffer message));
WebFlux
WebFlux的WebSocket不需要額外的依賴包
服務端
第一步
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
publicclass ReactiveWebSocketServerHandler implements WebSocketHandler {
@NonNull
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> send = session.send(Flux.create(sink -> {
//可以持有sink對象在任意時候調用next發送消息
sink.next(WebSocketMessage message);
})).doOnError(it -> {
//異常處理
});
Mono<Void> receive = session.receive()
.doOnNext(it -> {
//接收消息
})
.doOnError(it -> {
//異常處理
})
.then();
@SuppressWarnings("all")
Disposable disposable = session.closeStatus()
.doOnError(it -> {
//異常處理
})
.subscribe(it -> {
//連接關閉
});
return Mono.zip(send, receive).then();
}
}
首先需要注意這里的WebSocketHandler和WebSocketSession是reactive包下的
- 通過WebSocketSession#send方法來持有一個
FluxSink<WebSocketMessage>
來用于發送消息 - 通過WebSocketSession#receive來訂閱消息
- 通過WebSocketSession#closeStatus來訂閱連接關閉事件
第二步
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {
public ReactiveWebSocketServerHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/websocket/**", new ReactiveWebSocketServerHandler());
setUrlMap(map);
setOrder(100);
}
}
注冊一個HandlerMapping同時配置路徑和對應的WebSocketHandler
第三步
@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
注入WebSocketHandlerAdapter
冷知識
我們自定義的HandlerMapping需要設置order,如果不設置,默認為Ordered.LOWEST_PRECEDENCE,會導致這個HandlerMapping被放在最后,當有客戶端連接上來時會被其他的HandlerMapping優先匹配上而連接失敗
客戶端
第一步
public class ReactiveWebSocketClientHandler implements WebSocketHandler {
@NonNull
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> send = session.send(Flux.create(sink -> {
//可以持有sink對象在任意時候調用next發送消息
sink.next(WebSocketMessage message);
})).doOnError(it -> {
//處理異常
});
Mono<Void> receive = session.receive()
.doOnNext(it -> {
//接收消息
})
.doOnError(it -> {
//異常處理
})
.then();
@SuppressWarnings("all")
Disposable disposable = session.closeStatus()
.doOnError(it -> {
//異常處理
})
.subscribe(it -> {
//連接關閉
});
return Mono.zip(send, receive).then();
}
}
客戶端WebSocketHandler的寫法和服務端的一樣
第二步
import org.springframework.web.reactive.socket.client.WebSocketClient;
WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();
首先我們需要先new一個ReactorNettyWebSocketClient
然后調用一下WebSocketClient的execute方法傳入路徑uri和WebSocketHandler并繼續調用subscribe方法就行啦
冷知識
和WebMVC中的WebSocketClient一樣,Reactive包中的WebSocketClient也有很多實現類,比如ReactorNettyWebSocketClient,JettyWebSocketClient,UndertowWebSocketClient,TomcatWebSocketClient等等,也是需要大家基于自身項目的容器使用不同的實現類
這里也給大家貼一小段Reactive適配不同容器WebSocket的代碼
public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
privatestaticfinalboolean tomcatPresent;
privatestaticfinalboolean jettyPresent;
privatestaticfinalboolean jetty10Present;
privatestaticfinalboolean undertowPresent;
privatestaticfinalboolean reactorNettyPresent;
static {
ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
}
}
發消息
我們需要使用在WebSocketHandler中獲得的FluxSink<WebSocketMessage>
來發送消息
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
publicclass ReactiveWebSocket {
privatefinal WebSocketSession session;
privatefinal FluxSink<WebSocketMessage> sender;
public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {
this.session = session;
this.sender = sender;
}
public String getId() {
return session.getId();
}
public URI getUri() {
return session.getHandshakeInfo().getUri();
}
public void send(Object message) {
if (message instanceof WebSocketMessage) {
sender.next((WebSocketMessage) message);
} elseif (message instanceof String) {
//發送文本消息
sender.next(session.textMessage((String) message));
} elseif (message instanceof DataBuffer) {
//發送二進制消息
sender.next(session.binaryMessage(factory -> (DataBuffer) message));
} elseif (message instanceof ByteBuffer) {
發送二進制消息
sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));
} elseif (message instanceofbyte[]) {
發送二進制消息
sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));
} else {
thrownew IllegalArgumentException("Message type not match");
}
}
public void ping() {
//發送ping
sender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
}
public void pong() {
//發送pong
sender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
}
public void close(CloseStatus reason) {
sender.complete();
session.close(reason).subscribe();
}
}
Java-WebSocket
這是一個純java的第三方庫,專門用于實現WebSocket
Github上已經有很詳細的使用教程了,現在有9k+的Star
傳送門:https://github.com/TooTallNate/Java-WebSocket
SocketIO
該庫使用的協議是經過自己封裝的,支持很多的語言,提供了統一的接口,所以需要使用它提供的Server和Client來連接,如socket.io-server-java和socket.io-client-java
這個庫我了解下來主要用于實時聊天等場景,所以如果只是普通的WebSocket功能就有點大材小用了
Github上也有非常詳細的使用文檔,大家如果有興趣可以研究一下
傳送門:https://github.com/socketio
Netty
這個大家應該都比較熟悉了,就算沒用過肯定也聽過
網上的文檔和示例也非常多,我這里就不介紹有的沒的了,Github傳送門
傳送門:https://github.com/netty/netty