成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊分布式下的WebSocket解決方案

開發 前端 分布式
最近自己搭建了個項目,項目本身很簡單,但是里面有使用WebSocket進行消息提醒的功能,大體情況是這樣的。發布消息者在系統中發送消息,實時的把消息推送給對應的一個部門下的所有人。

前言

最近自己搭建了個項目,項目本身很簡單,但是里面有使用WebSocket進行消息提醒的功能,大體情況是這樣的。

發布消息者在系統中發送消息,實時的把消息推送給對應的一個部門下的所有人。

這里面如果是單機應用的情況時,我們可以通過部門的id和用戶的id組成一個唯一的key,與應用服務器建立WebSocket長連接,然后就可以接收到發布消息者發送的消息了。

但是真正把項目應用于生產環境中時,我們是不可能就部署一個單機應用的,而是要部署一個集群。

[[343393]]

所以我通過Nginx+兩臺Tomcat搭建了一個簡單的負載均衡集群,作為測試使用

但是問題出現了,我們的客戶端瀏覽器只會與一臺服務器建立WebSocket長連接,所以發布消息者在發送消息時,就沒法保證所有目標部門的人都能接收到消息(因為這些人連接的可能不是一個服務器)。

本篇文章就是針對于這么一個問題展開討論,提出一種解決方案,當然解決方案不止一種,那我們開始吧。

WebSocket單體應用介紹

在介紹分布式集群之前,我們先來看一下王子的WebSocket代碼實現,先來看java后端代碼如下:

  1. import javax.websocket.*; 
  2. import javax.websocket.server.PathParam; 
  3. import javax.websocket.server.ServerEndpoint; 
  4. import com.alibaba.fastjson.JSON; 
  5. import com.alibaba.fastjson.JSONObject;import java.io.IOException; 
  6. import java.util.Map; 
  7. import java.util.concurrent.ConcurrentHashMap; 
  8. @ServerEndpoint("/webSocket/{key}"
  9. public class WebSocket { 
  10.     private static int onlineCount = 0; 
  11.     /** 
  12.      * 存儲連接的客戶端 
  13.      */ 
  14.     private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>(); 
  15.     private Session session; 
  16.     /** 
  17.      * 發送的目標科室code 
  18.      */ 
  19.     private String key
  20.     @OnOpen 
  21.     public void onOpen(@PathParam("key") String key, Session session) throws IOException { 
  22.         this.key = key
  23.         this.session = session; 
  24.         if (!clients.containsKey(key)) { 
  25.             addOnlineCount();        }        clients.put(key, this); 
  26.         Log.info(key+"已連接消息服務!"); 
  27.     }    @OnClose 
  28.     public void onClose() throws IOException { 
  29.         clients.remove(key);        subOnlineCount();    }    @OnMessage 
  30.     public void onMessage(String message) throws IOException { 
  31.         if(message.equals("ping")){ 
  32.             return ; 
  33.         }        JSONObject jsonTo = JSON.parseObject(message);        String mes = (String) jsonTo.get("message"); 
  34.         if (!jsonTo.get("to").equals("All")){ 
  35.             sendMessageTo(mes, jsonTo.get("to").toString()); 
  36.         }else
  37.             sendMessageAll(mes);        }    }    @OnError 
  38.     public void onError(Session session, Throwable error) { 
  39.         error.printStackTrace();    }    private void sendMessageTo(String message, String To) throws IOException { 
  40.         for (WebSocket item : clients.values()) { 
  41.             if (item.key.contains(To) ) 
  42.                 item.session.getAsyncRemote().sendText(message);        }    }    private void sendMessageAll(String message) throws IOException { 
  43.         for (WebSocket item : clients.values()) { 
  44.             item.session.getAsyncRemote().sendText(message);        }    }    public static synchronized int getOnlineCount() { 
  45.         return onlineCount; 
  46.     }    public static synchronized void addOnlineCount() { 
  47.         WebSocket.onlineCount++;    }    public static synchronized void subOnlineCount() { 
  48.         WebSocket.onlineCount--;    }    public static synchronized Map<String, WebSocket> getClients() { 
  49.         return clients; 
  50.     }} 

示例代碼中并沒有使用Spring,用的是原生的java web編寫的,簡單和大家介紹一下里面的方法。

  • onOpen:在客戶端與WebSocket服務連接時觸發方法執行
  • onClose:在客戶端與WebSocket連接斷開的時候觸發執行
  • onMessage:在接收到客戶端發送的消息時觸發執行
  • onError:在發生錯誤時觸發執行

可以看到,在onMessage方法中,我們直接根據客戶端發送的消息,進行消息的轉發功能,這樣在單體消息服務中是沒有問題的。

再來看一下js代碼

  1. var host = document.location.host; 
  2.     // 獲得當前登錄科室    var deptCodes='${sessionScope.$UserContext.departmentID}'
  3.     deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, ""); 
  4.     var key = '${sessionScope.$UserContext.userID}'+deptCodes; 
  5.     var lockReconnect = false;  //避免ws重復連接 
  6.     var ws = null;          // 判斷當前瀏覽器是否支持WebSocket    var wsUrl = 'ws://' + host + '/webSocket/'key
  7.     createWebSocket(wsUrl);   //連接ws    function createWebSocket(url) { 
  8.         try{            if('WebSocket' in window){ 
  9.                 ws = new WebSocket(url);            }else if('MozWebSocket' in window){   
  10.                 ws = new MozWebSocket(url);            }else
  11.                   layer.alert("您的瀏覽器不支持websocket協議,建議使用新版谷歌、火狐等瀏覽器,請勿使用IE10以下瀏覽器,360瀏覽器請使用極速模式,不要使用兼容模式!");  
  12.             }            initEventHandle();        }catch(e){            reconnect(url);            console.log(e); 
  13.         }         }    function initEventHandle() { 
  14.         ws.onclose = function () { 
  15.             reconnect(wsUrl);            console.log("llws連接關閉!"+new Date().toUTCString()); 
  16.         };        ws.onerror = function () { 
  17.             reconnect(wsUrl);            console.log("llws連接錯誤!"); 
  18.         };        ws.onopen = function () { 
  19.             heartCheck.reset().start();      //心跳檢測重置            console.log("llws連接成功!"+new Date().toUTCString()); 
  20.         };        ws.onmessage = function (event) {    //如果獲取到消息,心跳檢測重置 
  21.             heartCheck.reset().start();      //拿到任何消息都說明當前連接是正常的//接收到消息實際業務處理        ...        };    }    // 監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。    window.onbeforeunload = function() { 
  22.         ws.close(); 
  23.     }      function reconnect(url) { 
  24.         if(lockReconnect) return
  25.         lockReconnect = true
  26.         setTimeout(function () {     //沒連接上會一直重連,設置延遲避免請求過多 
  27.             createWebSocket(url);            lockReconnect = false
  28.         }, 2000); 
  29.     }    //心跳檢測    var heartCheck = {        timeout: 300000,        //5分鐘發一次心跳 
  30.         timeoutObj: null,        serverTimeoutObj: null,        reset: function(){ 
  31.             clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this; 
  32.         },        start: function(){ 
  33.             var self = this;            this.timeoutObj = setTimeout(function(){ 
  34.                 //這里發送一個心跳,后端收到后,返回一個心跳消息,                //onmessage拿到返回的心跳就說明連接正常                ws.send("ping"); 
  35.                 console.log("ping!"
  36.                 self.serverTimeoutObj = setTimeout(function(){//如果超過一定時間還沒重置,說明后端主動斷開了 
  37.                     ws.close();     //如果onclose會執行reconnect,我們執行ws.close()就行了.如果直接執行reconnect 會觸發onclose導致重連兩次 
  38.                 }, self.timeout)            }, this.timeout)        }  } 

js部分使用的是原生H5編寫的,如果為了更好的兼容瀏覽器,也可以使用SockJS,有興趣小伙伴們可以自行百度。

接下來我們就手動的優化代碼,實現WebSocket對分布式架構的支持。

解決方案的思考

現在我們已經了解單體應用下的代碼結構,也清楚了WebSocket在分布式環境下面臨的問題,那么是時候思考一下如何能夠解決這個問題了。

我們先來看一看發生這個問題的根本原因是什么。

簡單思考一下就能明白,單體應用下只有一臺服務器,所有的客戶端連接的都是這一臺消息服務器,所以當發布消息者發送消息時,所有的客戶端其實已經全部與這臺服務器建立了連接,直接群發消息就可以了。

換成分布式系統后,假如我們有兩臺消息服務器,那么客戶端通過Nginx負載均衡后,就會有一部分連接到其中一臺服務器,另一部分連接到另一臺服務器,所以發布消息者發送消息時,只會發送到其中的一臺服務器上,而這臺消息服務器就可以執行群發操作,但問題是,另一臺服務器并不知道這件事,也就無法發送消息了。

現在我們知道了根本原因是生產消息時,只有一臺消息服務器能夠感知到,所以我們只要讓另一臺消息服務器也能感知到就可以了,這樣感知到之后,它就可以群發消息給連接到它上邊的客戶端了。

那么什么方法可以實現這種功能呢,王子很快想到了引入消息中間件,并使用它的發布訂閱模式來通知所有消息服務器就可以了。

引入RabbitMQ解決分布式下的WebSocket問題

在消息中間件的選擇上,王子選擇了RabbitMQ,原因是它的搭建比較簡單,功能也很強大,而且我們只是用到它群發消息的功能。

RabbitMQ有一個廣播模式(fanout),我們使用的就是這種模式。

首先我們寫一個RabbitMQ的連接類:

  1. import com.rabbitmq.client.Connection
  2. import com.rabbitmq.client.ConnectionFactory; 
  3. import java.io.IOException; 
  4. import java.util.concurrent.TimeoutException; 
  5. public class RabbitMQUtil { 
  6.     private static Connection connection
  7.     /** 
  8.      * 與rabbitmq建立連接 
  9.      * @return 
  10.      */ 
  11.     public static Connection getConnection() { 
  12.         if (connection != null&&connection.isOpen()) { 
  13.             return connection
  14.         }        ConnectionFactory factory = new ConnectionFactory(); 
  15.         factory.setVirtualHost("/"); 
  16.         factory.setHost("192.168.220.110"); // 用的是虛擬IP地址 
  17.         factory.setPort(5672); 
  18.         factory.setUsername("guest"); 
  19.         factory.setPassword("guest"); 
  20.         try { 
  21.             connection = factory.newConnection(); 
  22.         } catch (IOException e) { 
  23.             e.printStackTrace(); 
  24.         } catch (TimeoutException e) { 
  25.             e.printStackTrace(); 
  26.         } 
  27.         return connection
  28.     } 

這個類沒什么說的,就是獲取MQ連接的一個工廠類。

然后按照我們的思路,就是每次服務器啟動的時候,都會創建一個MQ的消費者監聽MQ的消息,王子這里測試使用的是Servlet的監聽器,如下:

  1. import javax.servlet.ServletContextEvent; 
  2. import javax.servlet.ServletContextListener; 
  3. public class InitListener implements ServletContextListener { 
  4.     @Override 
  5.     public void contextInitialized(ServletContextEvent servletContextEvent) { 
  6.         WebSocket.init();    }    @Override 
  7.     public void contextDestroyed(ServletContextEvent servletContextEvent) { 
  8.     }} 

記得要在Web.xml中配置監聽器信息

  1. <?xml version="1.0" encoding="UTF-8"?> 
  2. <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  4.          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd" 
  5.          version="4.0"
  6.     <listener> 
  7.         <listener-class>InitListener</listener-class> 
  8.     </listener> 
  9. </web-app> 

WebSocket中增加init方法,作為MQ消費者部分

  1. public  static void init() { 
  2.         try {            Connection connection = RabbitMQUtil.getConnection();            Channel channel = connection.createChannel();            //交換機聲明(參數為:交換機名稱;交換機類型) 
  3.             channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT); 
  4.             //獲取一個臨時隊列 
  5.             String queueName = channel.queueDeclare().getQueue();            //隊列與交換機綁定(參數為:隊列名稱;交換機名稱;routingKey忽略) 
  6.             channel.queueBind(queueName,"fanoutLogs",""); 
  7.             //這里重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這里要重新組裝成String 
  8.             Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    super.handleDelivery(consumerTag, envelope, properties, body); 
  9.                     String message = new String(body,"UTF-8"); 
  10.                     System.out.println(message);            //這里可以使用WebSocket通過消息內容發送消息給對應的客戶端 
  11.                 }            };            //聲明隊列中被消費掉的消息(參數為:隊列名稱;消息是否自動確認;consumer主體) 
  12.             channel.basicConsume(queueName,true,consumer); 
  13.             //這里不能關閉連接,調用了消費方法后,消費者會一直連接著rabbitMQ等待消費 
  14.         } catch (IOException e) {            e.printStackTrace();        }    } 

同時在接收到消息時,不是直接通過WebSocket發送消息給對應客戶端,而是發送消息給MQ,這樣如果消息服務器有多個,就都會從MQ中獲得消息,之后通過獲取的消息內容再使用WebSocket推送給對應的客戶端就可以了。

WebSocket的onMessage方法增加內容如下:

  1. try { 
  2.             //嘗試獲取一個連接 
  3.             Connection connection = RabbitMQUtil.getConnection();            //嘗試創建一個channel 
  4.             Channel channel = connection.createChannel();            //聲明交換機(參數為:交換機名稱; 交換機類型,廣播模式) 
  5.             channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT); 
  6.             //消息發布(參數為:交換機名稱; routingKey,忽略。在廣播模式中,生產者聲明交換機的名稱和類型即可) 
  7.             channel.basicPublish("fanoutLogs",""null,msg.getBytes("UTF-8")); 
  8.             System.out.println("發布消息"); 
  9.             channel.close();        } catch (IOException |TimeoutException e) { 
  10.             e.printStackTrace(); 
  11.         } 

增加后刪除掉原來的Websocket推送部分代碼。

這樣一整套的解決方案就完成了。

總結

到這里,我們就解決了分布式下WebSocket的推送消息問題。

我們主要是引入了RabbitMQ,通過RabbitMQ的發布訂閱模式,讓每個消息服務器啟動的時候都去訂閱消息,而無論哪臺消息服務器在發送消息的時候都會發送給MQ,這樣每臺消息服務器就都會感知到發送消息的時間,從而再通過Websocket發送給客戶端。

大體流程就是這樣,那么小伙伴們有沒有想過,如果RabbitMQ掛掉了幾分鐘,之后重啟了,消費者是否可以重新連接到RabbitMQ?是否還能正常接收消息呢?

生產環境下,這個問題是必須考慮的。

這里已經測試過,消費者是支持自動重連的,所以我們可以放心的使用這套架構來解決此問題。

本文到這里就結束了,歡迎各位小伙伴留言討論,一起學習,一起進步。

責任編輯:未麗燕 來源: 今日頭條
相關推薦

2021-09-28 09:43:11

微服務架構技術

2023-03-05 18:23:38

分布式ID節點

2025-04-28 00:44:04

2023-09-14 15:44:46

分布式事務數據存儲

2020-05-28 09:35:05

分布式事務方案

2025-04-29 04:00:00

分布式事務事務消息

2010-07-21 13:53:41

SQL Server分

2025-05-07 00:10:00

分布式事務TCC模式

2022-09-07 08:18:26

分布式灰度方案分支號

2023-11-30 07:19:08

.NET開源

2024-03-26 12:08:53

分布式事務存儲

2021-05-08 08:01:05

Session登錄瀏覽器

2022-02-10 08:57:45

分布式線程鎖

2024-06-13 08:04:23

2023-05-18 14:02:00

分布式系統冪等性

2018-07-19 14:53:23

秒殺websocket異步

2019-01-11 18:22:07

阿里巴巴技術開源

2023-09-28 08:39:23

分布式鎖Redis

2021-06-28 10:03:44

分布式數據庫架構

2020-03-31 16:13:26

分布式事務方案TCC
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩高清中文字幕 | 七七婷婷婷婷精品国产 | 欧美日韩中文在线观看 | 中文字幕在线观看视频网站 | 免费黄色在线 | 国产福利在线视频 | 黄网站在线播放 | 精品国产一区二区三区性色av | 亚洲www| 久久99精品视频 | 一区二区三区国产好 | 国产精品视频免费播放 | 久久三级影院 | 中文字幕在线网 | 亚洲一区在线日韩在线深爱 | 日韩欧美三级电影 | 日韩h | 色欧美综合 | 第四色播日韩第一页 | 亚洲欧美日韩精品久久亚洲区 | 日本a视频 | 91麻豆精品一区二区三区 | 成人精品鲁一区一区二区 | 国产一区二区日韩 | 欧美群妇大交群中文字幕 | 亚洲精品中文在线 | www.国产 | www.天天操 | 免费视频一区 | 亚洲一区二区在线播放 | 国产精品日韩欧美一区二区三区 | 在线一区 | 狠狠av| 中文字幕成人 | 91视频免费在观看 | 成人久久18免费 | 国产精品99久久久久久动医院 | 九九色综合 | 精品国产青草久久久久福利 | 亚洲精品一区二区三区 | 久久精品欧美一区二区三区麻豆 |