WebSocket的心跳机制和断线重连

背景

在服务器重启或是弱网情况下,前端不能保证一直连接成功。因此在出现被动断开的情况下,需要有心跳机制断线重连的功能。

心跳机制:客户端每隔一段时间向服务端发送一个特有的心跳消息,每次服务端收到消息后只需将消息返回,此时,若二者还保持连接,则客户端就会收到消息,若没收到,则说明连接断开,此时,客户端就要主动重连,完成一个周期

断线重连:若某时间段内客户端发送了消息,而服务端未返回,则认定为断线;这个时候会触发到websocket中的onclose事件,需要重新连接服务

nodejs+ws模块搭建websocket服务器

之前我有篇文章使用的是nodejs+websocket模块搭建的服务器,后来发现ws模块更易使用和社区一直有人维护,因此推荐使用ws模块

下载ws依赖 ws - npm

npm i ws

在文件夹下新增server.js文件

/* server.js 服务器 */

// 引入模块
const WebSocket = require('ws').Server
const port = 8002

// 创建服务器
const server = new WebSocket({ port }, () => {
  console.log('websocket服务开启')
})

const connectHandler = (ws) => {
  console.log('客户端连接')
  // 监听客户端出错
  ws.on('error', errorHandler)
  // 监听客户端断开链接
  ws.on('close', closeHandler)
  // 监听客户端发来的消息
  ws.on('message', messageHandler)
}

// 监听接收客户端信息回调
// 注意:因为这里用到this的指向,因此用普通的函数
function messageHandler(data) {
  console.log('messageHandler===>接收客户端消息', JSON.parse(data))
  const { ModeCode } = JSON.parse(data)
  switch(ModeCode) {
    case 'message':
      console.log('收到消息')
      // 需要发送信息给客户端以此说明连接成功
      this.send(JSON.stringify(JSON.parse(data)))
      break;
    case 'heart_beat':
      console.log('心跳检测')
      // 需要发送信息给客户端以此说明连接成功
      this.send(JSON.stringify(JSON.parse(data)))
      break;
  }
}

// 监听客户端出错回调
const errorHandler = (error) => {
  console.log('errorHandler===>客户端出错', error)
}
// 监听客户端断开连接回调
const closeHandler = (e) => {
  console.log('closeHandler===>客户端断开?', e)
}

// 建立连接
server.on('connection', connectHandler)

客户端的实现

1. 封装websocket,需要实现心跳机制和断线重连

2. 封装自定义通信事件,实现监听和触发功能

在文件夹下新增eventBus.js文件

// eventBus.js
// 用到了发布订阅模式
class EventBus {
  constructor() {
    // 消息中心,记录了所有的事件 以及 事件对应的处理函数
    this.subs = Object.create(null)
  }

  // 注册时间
  // 参数:1.事件名称  2.事件处理函数
  on(eventType, handler) {
    this.subs[eventType] = this.subs[eventType] || []
    this.subs[eventType].push(handler)
  }

  // 触发事件
  // 参数: 1.事件名称 2.接收的参数
  emit(eventType, ...ars) {
    if(this.subs[eventType]) {
      this.subs[eventType].forEach(handler => {
        handler(...ars)
      })
    }
  }
}

export default new EventBus()

在文件夹下新增myWebSocket.js文件

// myWebSocket.js  单独把websocket的处理方法抽离出来
import eventBus  from "./eventBus.js"
// 定义websocket消息类型
const ModeCodeEnum = {
  MSG: 'message', // 普通消息
  HEART_BEAT: 'heart_beat'  // 心跳
}
class MyWebSocket extends WebSocket {
  constructor (url) {
    super(url)
    return this
  }
  /**
   * heartBeatConfig 心跳连接参数
   *    time: 心跳时间间隔
   *    timeout: 心跳超时间隔
   *    reconnect: 断线重连时间间隔
   * isReconnect 是否断线重连
   */
  init (heartBeatConfig, isReconnect) {
    this.onopen = this.openHandler // 连接成功后的回调函数
    this.onclose = this.closeHandler // 连接关闭后的回调 函数
    this.onmessage = this.messageHandler // 收到服务器数据后的回调函数
    this.onerror = this.errorHandler // 连接发生错误的回调方法
    this.heartBeatConfig = heartBeatConfig // 心跳连接配置参数
    this.isReconnect = isReconnect // 记录是否断线重连
    this.reconnectTimer = null // 记录断线重连的时间器
    this.startHeartBeatTimer = null // 记录心跳时间器
    this.webSocketState = false // 记录socket连接状态 true为已连接
  }
  // 获取消息
  getMessage ({ data }) {
    return JSON.parse(data)
  }
  // 发送消息
  sendMessage (data) {
    // 当前的this 就是指向websocket
    return this.send(JSON.stringify(data))
  }
  // 连接成功后的回调函数
  openHandler () {
    console.log('====onopen 连接成功====')
    // 触发事件更改按钮的状态
    eventBus.emit('changeBtnState', 'open')
    // socket状态设置为连接,做为后面的断线重连的拦截器
    this.webSocketState = true
    // 判断是否启动心跳机制
    if(this.heartBeatConfig && this.heartBeatConfig.time) {
      this.startHeartBeat(this.heartBeatConfig.time)
    }
  }
  // 收到服务器数据后的回调函数 
  messageHandler (data) {
    const { ModeCode, msg} = this.getMessage(data)
    switch (ModeCode) {
      case ModeCodeEnum.MSG: // 普通消息类型
        console.log('====onmessage 有新消息啦====', msg)
        break
      case ModeCodeEnum.HEART_BEAT: // 心跳
        this.webSocketState = true
        console.log('====onmessage 心跳响应====', msg)
        break
    } 
  }
  // 连接关闭后的回调 函数
  closeHandler () {
    console.log('====onclose websocket关闭连接====')
    // 触发事件更改按钮的状态
    eventBus.emit('changeBtnState', 'close')
    // 设置socket状态为断线
    this.webSocketState = false
    // 在断开连接时 记得要清楚心跳时间器和 断开重连时间器材
    this.startHeartBeatTimer && clearTimeout(this.startHeartBeatTimer)
    this.reconnectTimer && clearTimeout(this.reconnectTimer)
    this.reconnectWebSocket()
  }
  errorHandler () {
    console.log('====onerror websocket连接出错====')
    // 触发事件更改按钮的状态
    eventBus.emit('changeBtnState', 'close')
    // 设置socket状态为断线
    this.webSocketState = false
    // 重新连接
    this.reconnectWebSocket()
  }

  // 心跳初始化方法 time:心跳间隔
  startHeartBeat (time) {
    this.startHeartBeatTimer = setTimeout(() => {
      // 客户端每隔一段时间向服务端发送一个心跳消息
      this.sendMessage({
        ModeCode: ModeCodeEnum.HEART_BEAT,
        msg: Date.now()
      })
      this.waitingServer()
    }, time);
  }
  //在客户端发送消息之后,延时等待服务器响应,通过webSocketState判断是否连线成功
  waitingServer () {
    this.webSocketState = false
    setTimeout(() => {
      // 连线成功状态下 继续心跳检测
      if(this.webSocketState) {
        this.startHeartBeat(this.heartBeatConfig.time)
        return
      }
      console.log('心跳无响应, 已经和服务端断线')
      // 重新连接时,记得要先关闭当前连接
      try {
        this.close()
      } catch (error) {
        console.log('当前连接已经关闭')
      }
      // // 重新连接
      // this.reconnectWebSocket()
    }, this.heartBeatConfig.timeout)
  }


  // 重新连接
  reconnectWebSocket () {
    // 判断是否是重新连接状态(即被动状态断线),如果是主动断线的不需要重新连接
    if(!this.isReconnect) {
      return
    }
    // 根据传入的断线重连时间间隔 延时连接
    this.reconnectTimer = setTimeout(() => {
      // 触发重新连接事件
      eventBus.emit('reconnect')
    }, this.heartBeatConfig.reconnect)
  }
}
export default MyWebSocket

在文件夹下新增index.html文件,引入eventBus.js和myWebSocket.js 文件

<html lang="en">
<body>
  <div>
    <button id="connect">连接</button>
    <button disabled id="sendMessage">发送</button>
    <button disabled id="close">关闭</button>
  </div>
</body>
</html>
<script type="module">
  import eventBus from './eventBus.js'
  import MyWebsocket from './myWebSocket.js'

  const connectBtn = document.getElementById('connect')
  const sendMessageBtn = document.getElementById('sendMessage')
  const closeBtn = document.getElementById('close')
  const wsUrl = 'ws://127.0.0.1:8002'
  let myWS = null //  // 用来记录是否连接了websocket

  // 处理下按钮的状态,连接情况下才能有发送和关闭功能,关闭情况下只能有连接功能
  const setButtonState = (state) => {
    switch(state) {
      case 'open':
        connectBtn.disabled = true
        sendMessageBtn.disabled =false
        closeBtn.disabled = false
        break
      case 'close':
        connectBtn.disabled = false
        sendMessageBtn.disabled = true
        closeBtn.disabled = true
    }
  }

  // 连接websocket处理函数
  const connectWeboSocket = () => {
    myWS = new MyWebsocket(wsUrl)
    // 调用实例对象的init函数 
    myWS.init({
      time: 4 * 1000,
      timeout: 2 * 1000,
      reconnect: 3 * 1000
    }, true)
  }

  // 重新连接webscoket处理 函数
  const reconnectWebSocket = () => {
    // 判断是否有初始化websocket
    if(!myWS) {
      connectWeboSocket()
      return
    }
    // 判断实例上的reconnectTimer 是否有值,要记得清除后再连接
    if(myWS && myWS.reconnectTimer) {
      clearTimeout(myWS.reconnectTimer)
      myWS.reconnectTimer = null
      connectWeboSocket()
    }
  }


  // 注册设置按钮样式
  eventBus.on('changeBtnState', setButtonState)
  // 注册重连websocket 事件
  eventBus.on('reconnect', reconnectWebSocket)

  // 点击连接按钮 连接websocket服务器
  connectBtn.addEventListener('click', reconnectWebSocket)
  // 点击发送按钮 向服务端传送数据
  sendMessageBtn.addEventListener('click', e => {
    myWS.sendMessage({
      ModeCode: "message",
      msg: 'hello world'
    })
  })
  // 点击关闭按钮 断开连接
  closeBtn.addEventListener('click', e => {
    myWS.close()
    myWS = null
  })
</script>

实现心跳机制和断线重连总结

心跳机制的实现,在客户端连接成功的回调中即开启心跳。心跳处理函数内部使用定时器延时触发向服务端发送消息的方法,待服务器将消息返回证明是连线成功状态下,继续调用心跳检测方法。

如果客户端给服务端发送心跳消息,在定义的超时时间后客户端没有收到回复,则说明和服务端断线了,此时会触发到客户端连接关闭的回调函数,在此回调中发起重新连接websocket,如果连接失败继续会触发客户端连接关闭的回调函数继续发起重新连接(如此循环)。

等断线重新连接起来时,在客户端连接成功的回调中又开始了心跳检测。其实就是通过延时的定时器反复以上的操作来和服务端一直通信保持连接。