從架構到底層:構建高效的實時彈幕系統
彈幕系統是一種即時互動機制,廣泛用于直播、短視頻等內容平臺。在該系統中,用戶發送的消息需在極短時間內被收集、處理,并同步分發給所有觀看者,要求高并發、高吞吐、低延遲。本文將從數據結構、消息通道、風控機制以及前端渲染四個層面,重構該系統的設計與實現方案。
彈幕數據結構與消息緩沖機制
彈幕消息模型設計
文件路徑:
/src/main/java/com/icoderoad/danmaku/model/DanmakuMessage.java
package com.icoderoad.danmaku.model;
import lombok.Data;
/**
* 表示一條彈幕消息的實體類
*/
@Data
public class DanmakuMessage {
private String userId; // 用戶唯一標識
private String content; // 彈幕內容
private long timestamp; // 消息發送的時間戳(ms)
private String color; // 彈幕顏色(可選)
private String type; // 彈幕類型,如 scroll、top、bottom
}
彈幕緩沖隊列設計
文件路徑:
/src/main/java/com/icoderoad/danmaku/service/DanmakuBufferService.java
package com.icoderoad.danmaku.service;
import com.icoderoad.danmaku.model.DanmakuMessage;
import org.springframework.stereotype.Service;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 管理實時彈幕緩沖隊列的服務
*/
@Service
public class DanmakuBufferService {
// 彈幕接收緩沖區,線程安全
private final Queue<DanmakuMessage> writeQueue = new ConcurrentLinkedQueue<>();
/**
* 添加彈幕消息
*/
public void addMessage(DanmakuMessage message) {
writeQueue.offer(message);
}
/**
* 批量獲取彈幕(用于調度器分發)
*/
public Queue<DanmakuMessage> fetchAll() {
Queue<DanmakuMessage> result = new ConcurrentLinkedQueue<>();
DanmakuMessage msg;
while ((msg = writeQueue.poll()) != null) {
result.offer(msg);
}
return result;
}
}
WebSocket 實時通信實現
WebSocket 配置與通道建立
件路徑:
/src/main/java/com/icoderoad/danmaku/websocket/DanmakuWebSocketServer.java
package com.icoderoad.danmaku.websocket;
import com.icoderoad.danmaku.model.DanmakuMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* WebSocket 處理類,實現實時通信
*/
@Slf4j
@Component
@ServerEndpoint(value = "/ws/danmaku")
public class DanmakuWebSocketServer {
private static final Set<Session> clients = new CopyOnWriteArraySet<>();
private static final ObjectMapper mapper = new ObjectMapper();
@OnOpen
public void onOpen(Session session) {
clients.add(session);
log.info("新連接加入: {}", session.getId());
}
@OnMessage
public void onMessage(String message, Session session) {
try {
DanmakuMessage danmaku = mapper.readValue(message, DanmakuMessage.class);
for (Session client : clients) {
client.getAsyncRemote().sendText(mapper.writeValueAsString(danmaku));
}
} catch (Exception e) {
log.error("消息處理失敗", e);
}
}
@OnClose
public void onClose(Session session) {
clients.remove(session);
log.info("連接關閉: {}", session.getId());
}
@OnError
public void onError(Session session, Throwable throwable) {
log.error("連接異常: {}", session.getId(), throwable);
}
/**
* 主動推送(用于后臺調度)
*/
public static void broadcast(DanmakuMessage message) throws Exception {
String msg = mapper.writeValueAsString(message);
for (Session client : clients) {
client.getAsyncRemote().sendText(msg);
}
}
}
風控與限流邏輯(防刷屏、防攻擊)
Redis 限流邏輯實現
件路徑:
/src/main/java/com/icoderoad/danmaku/security/RateLimiterService.java
package com.icoderoad.danmaku.security;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* 限流服務(基于 Redis)
*/
@Service
public class RateLimiterService {
@Resource
private StringRedisTemplate redisTemplate;
/**
* 判斷是否允許發送彈幕
* 限制頻率為:每用戶 1 秒 1 條
*/
public boolean canSend(String userId) {
String key = "danmaku:limit:" + userId;
Boolean exist = redisTemplate.hasKey(key);
if (Boolean.TRUE.equals(exist)) {
return false; // 剛剛發過,限流
}
redisTemplate.opsForValue().set(key, "1", java.time.Duration.ofSeconds(1));
return true;
}
}
敏感詞過濾(簡單正則)
可擴展為 DFA 敏感詞算法。
public boolean containsIllegalContent(String content) {
String[] illegalWords = {"垃圾", "罵人詞"};
for (String word : illegalWords) {
if (content.contains(word)) return true;
}
return false;
}
前端彈幕展示邏輯實現
Canvas 動畫渲染(多軌道)
文件路徑:
/web/static/js/danmaku.js
const canvas = document.getElementById('danmakuCanvas');
const ctx = canvas.getContext('2d');
let messages = [];
function Danmaku(text, color, speed, y) {
this.text = text;
this.color = color;
this.speed = speed;
this.x = canvas.width;
this.y = y;
}
Danmaku.prototype.draw = function () {
ctx.font = "20px Arial";
ctx.fillStyle = this.color || "#ffffff";
ctx.fillText(this.text, this.x, this.y);
this.x -= this.speed;
}
function render() {
ctx.clearRect(0, 0, canvas.width, canvas.height);
messages.forEach(msg => msg.draw());
messages = messages.filter(m => m.x + ctx.measureText(m.text).width > 0);
requestAnimationFrame(render);
}
render();
最終總結與優化建議
模塊 | 技術選型 | 說明 |
消息通道 | WebSocket (JDK + Spring Boot) | 支持毫秒級推送延遲 |
隊列結構 | ConcurrentLinkedQueue | 支持無鎖并發寫入 |
限流機制 | Redis TTL + Key 檢查 | 用戶級限速,低成本 |
前端渲染 | Canvas + requestAnimationFrame | 高性能動畫,適配移動端 |
風控邏輯 | Redis 黑名單 + 敏感詞攔截 | 攔截非法信息與頻繁操作 |