java整合WebSocket

一、WebSocket介绍

1、简介

WebSocket协议通过在客户端和服务端之间提供全双工通信来进行Web和服务器的交互功能。在WebSocket应用程序中,服务器发布WebSocket端点,客户端使用url连接到服务器。建立连接后,服务器和客户端就可以互相发送消息。客户端通常连接到一台服务器,服务器接受多个客户端的连接。

2、优势

HTPP协议是基于请求响应模式,并且无状态的。HTTP通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
如果我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)

3、服务端注解

@ServerEndpoint(“/websocket/{uid}”)
申明这是一个websocket服务;
需要指定访问该服务的地址,在地址中可以指定参数,需要通过{}进行占位;

@OnOpen
用法:public void onOpen(Session session, @PathParam(“uid”) String uid) throws IOException{}
该方法将在建立连接后执行,会传入session对象,就是客户端与服务端建立的长连接通道,通过@PathParam获取url中声明的参数;

@OnClose
用法:public void onClose() {}
该方法是在连接关闭后执行;

@OnMessage
用法:public void onMessage(String message, Session session) throws IOException {}
该方法用于接收客户端发送的消息;
message:发来的消息数据;
session:会话对象(也是长连接通道);
发送消息到客户端;
用法:session.getBasicRemote().sendText(“hello,websocket.”);
通过session进行消息发送;

二、springboot整合

1、引入依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2、配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3、业务代码

>>群聊

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
@Slf4j
@ServerEndpoint(value = "/api/pushMessageMulti/{roomId}/{userId}",encoders = { ServerEncoder.class })
public class WebSocketServerMulti {

    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
    private static Map<String, WebSocketServerMulti> userMap = new ConcurrentHashMap<>();
    //存放房间对象
    private static Map<String, Set<WebSocketServerMulti>> roomMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId = "";
    //存出当前群聊在线的人数(使用原因:roomMap中无法获取到当前用户id)
    private static Map<String, List<String>> multiUser = new ConcurrentHashMap<>();




    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("roomId") String roomId , @PathParam("userId") String userId) throws IOException, EncodeException {
        synchronized (session){
            try {
                this.session = session;
                this.userId=userId;
                userMap.put(userId,this);
                if (!roomMap.containsKey(roomId)) {
                    Set<WebSocketServerMulti> set = new HashSet<>();
                    set.add(userMap.get(userId));
                    roomMap.put(roomId,set);

                    List<String> dd = new LinkedList<>();
                    dd.add(userId);
                    multiUser.put(roomId,dd);
                } else {
                    if(multiUser.get(roomId).contains(userId)){

                        log.info("存在:房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());
                    }else{
                        multiUser.get(roomId).add(userId);

                        roomMap.get(roomId).add(this);
                    }
                }
                System.out.println(multiUser.get(roomId).size());
                log.info("房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());
                Map<String,Object> map = new HashMap<>();
                map.put("online_num",multiUser.get(roomId).size());//在线人数
                map.put("online_list",roomMap.get(roomId));//人员列表
                map.put("roomId",roomId);//群id
                map.put("message","用户"+***+"加入群聊");//消息
				//自定义发送消息
                sendMessageObject(map,roomId);
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }

    /**
     * 连接关闭@PathParam("userId") String userId
     * 调用的方法
     */
    @OnClose
    public void onClose( @PathParam("roomId") String roomId,@PathParam("userId") String userId) {
        try {
            if (roomMap.containsKey(roomId)) {
                Set<WebSocketServerMulti> set = roomMap.get(roomId);
                Iterator<WebSocketServerMulti> it = set.iterator();
                while (it.hasNext()) {
                    if (it.next().userId.equals(userId)) {
                        it.remove();
                    }
                }

                multiUser.get(roomId).remove(userId);
                log.info("房间号:"+roomId+"用户退出:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());
                Map<String,Object> map = new HashMap<>();
                map.put("online_num",multiUser.get(roomId).size());//在线人数
                map.put("online_list",roomMap.get(roomId));//人员列表
                map.put("roomId",roomId);//群id
                map.put("message","用户"+***+"加入群聊");//消息
				//自定义发送消息
                sendMessageObject(map,roomId);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     * @param message
     * 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message) {
        //注意,要给session加上同步锁,否则会出现多个线程同时往同一个session写数据,导致报错的情况。
        synchronized (session){
            //可以群发消息
            if(StringUtils.isNotBlank(message)){
                try {
                    //解析发送的报文
                    JSONObject jsonObject = JSONObject.parseObject(message);
                    //追加发送人(防止串改)
                    jsonObject.put("fromUserId",this.userId);
                    int chatType=jsonObject.getInteger("chatType");
                    String myUserId=jsonObject.getString("myUserId");
                    String toRoomId=jsonObject.getString("toRoomId");
                    log.info("房间号:"+toRoomId+"用户消息:"+userId+",报文:"+message);
                    sendMessageTo(message,toRoomId);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 群聊
     * @param message 消息
     * @param roomId 房间号
     */
    public void sendMessageTo(String message , String roomId) throws IOException {
        if (roomMap.containsKey(roomId)) {
            for (WebSocketServerMulti item : roomMap.get(roomId)) {
                    item.session.getAsyncRemote().sendText(message);
            }
        }
    }


    /**
     * @param error
     */
    @OnError
    public SystemResult onError(Throwable error) {

        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());

        ChatError chatError = new ChatError();
        chatError.setUserId(Integer.valueOf(this.userId));
        chatError.setDetails(error.getMessage());
        chatError.setAddTime(new Date());
        chatErrorMapper.insert(chatError);
        SystemResult systemResult = new SystemResult();
        return systemResult.error(error.getMessage());

    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 实现服务传送Object类型
     * 器主动推送
     */
    public void sendMessageObject(Object message,String roomId) {
        try {
            if (roomMap.containsKey(roomId)) {
                for (WebSocketServerMulti item : roomMap.get(roomId)) {
                    item.session.getBasicRemote().sendObject(message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 获得此时的
     * 在线人数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addOnlineCount() {
        WebSocketServerMulti.onlineCount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subOnlineCount() {
        WebSocketServerMulti.onlineCount--;
    }

}

>>单人聊天

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


@Component
@Slf4j
@ServerEndpoint("/api/pushMessageSolo/{userId}")
public class WebSocketServerSolo {

    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
    private static ConcurrentHashMap<String, WebSocketServerSolo> webSocketMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId = "";

    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //加入set中
            webSocketMap.put(userId,this);
        }else{
            //加入set中
            webSocketMap.put(userId,this);
            //在线数加1
            addOnlineCount();
        }
        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
        sendMessage("连接成功");
    }

    /**
     * 连接关闭
     * 调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     * @param message
     * 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析发送的报文
                JSONObject jsonObject = JSONObject.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("fromUserId",this.userId);
                String toUserId=jsonObject.getString("toUserId");
                String myUserId=jsonObject.getString("myUserId");
                //传送给对应toUserId用户的websocket
                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                    webSocketMap.get(toUserId).sendMessage(message);
                }else{
                    //否则不在这个服务器上,发送到mysql或者redis
                    log.error("请求的userId:"+toUserId+"不在该服务器上");
                }
                if(StringUtils.isNotBlank(myUserId)&&webSocketMap.containsKey(myUserId)){
                    webSocketMap.get(myUserId).sendMessage(message);
                }else{
                    //否则不在这个服务器上,发送到mysql或者redis
                    log.error("请求的userId:"+myUserId+"不在该服务器上");
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *发送自定
     *义消息
     **/
    public static void sendInfo(String message, String userId) {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    /**
     * 获得此时的
     * 在线人数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addOnlineCount() {
        WebSocketServerSolo.onlineCount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subOnlineCount() {
        WebSocketServerSolo.onlineCount--;
    }

}

三、部署websocket项目问题

1、webSocket功能失效

本地开发的时候都可以正常使用,但是在部署到nginx代理服务器的时候发现报了错误,连不上,报错:Error in connection establishment: net::ERR_NAME_NOT_RESOLVED
发现是nginx服务器默认是不打开webSocket的功能的,这需要我们在nginx服务器上配置:

 location /test/ {
                proxy_pass http://test.com;
                proxy_redirect default;
                proxy_set_header Upgrade $http_upgrade; # allow websockets
                proxy_set_header Connection "upgrade";
                proxy_http_version 1.1;
                }

2、断线重连

如果nginx没有设置读取超时时间,websocket会一直断线重连,大约一分钟重连一次
可以设置长时间得超时时间,避免一直断线重连,避免消耗内存

location /test{
	    proxy_pass  http://test.com;
	    proxy_set_header Upgrade $http_upgrade; # allow websockets
    	proxy_set_header Connection "upgrade";
    	proxy_http_version 1.1;
        proxy_connect_timeout 60s;#l连接超时时间,不能设置太长会浪费连接资源
	    proxy_read_timeout 500s;#读超时时间
	    proxy_send_timeout 500s;#写超时时间
            index  index.html index.htm;
        }