常规代码

首先Controller层、Service层不必多说,常规流程的代码,这里说一下实现的功能有什么:

1、根据弹幕id集合获取弹幕列表

2、删除弹幕

DanmuWebSocketServer

技术考量

1、首先因为DanmuWebSocketServer 类的实例不是由 Spring 容器管理的标准 Bean,想拿到数据库和缓存的内容只能定义成静态字段获取

2、Session 是 中代表一个客户端与服务器之间完整、双向、持久化 WebSocket 连接的 Java 对象。基于SpringBoot容器。

ServerEndpoint

一、连接时执行

 @OnOpen
    public void onOpen(Session session, @PathParam("vid") String vid) {
        if (videoConnectionMap.get(vid) == null) {
            Set<Session> set = new HashSet<>();
            set.add(session);
            videoConnectionMap.put(vid, set);
        } else {
            videoConnectionMap.get(vid).add(session);
        }
        sendMessage(vid, "当前观看人数" + videoConnectionMap.get(vid).size());
//        System.out.println("建立连接,当前观看人数: " + videoConnectionMap.get(vid).size());
    }

1、@OnOpen:连接就执行

2、 videoConnectionMap.put(vid, set):连接注册

3、sendMessage(vid, "当前观看人数"...:广播观看人数的消息

二、收到消息时触发

1、@OnMessage

2、Token验证:目的是做到只有登录用户才能发弹幕

3、持久化弹幕到数据库

4、(序列化后)广播弹幕

String dmJson = JSON.toJSONString(danmu);
sendMessage(vid, dmJson);

三、连接关闭执行

1、@OnClose

2、在Map中移除当前的连接记录

3、如果没人了就直接移除这个视频,有人就更新在线人数

if (videoConnectionMap.get(vid).size() == 0) {
            videoConnectionMap.remove(vid);
        } else {
            sendMessage(vid, "当前观看人数" + ...)

重点

发送信息

    public void sendMessage(String vid, String text) {
        Set<Session> set = videoConnectionMap.get(vid);
        // 使用并行流往各客户端发送数据
        set.parallelStream().forEach(session -> {
            try {
                session.getBasicRemote().sendText(text);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

session.getBasicRemote().sendText(text)实现发送并且动态渲染

广播机制

服务器里面存在一个通讯录

// 对每个视频存储该视频下的session集合
    private static final Map<String, Set<Session>> videoConnectionMap = new ConcurrentHashMap<>();

有用户连接就会把session存到对应视频的集合里面

然后像每个连接都发一遍消息就是广播

// 步骤1:根据视频ID,获取所有观看该视频的客户端连接
    Set<Session> set = videoConnectionMap.get(vid);

    // 步骤2:使用并行流遍历所有连接,并向每个连接发送消息
    set.parallelStream().forEach(session -> {
        try {
            // 步骤3:通过session对象发送文本消息
            session.getBasicRemote().sendText(text);