站长统计:文章【】篇评论【2】条标签【17】个
当前日期

SpringBoot+Redis订阅实现分布式Websocket的Session共享方案

SpringBoot+Redis订阅实现分布式Websocket的Session共享方案


背景介绍

有一个项目(这里就称为A)是对接阿里云云呼(下称 CCC),CCC通过ASR识别之后以MQ的方式发送某些特定topic&tag,A项目的职责就是通过MQ的topic&tag处理消息与A项目的前端进行websocket交互,完成整个通话流程的可视化。

项目实现

A项目中的websocket模块部分就是分布式的

实现单机的WebSocket方案

这个很简单,就是一个 springboot 中实现 websocket 的基本方式。

@Component
@ServerEndpoint("/ws/{userId}")
public class MessageWsClusterServer{

    private static final ConcurrentHashMap<String, MessageWsClusterServer> AGENT_MESSAGE_SERVER_MAP = new ConcurrentHashMap<>();
    private javax.websocket.Session session;
    private String userId;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public static void sendMessageTo(String userId, String message) throws IOException {
        if (userId != null && AGENT_MESSAGE_SERVER_MAP.containsKey(userId)) {
            AGENT_MESSAGE_SERVER_MAP.get(userId).sendMessage(message);
        }
    }

    public static void closeSession(String userId) throws IOException {
        if (AGENT_MESSAGE_SERVER_MAP.containsKey(userId)) {
            AGENT_MESSAGE_SERVER_MAP.get(userId).close();
        }
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) throws IOException {
        this.session = session;
        this.userId = userId;
        log.info("WebSocket Open: " + agentId);
        AGENT_MESSAGE_SERVER_MAP.remove(userId);
        AGENT_MESSAGE_SERVER_MAP.put(userId, this);
        sendMessage("Connected");
    }

    @OnClose
    public void onClose() {
        if (AGENT_MESSAGE_SERVER_MAP.containsKey(userId)) {
            log.info("WebSocket Close: " + userId);
            AGENT_MESSAGE_SERVER_MAP.remove(userId);
        }
    }

    @OnError
    public void onError(Throwable throwable, Session session, @PathParam("userId") String userId) {
        throwable.printStackTrace();
        log.error(throwable.getMessage());
        log.info("Websocket Error: " + userId);
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public void close() throws IOException {
        this.session.close();
    }
}

实现分布式 session 共享

此处使用的是 redis 的 pub/sub 发布订阅功能

实现基本原理

每个 websocket 的 server 端都订阅到某一个 redis 的 channel 上,因为每一台 server 端都保存了当前这台服务器与哪个 userId 建立了websocket 的链接。当 websocket 需要推送消息时,我们先使用 redis publish channel_name message_xxx,之后通过每个 server 的订阅端读取到 channel_name 下的消息(消息中包含了 userId),再之后通过判断当前 server 端中建立的 websocket 链接是否包含了收到的消息中的 userId,如果包含,那么使用当前的 websocket server 端进行 消息推送给前端。

模块实现的逻辑图

分布式实现

  1. 新增加 redis 发布订阅配置

下面的 messageWsClusterServer 是上面实现的单机版的 websocket 实例对象

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory factory,
            MessageWsClusterServer messageWsClusterServer) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        //可添加多个listener
        container.addMessageListener(
                //发布订阅的操作 onMessage方法
                messageWsClusterServer,
                //发布订阅的通道,名字可自定义
                new PatternTopic(CommonConstants.WEBSOCKET_CHANNEL_KEY));
        return container;
    }
  1. 实现 Messagelistener
    因为要实现 redis 的消息订阅功能,所以我们的 websocketServer 则需要实现 redis 的 MessageListener 接口
@Component
@ServerEndpoint("/ws/{agentId}")
public class MessageWsClusterServer implements MessageListener {

	//...省略其他代码
	
	@Override
    public void onMessage(Message message, byte[] bytes) {
        try {
            byte[] body = message.getBody();
			//推送的消息内容,反序列化解析一下
			//这里的POCMessage包含:userId、message、以及一些其他的业务属性
            PocMessage pocMessage = (PocMessage) redisTemplate.getValueSerializer().deserialize(body);
            String channel = new String(bytes);
            if (pocMessage == null) {
                log.error("消息为空 : {}", "null");
            } else {
				//推送的消息可能是其他 channel 的,需要判断一下
                if (WEBSOCKET_CHANNEL_KEY.equals(channel)) {
                    String userId = pocMessage.getAgentId();
                    String msg = pocMessage.getMessage();
					MessageWsClusterServer.sendMessageTo(userId, msg);
					log.info("通道 ==>{},收到消息:{},发送给:{}", channel, msg, userId);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("解析消息/推送消息失败");
            log.error(e.getMessage());
        }
    }
}

其他(消息发布实现)

如果你读到文章的此处,那么一定会发现,怎么本篇只讲到了订阅收消息的功能,发布publish message 的模块在哪呢,
说明一下:
redis 的 publish、subscribe 都是redis client,当然 publish 也是在 A 项目中实现的,publish 的模块在 A 项目中与CCC对接的模块部分中实现,当CCC推送消息至MQ中,A订阅了 CCC 的MQ 的tag,就会进行处理(pushMessage),处理之后判断发给哪个用户之后进行推送 websocket 消息(此处就是 publish channel message),接下来就是 我们websocket的部分了。
这里大部分逻辑都是业务代码,主要的就是发布消息

PocMessage pocMessage = new PocMessage(userId, messageStr);
redisTemplate.convertAndSend(CommonConstant.WEBSOCKET_CHANNEL_KEY, pocMessage);
文章出处: 文章地址:https://wuwenbin.me/article/17 转载注明下哦!o(≧v≦)o~~

标签: websocket 分布式

展开阅读全文
评论请遵守相关法律法规,请勿恶意抨击他人。

发表评论

数据・用户

10 篇文章 2 条评论 17 个标签

登录

导航按钮

专题 归档 关于我 友链 收藏 搜索