SpringCloud Gateway Netty Websocket实现高性能聊天系统集群方案

目录

一、问题引出

二、架构图

三、实现方式


一、问题引出

在IM分布式系统的构建中遇到的问题:

Netty服务器通过客户端的连接信息来生成对应的Channel(可以理解为长连接的用户信息),Netty服务器通过Channel来进行消息转发。于是,提出初始构想:通过Redis来序列化Channel,再通过Netty服务器去获取Redis上的Channel,最后转发。但这个构思是错误的,因为Channel是硬件的连接信息,并不能被序列化。

最终构思解决Channel共享的方案有两个:

(1)GateWay网关来自定义负载均衡,当接收到Websocket消息时直接根据用户id进行路由,该方式完美兼容原始功能,原始功能采用Netty来开发Websocket,实现难度低,开发成本低。

(2)采用Netty高性能框架开发Websocket,通过MQ消息队列进行广播来实现Channel的共享,实现难度不大,开发成本较高。

二、架构图

最终,我选择第二种解决方案,IM系统架构如下:

 

三、实现方式

首先,我先搭建一个支持简单聊天的Netty-Websocket聊天服务器,之后,我先构建一个消息聊天对象如下:

package com.dragonwu.im.domain.dto;

import com.dragonwu.common.basic.constant.Constants;
import com.dragonwu.common.security.basic.domain.emums.LoginType;
import com.dragonwu.im.domain.enums.FromUserType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang.StringUtils;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;

/**
 * @author Dragon Wu
 * @since 2023/2/27 13:05
 * 消息对象
 */
@Getter
@Setter
@ToString
@Document("im_message") //集合名
// {"msg":"你的消息","loginType":"你的类型","userId":"你的id","to":"接收者","group":"群接受对象","isCustomerService":"是否为客服","isVisitor":"是否为游客",
// "isConnect":"是否为连接信息"}
public class IMessage implements Serializable {

    @Id //存入mongo里的id
    private String id;

    //登录类型
    private String loginType;

    //用户id
    @Indexed
    private String userId;

    //发送时间
    private LocalDateTime sendTime;

    //发送人id
    private String to;

    //群发列表
    private List<String> group;

    //发送者是否为客服
    private Boolean isCustomerService;

    //发送者是否为游客
    private Boolean isVisitor;

    //发送的消息
    private String msg;

    //是否为第一次连接信号
    private Boolean isConnect;

    @CreatedDate //创建时默认创建该时间字段
    private LocalDateTime createTime;

    /* 判断消息格式是否正确 */
    public boolean isMsgOK() {
        if (Objects.isNull(isVisitor)) {
            isVisitor = false;
        }
        if (Objects.isNull(to)) {
            to = Constants.EMPTY_STR;
        }
        if (Objects.isNull(isConnect)) {
            isConnect = false;
        }

        try {
            if ((!StringUtils.isEmpty(userId)) && (!StringUtils.isEmpty(msg))) {
                return ((!StringUtils.isEmpty(to)) || ((group != null) && (!group.isEmpty())) || isVisitor);
            }
        } catch (NullPointerException ignored) {
        }
        return false;
    }

    //获取发送者的类型
    public FromUserType getFromUserType() {
        LoginType exists = LoginType.isExists(loginType);
        if (Objects.isNull(isVisitor)) {
            isVisitor = false;
        }
        if (Objects.isNull(isCustomerService)) {
            isCustomerService = false;
        }

        if (Objects.isNull(exists) && isVisitor) {
            return FromUserType.VISITOR_TYPE;
        } else if ((exists == LoginType.USER_TYPE) && isCustomerService) {
            return FromUserType.CUSTOMER_SERVICE;
        } else if (exists == LoginType.USER_TYPE) {
            return FromUserType.USER_TYPE;
        } else if (exists == LoginType.CUSTOMER_TYPE) {
            return FromUserType.CUSTOMER_TYPE;
        }
        return null;
    }

    public void setNowAsSendTime() {
        sendTime = LocalDateTime.now();
    }

    public boolean isGroupChat() {
        if (group != null) {
            return StringUtils.isEmpty(to) && (!group.isEmpty()) && (!isVisitor);
        }
        return false;
    }
}

有了这样的对象以后,我便可对发送过来的消息进行序列化与反序列化获取数据,通过消息对象中的数据是否正确与是否认证来决定消息的转发。

每个用户第一次发送isConnect型号时将其注册到Redis中,Key为用户名唯一,Value为ChannelId的asShort值。当用户在不同Netty服务器上时(此时发送与接收者都在线),我会先让服务器去Redis获取对应用户名的ChannelId,先在本地服务器中查找,若查询到该ChannelId的Channel则直接转发,否则为不在同一个Netty服务器上,发送Channel寻找的信号到MQ进行广播,其他服务器获取到广播后查询直接是否有该ChannelId的Channel,若有则转发;离线消息的话,直接以Zset的形式加入Redis即可,当用户上线时再拉取数据。最后,无论哪种情况发送的消息,都会被MQ进行集群负载均衡来存储到数据库中。

以上,为个人本次实践的总结,希望对遇到相似问题的开发者,有所帮助,有问题可联系共同探讨!