成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

教你用純Java實現一個即時通訊系統(附源碼)

開發 后端
和各位讀者大致介紹下具體場景,線上的小程序中開放一些語音麥克風的房間,讓用戶進入房間之后可以互相通過語音聊天的方式進行互動。

 項目背景

和各位讀者大致介紹下具體場景,線上的小程序中開放一些語音麥克風的房間,讓用戶進入房間之后可以互相通過語音聊天的方式進行互動。

這里分享一下相關的技術設計方案。這款系統的核心點設計在于如何能讓一個用戶發出的語音通知到其他用戶上邊。語音數據在客戶端同事的處理下最終變成了io數據流請求到了后端,后端只需要將這些數據流傳達給各個不同的終端即可達到廣播通知的效果。

單機版架構

最初期上線的時候,為了趕速度,快速試錯,所以簡單地采用了單機版架構去設計。結合技術棧為 SpringBoot,WebSocket,MySQL技術。

線上一間語音房間的同時在線人數并不會特別多,大概在15-50人的區間段內,系統核心代碼是通過SpringBoot內部的WebSocket技術去進行數據的主動推送。

設計思路

整體的設計圖比較簡單,基本就是一臺服務器存儲WebSocket連接,如下圖所示:

用戶進行WebSocket初始化連接的時候需要一個連接分配和存儲的過程:

早期的存儲是存放在了服務器本地的一個Map集合中。

當WebSocket進行連接的時候就會往內存中寫入一條數據信息,當鏈接斷開的時候,就將內存中的數據移除。然后進行語音廣播的時候需要結合WebSocket內部的廣播發送功能進行通知

看似設計比較簡單,但是在后期業務變得龐大的時候出現了瓶頸。因為隨著參加語音活動用戶的增加,越來越多的WebSocketSession對象需要被存儲到內存當中,這種有狀態性的存儲對于單機擴容不靈活。

設計缺陷

1.假設原先的服務器擴容到了A,B兩臺機器,A用戶在A機器上邊建立了WebSocketSession,B用戶在B機器上邊建立的WebSocketSession連接。此時如果A想要和B進行對話發送,需要先查找到具體WebSocketSession存放在哪臺機器上邊。

2.當用戶出現了網絡異常,臨時斷開連接進行重連的時候,也可能會出現1所說的問題。

集群架構

設計思路

一旦出現需要發送語音通知的時候,發送一條廣播的mq消息,每個機器都接收到消息之后,觸發自己的廣播操作即可。

RocketMq的接入系統設計里面mq采用的是廣播模式,這和我們通常使用的集群模式有一定的區別。

消息隊列RocketMQ版是基于發布或訂閱模型的消息系統。消費者,即消息的訂閱方訂閱關注的Topic,以獲取并消費消息。由于消費者應用一般是分布式系統,以集群方式部署,因此消息隊列RocketMQ版約定以下概念:

  •  集群:使用相同Group ID的消費者屬于同一個集群。同一個集群下的消費者消費邏輯必須完全一致(包括Tag的使用)。
  •  集群消費:當使用集群消費模式時,消息隊列RocketMQ版認為任意一條消息只需要被集群內的任意一個消費者處理即可。
  •  廣播消費:當使用廣播消費模式時,消息隊列RocketMQ版會將每條消息推送給集群內所有注冊過的消費者,保證消息至少被每個消費者消費一次。

集群消費模式適用場景 適用于消費端集群化部署,每條消息只需要被處理一次的場景。此外,由于消費進度在服務端維護,可靠性更高。具體消費示例如下圖所示。

注意事項

  •  集群消費模式下,每一條消息都只會被分發到一臺機器上處理。如果需要被集群下的每一臺機器都處理,請使用廣播模式。
  •  集群消費模式下,不保證每一次失敗重投的消息路由到同一臺機器上。

廣播消費模式適用場景 適用于消費端集群化部署,每條消息需要被集群下的每個消費者處理的場景。具體消費示例如下圖所示。

注意事項

  •  廣播消費模式下不支持順序消息。
  •  廣播消費模式下不支持重置消費位點。
  •  每條消息都需要被相同訂閱邏輯的多臺機器處理。
  •  消費進度在客戶端維護,出現重復消費的概率稍大于集群模式。
  •  廣播模式下,消息隊列RocketMQ版保證每條消息至少被每臺客戶端消費一次,但是并不會重投消費失敗的消息,因此業務方需要關注消費失敗的情況。
  •  廣播模式下,客戶端每一次重啟都會從最新消息消費。客戶端在被停止期間發送至服務端的消息將會被自動跳過,請謹慎選擇。
  •  廣播模式下,每條消息都會被大量的客戶端重復處理,因此推薦盡可能使用集群模式。
  •  廣播模式下服務端不維護消費進度,所以消息隊列RocketMQ版控制臺不支持消息堆積查詢、消息堆積報警和訂閱關系查詢功能。

這里面的應用場景需要對集群內部對每個消費者都對服務器內存中的socket連接進行session是否存在對判斷,因此需要采用mq的廣播模式。

關于mq部分的接入代碼

Consumer模塊的配置: 

  1. package org.idea.web.socket.config;  
  2. import org.springframework.boot.context.properties.ConfigurationProperties;  
  3. /**  
  4.  * @Author linhao  
  5.  * @Date created in 10:30 上午 2021/5/10  
  6.  */  
  7. @ConfigurationProperties(prefix = "rocketmq.consumer" 
  8. public class MqConsumerConfig {  
  9.     private boolean isOn;  
  10.     private String groupName;  
  11.     private String nameSrvAddr;  
  12.     private String topics;  
  13.     private Integer consumeThreadMin;  
  14.     private Integer consumeThreadMax;  
  15.     private Integer consumeMessageBatchMaxSize;   
  16.      /**  
  17.      getter 和 setter部分省略  
  18.     **/  

Producer模塊的配置展示: 

  1. package org.idea.web.socket.config;  
  2. import org.springframework.boot.context.properties.ConfigurationProperties;  
  3. /**  
  4.  * @Author linhao  
  5.  * @Date created in 10:26 上午 2021/5/10  
  6.  */  
  7. @ConfigurationProperties(prefix = "rocketmq.producer" 
  8. public class MqProducerConfig {  
  9.     private boolean isOn;  
  10.     private String groupName;  
  11.     private String nameSrvAddr;  
  12.     private Integer maxMessageSize;  
  13.     private Integer sendMsgTimeout;  
  14.     private Integer retryTimesWhenSendFailed;   
  15.      /**  
  16.      getter 和 setter部分省略  
  17.     **/  

RocketMq內部的消費端Bean配置 

  1. package org.idea.web.socket.mq;  
  2. import lombok.extern.slf4j.Slf4j;  
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  5. import org.apache.rocketmq.client.exception.MQClientException;  
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;  
  7. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;  
  8. import org.idea.web.socket.config.MqConsumerConfig;  
  9. import org.idea.web.socket.config.MqProducerConfig;  
  10. import org.springframework.boot.autoconfigure.AutoConfigureAfter;  
  11. import org.springframework.boot.autoconfigure.AutoConfigureBefore;  
  12. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;  
  13. import org.springframework.boot.context.properties.EnableConfigurationProperties;  
  14. import org.springframework.context.annotation.Bean;  
  15. import org.springframework.context.annotation.Configuration;  
  16. import javax.annotation.Resource;  
  17. /**  
  18.  * @Author linhao  
  19.  * @Date created in 10:34 上午 2021/5/10  
  20.  */  
  21. @Configuration  
  22. @Slf4j  
  23. @EnableConfigurationProperties({MqConsumerConfig.class})  
  24. public class MqConsumerAutoConfig {  
  25.     @Resource  
  26.     private MqConsumerConfig mqConsumerConfig;  
  27.     @Resource  
  28.     //這個接口需要手動實現順序消費的邏輯 每次獲取到消息隊列的第一條數據  
  29.     private MessageListenerHandler messageListenerConcurrently;  
  30.     @Bean  
  31.     @ConditionalOnMissingBean  
  32.     public DefaultMQPushConsumer defaultMQPushConsumer() {  
  33.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();  
  34.         consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());  
  35.         consumer.setConsumerGroup(mqConsumerConfig.getGroupName());  
  36.         consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());  
  37.         consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());  
  38.         consumer.registerMessageListener(messageListenerConcurrently);  
  39.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  40.         //消費模型是什么?  
  41.         consumer.setMessageModel(MessageModel.BROADCASTING);  
  42.         //默認一次拉取一條消費  
  43.         consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());  
  44.         //*表示訂閱所有的tag  
  45.         try {  
  46.             consumer.subscribe(mqConsumerConfig.getTopics(), "*");  
  47.             consumer.start();  
  48.             log.info("【 MqConsumerAutoConfig 】mq consumer is started!");  
  49.         } catch (Exception e) {  
  50.             log.error("mq start fail,e is ", e);  
  51.         }  
  52.         return consumer;  
  53.     }  

RocketMq的服務生產者Bean配置 

  1. package org.idea.web.socket.mq;  
  2. import lombok.extern.slf4j.Slf4j;  
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;  
  4. import org.idea.web.socket.config.MqProducerConfig;  
  5. import org.springframework.boot.autoconfigure.AutoConfigureAfter;  
  6. import org.springframework.boot.autoconfigure.AutoConfigureBefore;  
  7. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;  
  8. import org.springframework.boot.context.properties.EnableConfigurationProperties;  
  9. import org.springframework.context.annotation.Bean;  
  10. import org.springframework.context.annotation.Configuration;  
  11. import javax.annotation.Resource;  
  12. /**  
  13.  * @Author linhao  
  14.  * @Date created in 11:05 上午 2021/5/10  
  15.  */  
  16. @Configuration  
  17. @Slf4j  
  18. @EnableConfigurationProperties({MqProducerConfig.class})  
  19. public class MqProducerAutoConfig {  
  20.     @Resource  
  21.     private MqProducerConfig mqProducerConfig;  
  22.     @Bean  
  23.     @ConditionalOnMissingBean  
  24.     //意味著DefaultMQProducer的配置可以被覆蓋  
  25.     public DefaultMQProducer defaultMQProducer() {  
  26.         DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());  
  27.         producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());  
  28.         //沒有則自動創建topic的key 
  29.  //        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");  
  30.         producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize()); 
  31.         producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());  
  32.         producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());  
  33.         try {  
  34.             producer.start();  
  35.             log.info("【 MqProducerAutoConfig 】mq producer is started!");  
  36.         } catch (Exception e) {  
  37.             log.error("[MqProducerAutoConfig] start fail, e is ", e);  
  38.         }  
  39.         return producer;  
  40.     }  

然后是對RocketMq內部發送消息事件的一層函數封裝 

  1. package org.idea.web.socket.mq;  
  2. import com.alibaba.fastjson.JSON;  
  3. import lombok.extern.slf4j.Slf4j;  
  4. import org.apache.commons.lang3.StringUtils;  
  5. import org.apache.rocketmq.client.producer.DefaultMQProducer;  
  6. import org.apache.rocketmq.client.producer.SendResult;  
  7. import org.apache.rocketmq.common.message.Message;  
  8. import org.apache.rocketmq.remoting.common.RemotingHelper;  
  9. import org.idea.web.socket.config.MqProducerConfig;  
  10. import org.idea.web.socket.dto.BroadcastMqDTO;  
  11. import org.springframework.stereotype.Component;  
  12. import javax.annotation.Resource;  
  13. import java.io.UnsupportedEncodingException;  
  14. /**  
  15.  * 消息廣播發送端  
  16.  *  
  17.  * @Author linhao  
  18.  * @Date created in 10:43 下午 2021/5/9  
  19.  */  
  20. @Component  
  21. @Slf4j  
  22. public class BroadcastMqProducer {  
  23.     @Resource  
  24.     private DefaultMQProducer defaultMQProducer;  
  25.     @Resource  
  26.     private MqProducerConfig mqProducerConfig;  
  27.     private static String TOPIC = "ws-topic" 
  28.     private static String TAGS = "ws-tag" 
  29.     public static Integer ALL_USER_RECEIVE_TYPE = 1 
  30.     public static Integer ONE_USER_RECEIVE_TYPE = 2 
  31.     /**  
  32.      * 點對點之間的消息發送  
  33.      *  
  34.      * @param destSessionKey  
  35.      * @param msg  
  36.      * @return  
  37.      */  
  38.     public SendResult sendWebSocketToUser(String destSessionKey,String msg) {  
  39.         if (StringUtils.isEmpty(msg)) {  
  40.             log.error("[sendWebSocketToUser] msg can not be null!");  
  41.             return null;  
  42.         }  
  43.         Message message = null 
  44.         SendResult sendResult = null 
  45.         try {  
  46.             BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();  
  47.             broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);  
  48.             broadcastMqDTO.setMessage(msg);  
  49.             broadcastMqDTO.setSessionKey(destSessionKey);  
  50.             message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));  
  51.             sendResult = defaultMQProducer.send(message);  
  52.         } catch (Exception e) { 
  53.              log.error("[sendWebSocketBroadcastMsg] e is ", e);  
  54.         }  
  55.         return sendResult;  
  56.     }  
  57.     /**  
  58.      * 廣播消息發送  
  59.      *  
  60.      * @param msg 
  61.      * @return  
  62.      */  
  63.     public SendResult sendWebSocketBroadcastMsg(String msg) {  
  64.         if (StringUtils.isEmpty(msg)) {  
  65.             log.error("[sendWebSocketBroadcastMsg] msg can not be null!");  
  66.             return null;  
  67.         }  
  68.         Message message = null 
  69.         SendResult sendResult = null 
  70.         try {  
  71.             BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();  
  72.             broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);  
  73.             broadcastMqDTO.setMessage(msg);  
  74.             message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));  
  75.             sendResult = defaultMQProducer.send(message);  
  76.         } catch (Exception e) {  
  77.             log.error("[sendWebSocketBroadcastMsg] e is ", e);  
  78.         }  
  79.         return sendResult;  
  80.     }  

對消息的訂閱模塊實現代碼如下: 

  1. package org.idea.web.socket.mq;  
  2. import com.alibaba.fastjson.JSON;  
  3. import com.oracle.tools.packager.Log;  
  4. import lombok.extern.slf4j.Slf4j;  
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  6. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  7. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  8. import org.apache.rocketmq.common.message.MessageExt;  
  9. import org.idea.web.socket.dto.BroadcastMqDTO;  
  10. import org.idea.web.socket.manager.SocketManager;  
  11. import org.springframework.messaging.simp.SimpMessagingTemplate;  
  12. import org.springframework.stereotype.Component;  
  13. import org.springframework.util.CollectionUtils;  
  14. import org.springframework.web.socket.WebSocketSession;  
  15. import javax.annotation.Resource;  
  16. import java.util.List;  
  17. import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;  
  18. import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;  
  19. /**  
  20.  * @Author linhao  
  21.  * @Date created in 10:59 上午 2021/5/10  
  22.  */  
  23. @Component  
  24. @Slf4j  
  25. public class MessageListenerHandler implements MessageListenerConcurrently {  
  26.     @Resource  
  27.     private SocketManager socketManager;  
  28.     @Resource  
  29.     private SimpMessagingTemplate template;  
  30.     @Override  
  31.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
  32.         if (CollectionUtils.isEmpty(list)) {  
  33.             Log.info("receive empty msg");  
  34.             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  35.         }  
  36.         MessageExt messageExt = list.get(0);  
  37.         byte[] bytes = messageExt.getBody();  
  38.         String json = new String(bytes);  
  39.         BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);  
  40.         log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);  
  41.         if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {  
  42.             log.info("[consumeMessage] 廣播發送消息:觸發----》消息內容為:" + broadcastMqDTO);  
  43.             template.convertAndSend("/topic/sendTopic", broadcastMqDTO);  
  44.         } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {  
  45.             String sessionKey = broadcastMqDTO.getSessionKey();  
  46.             WebSocketSession webSocketSession = socketManager.get(sessionKey);  
  47.             if (webSocketSession != null) {  
  48.                 template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());  
  49.                 log.info("[consumeMessage] 點對點發送消息;觸發----》消息內容為:" + broadcastMqDTO);  
  50.             }  
  51.         }  
  52.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  53.     }  

整體設計結構如下圖:

于是按照這個結構進行了一版本的緊急開發迭代,原先的單臺服務器擴展為了服務集群。

業務拓展后續產品經理提出一個需求,要求支持在同一間房內的兩個用戶之間發送悄悄話功能。這就需要我們進行一個點對點之間傳輸通訊的功能了。因此需要在mq通知到每臺機器的時候加一個本地Session遍歷的邏輯,如果當前機器存有用戶token對應的session變量,那么就單獨針對那個Session進行WebSocket的發送通知。

設計弊端一旦某臺機器出現了異常崩潰,那么就意味著這臺機器上的所有語音連接可能會出現中斷情況。目前這一塊的問題也在考慮解決,計劃是將WebSocketSession存入到分布式緩存的redis中保證數據可靠存儲,但是在后續嘗試的時候發現WebSocketSession對象沒有實現序列化接口,在存儲到Redis的時候會出現異常。目前這個問題還在尋找解決思路中,不知道各位讀者朋友們有什么好的思路。

遇到的問題點用戶請求直接訪問到了我們的內部服務器,如果在請求的中間加入一臺nginx做負載均衡則需要在nginx中配置一些額外信息。

項目的源代碼比較多,這里我把核心部分的代碼整理了一份,感興趣的朋友可以到我的gitee上邊去下載:

https://gitee.com/IdeaHome_admin/socket-framework 

 

責任編輯:龐桂玉 來源: Java知音
相關推薦

2019-03-21 09:45:20

IM即時通訊CIM

2021-08-14 09:23:03

即時通訊IM互聯網

2011-10-20 22:25:49

網易即時通

2010-04-30 10:35:09

即時通訊MSN

2012-06-11 09:27:17

imo即時通訊

2011-06-30 10:50:24

即時通訊

2012-03-23 21:18:34

imo即時通訊

2020-09-30 18:00:48

JavaSpring BootIM

2011-08-04 14:50:07

263EM

2012-03-05 11:06:28

imo即時通訊

2012-03-30 10:47:05

imo

2013-10-16 11:32:55

imoRTX即時通訊

2021-10-20 05:55:22

即時通訊IM網絡

2024-01-24 09:51:47

Vue3.NET通訊功能

2014-11-17 11:58:49

即時通訊云

2012-03-29 13:47:18

即時通訊

2012-03-15 14:55:03

imo即時通訊

2012-05-24 10:31:16

imo即時通訊

2012-05-07 10:20:55

imo即時通訊
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: a毛片 | 欧美成人h版在线观看 | 五月天激情综合网 | 国产精品久久久 | 日韩在线观看精品 | 亚洲欧美中文字幕 | 欧美一区二区在线看 | 久久99精品久久久久久秒播九色 | 国产成人综合在线 | 国产精品久久久久久久久久久免费看 | 色欧美片视频在线观看 | 午夜在线小视频 | 久久久久国产一区二区三区四区 | 国产福利视频网站 | 免费黄网站在线观看 | 天天操天天射天天 | 日韩高清一区二区 | 欧美不卡视频 | 亚洲国产精品99久久久久久久久 | 欧美精品综合 | 色视频在线免费观看 | 久久久国产一区二区三区四区小说 | 91精品国产乱码久久蜜臀 | 亚洲成年在线 | 国产精品免费在线 | 久久精品亚洲国产 | av在线一区二区三区 | 成人亚洲综合 | 一级片aaa| 国产一级视频在线 | a国产视频 | 特黄色一级毛片 | 91国在线高清视频 | 欧美日韩毛片 | 黄色在线免费观看视频 | 欧美日韩在线不卡 | 亚洲成人激情在线观看 | 男人天堂av网站 | 亚洲国产成人av | 国产精品久久久久久网站 | 成人福利在线视频 |