An authenticated multi-room live chat server in 240 lines

Today we're going to build a live chat backend using Cloudflare Workers, WebSockets and Node.js. Users will be able to authenticate with a JWT, join rooms they have permission for, send messages and see typing indicators.

As we're using Cloudflare Workers, there will be some small differences in how we handle incoming connections. You can read more about this in the docs.

Let's jump straight into the code, and I'll explain how it works as we go. First, we'll create a new class, WebsocketApi, which will contain all the business logic:

class WebsocketApi {
  constructor() {
    this.sessions = {}
  }
  
  /**
   * Handle a new connection
   * @param  {String} token JWT
   * @return {Response} Upgrade connection to websocket
   */
  onConnection(token) {
  	
  }
}

On receiving a connection we need to create a new instance of WebSocketPair (which returns separate client and server objects), and upgrade the connection.

onConnection(token) {
  const [client, server] = Object.values(new WebSocketPair())
  // TODO: handle new connection

  return new Response(null, {
    status: 101,
    webSocket: client
  })
}

Before we can start sending messages, we need to do some housekeeping. To do anything useful we'll need to keep track of active connections and listen for events from the server. To do this, we'll create another method, _handleSession.

To keep track of sessions, we'll give each connection a random UUID, and store the connection using the UUID as a key.

/**
 * Handle a new session
 * @param  {Websocket} websocket server instance
 * @param  {String} token     JWT
 * @return {undefined}
 */
_handleSession(websocket, token) {
  const session = uuid()
  websocket.accept()
  websocket.addEventListener('message', event => {})
  websocket.addEventListener('close', event => {})
  
  this.sessions[session] = { websocket }
}

All our inbound and outbound messages will use the same standard format, with a message type, timestamp and arbitrary data:

{
  "type": String,
  "data": {},
  "tz": Date
}

Let's build a quick helper to send messages in this format. We'll also write another helper, just to send errors.

_send(session, type, data = {}) {
  const { websocket } = this.sessions[session]
  websocket.send(JSON.stringify({ type, data, tz: new Date() }))
}

_sendErr(session, message) {
  this._send(session, 'error', { message })
}
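
One detail worth knowing about this format: JSON has no date type, so the tz value created with new Date() travels over the wire as an ISO 8601 string. Here is a minimal sketch of the round trip. The names encode and decode are hypothetical and only for illustration; they are not part of the class.

```javascript
// Hypothetical helpers: encode mirrors what _send does, decode is the client-side inverse
function encode(type, data = {}, now = new Date()) {
  // JSON.stringify calls Date.prototype.toJSON, producing an ISO 8601 string
  return JSON.stringify({ type, data, tz: now })
}

function decode(raw) {
  const { type, data, tz } = JSON.parse(raw)
  // Revive the timestamp into a Date on the receiving side
  return { type, data, tz: new Date(tz) }
}

const wire = encode('hello', { session: 'abc' }, new Date(0))
console.log(wire)
// {"type":"hello","data":{"session":"abc"},"tz":"1970-01-01T00:00:00.000Z"}

const decoded = decode(wire)
console.log(decoded.tz instanceof Date) // true
```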

We can now send the user a welcome message when they connect to the server:

_handleSession(websocket, token) {
  const session = uuid()
  websocket.accept()
  websocket.addEventListener('message', event => {})
  websocket.addEventListener('close', event => {})
  
  this.sessions[session] = { websocket }

  // Send welcome message
  this._send(session, 'hello', { session })
}

Astute readers will notice that so far we haven't actually done anything to verify the JWT. Let's implement this now. Instead of using a random session UUID, we'll take the sub claim of the user's JWT. To parse and verify the token, we'll use the jsonwebtoken package.

import jwt from 'jsonwebtoken'

class WebsocketApi {
  ...
  _handleSession(websocket, token) {
    let session
    try {
      ({ sub: session } = jwt.verify(token, JWT_SECRET))
    } catch (err) {
      websocket.accept()
      websocket.send(JSON.stringify({
        type: 'error',
        data: { message: 'Not authorized' },
        tz: new Date()
      }))
      websocket.close()
      return
    }

    websocket.accept()
    websocket.addEventListener('message', event => {})
    websocket.addEventListener('close', event => {})
    
    this.sessions[session] = { websocket }

    this._send(session, 'hello', { session })
  }
}

There's a lot happening here, so let's break it down. First, we try to verify the token and destructure the sub claim into our session variable. If this fails, we accept the socket connection temporarily, but immediately send an error message and close the connection.

If the token is valid, everything continues as before: the session is accepted and a welcome message is sent.

At the moment, the message and close event handlers are just stubs, so let's add some methods to handle these messages.

/**
 * Handle connection closed
 * @param  {String} session UUID of the user
 */
onClose(session) {
  delete this.sessions[session]
}

/**
 * Handle an incoming message
 * @param  {String} session UUID of the user
 * @param  {String} options.data: raw message contents
 */
async onMessage(session, { data: raw }) {
  let type, data
  try {
    ({ type, data } = JSON.parse(raw))
  } catch (err) {
    return this._sendErr(session, 'Invalid message format')
  }

  switch (type) {
    default:
      return this._sendErr(session, 'Unknown message received')
  }
}

When the user disconnects, we delete the entry from the session object. This lets us keep track of who is online, which we'll use later.

When a message is received, it'll come as a plain text string. We need to parse it, and route it to different handlers based on the type.

Let's update our _handleSession method to call these handlers:

_handleSession(websocket, token) {
  ...
  websocket.addEventListener('message', event => this.onMessage(session, event))
  websocket.addEventListener('close', () => this.onClose(session))
  ...
}

Great, we can now handle incoming messages. What messages? Good question. To implement a multi-room chat, we'll need to handle several types:

  • join - join a room (and leave the current room if set)
  • message - send a message to the current room
  • typing - indicate the user is typing
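
For illustration, here is roughly what a client would send for each type, using the message format from earlier. The field names data.room and data.message are the ones the handlers read; the room UUID and message text are placeholder sample values.

```javascript
// Sample values only: the room ID and message text are placeholders
const room = '04520cad-c342-4a37-b417-79cd3ae16dfb'

const examples = [
  { type: 'join', data: { room }, tz: new Date() },
  { type: 'message', data: { room, message: 'Hello, world' }, tz: new Date() },
  { type: 'typing', data: { room }, tz: new Date() },
]

// Each message goes over the socket as a JSON string
const frames = examples.map(example => JSON.stringify(example))
frames.forEach(frame => console.log(frame))
```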

We'll add them all as stubs, and fill out the code shortly.

async onMessage(session, { data: raw }) {
  ...

  switch (type) {
    case 'join':
      return this.join(session, data.room)
    case 'message':
      return this.message(session, data.room, data.message)
    case 'typing':
      return this.typing(session, data.room)
    default:
      return this._sendErr(session, 'Unknown message received')
  }
}

/**
 * Join a room
 * @param  {String} session UUID of user
 * @param  {String} room UUID of room
 */
join(session, room) {
  ...
}

/**
 * Send a message to the room
 * @param  {String} session UUID of user
 * @param  {String} room UUID of room
 * @param  {String} message text to send
 */
message(session, room, message) {
  ...
}

/**
 * Indicate the user is typing
 * @param  {String} session UUID of user
 * @param  {String} room UUID of room
 */
typing(session, room) {
  ...
}
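
One edge case the switch above does not cover: a message such as {"type":"join"} parses fine but has no data field, so reading data.room would throw. Below is a minimal sketch of a stricter parser. parseEnvelope is a hypothetical helper, and returning an error object instead of throwing is just one possible design.

```javascript
// Hypothetical helper: parse a raw frame and guarantee that data is an object
function parseEnvelope(raw) {
  let parsed
  try {
    parsed = JSON.parse(raw)
  } catch (err) {
    return { error: 'Invalid message format' }
  }

  // JSON.parse accepts null, numbers, strings and arrays too, so check the shape
  if (parsed === null || typeof parsed !== 'object' || typeof parsed.type !== 'string') {
    return { error: 'Invalid message format' }
  }

  // Fall back to an empty object so handlers can read data.room safely
  const data = parsed.data !== null && typeof parsed.data === 'object' ? parsed.data : {}
  return { type: parsed.type, data }
}

console.log(parseEnvelope('{"type":"join"}'))  // { type: 'join', data: {} }
console.log(parseEnvelope('not json'))         // { error: 'Invalid message format' }
```

With something like this in place, onMessage could send the error back with _sendErr and only reach the switch for well-formed messages.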

Let's start with handling joining a room. In this model, users can only join rooms they have permission for. To do this, we'll add a custom claim to the user's JWT called rooms. The JWT data should look like this:

{
  "sub": "374b6c83-4576-4740-8446-f21fe87fc21d",
  "rooms": [
    "04520cad-c342-4a37-b417-79cd3ae16dfb"
  ]
}

Note that in this example we use UUIDs, but these could be swapped out for any string format you want. Let's update our _handleSession method to store the rooms a user can access.

_handleSession(websocket, token) {
  let session, authorizedRooms
  try {
    ({ sub: session, rooms: authorizedRooms } = jwt.verify(token, JWT_SECRET))
  } catch (err) {
    websocket.accept()
    websocket.send(JSON.stringify({
      type: 'error',
      data: { message: 'Not authorized' },
      tz: new Date()
    }))
    websocket.close()
    return
  }

  websocket.accept()
  websocket.addEventListener('message', event => this.onMessage(session, event))
  websocket.addEventListener('close', () => this.onClose(session))
  
  this.sessions[session] = { websocket, authorizedRooms }

  this._send(session, 'hello', { session, authorizedRooms })
}
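
jwt.verify checks the signature (and the expiry, if the token has one), but it knows nothing about the shape of our custom rooms claim. If a token arrived without rooms, authorizedRooms would be undefined and later lookups would throw. A small guard could normalise this. The sketch below uses a hypothetical parseClaims helper that takes the already-verified payload; it is an optional hardening step, not something the rest of this post depends on.

```javascript
// Hypothetical helper: validate the payload returned by jwt.verify
function parseClaims(payload) {
  if (!payload || typeof payload.sub !== 'string' || payload.sub === '') {
    throw new Error('Missing sub claim')
  }

  // Treat a missing or malformed rooms claim as "no rooms" and drop non-string entries
  const authorizedRooms = Array.isArray(payload.rooms)
    ? payload.rooms.filter(room => typeof room === 'string')
    : []

  return { session: payload.sub, authorizedRooms }
}

const claims = parseClaims({
  sub: '374b6c83-4576-4740-8446-f21fe87fc21d',
  rooms: ['04520cad-c342-4a37-b417-79cd3ae16dfb', 42],
})
console.log(claims.authorizedRooms) // [ '04520cad-c342-4a37-b417-79cd3ae16dfb' ]
```

Because it throws on a bad payload, a call to it could sit inside the same try block as jwt.verify and share the existing error path.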

We'll need to check that a user has permission for a room every time we receive a message from them. Let's build a quick helper to do this.

/**
 * Check whether user is authorized for room
 * @param  {String} session UUID of user
 * @param  {String} room UUID of room
 */
_authorized(session, room) {
  const { authorizedRooms } = this.sessions[session]
  return authorizedRooms.includes(room)
}

Now we can verify users have permission to join the room inside our join handler:

join(session, room) {
  if (!this._authorized(session, room)) {
    return this._sendErr(session, 'Not authorized')
  }
}

In order to broadcast messages to a room, we need to keep track of who is in each room. We'll add a new object membership to track this, where the room is the key and the value is an array of sessions in the room:

class WebsocketApi {
  constructor() {
    ...
    this.membership = {}
  }
  
  join(session, room) {
    if (!this._authorized(session, room)) {
      return this._sendErr(session, 'Not authorized')
    }

    this.membership[room] = this.membership[room] || []
    this.membership[room].push(session)
  }
}

Now when we receive a join message it'll add the current user's UUID to the array (if they're authorized to join the room). Usually we want to tell the rest of the chat when a user joins, so we need to implement a method to send a message to everyone in a room.

/**
 * Send a message to all room members
 * @param  {String} room           UUID of the room
 * @param  {String} type           Message type
 * @param  {Object} data           Message data
 */
_sendToRoom(room, type, data) {
  const members = this.membership[room]
  members.forEach(member => this._send(member, type, data))
}

Just add a call to this method at the end of join, and all room members will be alerted when a new user joins:

join(session, room) {
  ...
  this._sendToRoom(room, 'joined', { session, room })
}

Next, let's handle receiving an incoming message. We want to make sure that the user is both authorized for the room and currently in it. To do this, we need to create an inverse map of membership. The membership object lets us easily look up the members of a room, but we also want to be able to look up which room a specific user is in.

get activeRooms() {
  const res = {}
  Object.entries(this.membership).forEach(([room, members]) => {
    members.forEach(member => {
      res[member] = room
    })
  })
  return res
}

This will return an object where the key is the session UUID, and the value is the room they're in. Let's implement a helper to query this easily.

/**
 * Check if user is in the specified room
 * @param  {String} session UUID of user
 * @param  {String} room UUID of room
 */
_inRoom(session, room) {
  const userRoom = this.activeRooms[session]
  return userRoom && userRoom === room
}
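
To make the inverse map concrete, here is the getter's logic pulled out into a standalone function and run on sample data. The name invertMembership and the room and user names are made up for the example.

```javascript
// Same logic as the activeRooms getter, as a plain function for experimenting
function invertMembership(membership) {
  const res = {}
  Object.entries(membership).forEach(([room, members]) => {
    members.forEach(member => {
      res[member] = room
    })
  })
  return res
}

const sampleMembership = {
  lobby: ['alice', 'bob'],
  dev: ['carol'],
  empty: [],
}

const active = invertMembership(sampleMembership)
console.log(active) // { alice: 'lobby', bob: 'lobby', carol: 'dev' }
console.log(active.dave) // undefined: dave is not in any room
```

Note that the map is rebuilt on every access, which is fine at this scale but does mean each lookup walks every room.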

Next we'll add these checks to the top of the message handler, and if both pass then we broadcast the message to the room:

message(session, room, message) {
  if (!this._authorized(session, room)) {
    return this._sendErr(session, 'Not authorized')
  }

  if (!this._inRoom(session, room)) {
    return this._sendErr(session, 'Not in room')
  }

  this._sendToRoom(room, 'message', {
    id: uuid(),
    from: session,
    message,
    room,
  })
}

Technically since a user needs permission to join, as long as they're in the room they're authorized, but keeping it as two separate checks allows us to provide more specific error messages.

As mentioned previously, we only want each user to be in one room at once so we need to implement a way to switch rooms. Let's update our join method to check whether the user is in a room, and leave it if they are.

join(session, room) {
  if (!this._authorized(session, room)) {
    return this._sendErr(session, 'Not authorized')
  }

  const userRoom = this.activeRooms[session]
  if (userRoom && userRoom === room) {
    // User already in this room, return success without adding them again
    return this._send(session, 'joined', { room })
  } else if (userRoom) {
    // User in a different room, leave it
    this.leave(session, userRoom)
  }

  this.membership[room] = this.membership[room] || []
  this.membership[room].push(session)
  this._sendToRoom(room, 'joined', { session, room })
}

/**
 * Leave a room
 * @param  {String} session UUID of user
 * @param  {String} room UUID of room
 */
leave(session, room) {
  if (!this._authorized(session, room)) {
    return this._sendErr(session, 'Not authorized')
  }

  if (!this._inRoom(session, room)) {
    return this._sendErr(session, 'Not in room')
  }

  const index = this.membership[room].indexOf(session)
  this.membership[room].splice(index, 1)
  this._sendToRoom(room, 'left', { session, room })
}
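
If you want to convince yourself the bookkeeping is right, it helps to run it without any sockets. The sketch below is a stripped-down model of join and leave: authorization checks are left out, events are pushed to a log array instead of being sent, and currentRoom is a hypothetical stand-in for the activeRooms lookup. It returns early when the session is already in the requested room, so a repeated join never adds the session twice.

```javascript
// Socket-free model of join/leave: events are recorded in a log instead of sent
const membership = {}
const log = []

// Stand-in for this.activeRooms[session]
function currentRoom(session) {
  const entry = Object.entries(membership)
    .find(([, members]) => members.includes(session))
  return entry ? entry[0] : undefined
}

function leave(session, room) {
  const index = membership[room].indexOf(session)
  membership[room].splice(index, 1)
  log.push({ type: 'left', session, room })
}

function join(session, room) {
  const userRoom = currentRoom(session)
  if (userRoom === room) {
    // Already a member: acknowledge and stop so the session is not added twice
    log.push({ type: 'joined', session, room })
    return
  }
  if (userRoom) leave(session, userRoom)

  membership[room] = membership[room] || []
  membership[room].push(session)
  log.push({ type: 'joined', session, room })
}

join('alice', 'lobby')
join('alice', 'dev') // switches rooms: leaves lobby first
join('alice', 'dev') // repeat join: membership is unchanged

console.log(membership) // { lobby: [], dev: [ 'alice' ] }
console.log(log.map(event => `${event.type}:${event.room}`).join(' '))
// joined:lobby left:lobby joined:dev joined:dev
```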

The leave method is only called when a user switches room, and can't be called directly so we don't need to add anything to our message handler. However, we should update the onClose handler to let the room know if the user goes offline.

onClose(session) {
  const room = this.activeRooms[session]
  if (room) {
    this.leave(session, room)
  }

  delete this.sessions[session]
}

Great. The last user-side message to implement is typing indicators, which are just a special type of message with no data. We don't want to send the typing indicator to the current session, so let's also update our _sendToRoom method to add an exclude parameter.

_sendToRoom(room, type, data, excludeSession) {
  const members = this.membership[room]
  members.forEach(member => {
    if (excludeSession && excludeSession === member) return
    this._send(member, type, data)
  })
}

typing(session, room) {
  if (!this._authorized(session, room)) {
    return this._sendErr(session, 'Not authorized')
  }

  if (!this._inRoom(session, room)) {
    return this._sendErr(session, 'Not in room')
  }

  // Send to all users in room except for current user
  this._sendToRoom(room, 'typing', { session }, session)
}
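
Here is a small socket-free sketch of the exclude behaviour, with arrays standing in for the real connections. The names sent, send and sendToRoom are illustrative only. It also falls back to an empty list when the room has no membership entry yet, which is a small defensive addition rather than something the class above does.

```javascript
// Fake "sockets": each session just records what it would have been sent
const sent = { alice: [], bob: [], carol: [] }
const roomMembers = { lobby: ['alice', 'bob', 'carol'] }

function send(session, type, data) {
  sent[session].push({ type, data })
}

function sendToRoom(room, type, data, excludeSession) {
  // Assumption: treat an unknown room as empty instead of throwing
  const members = roomMembers[room] || []
  members.forEach(member => {
    if (member === excludeSession) return
    send(member, type, data)
  })
}

// alice is typing: everyone else in the room is told, alice is not
sendToRoom('lobby', 'typing', { session: 'alice' }, 'alice')
console.log(sent.alice.length, sent.bob.length, sent.carol.length) // 0 1 1

// Broadcasting to a room nobody has joined is a no-op
sendToRoom('nowhere', 'typing', { session: 'alice' }, 'alice')
```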

All our user-side messages are now implemented 🎉

You'll probably want to show how many users are online in each room, so let's add a stats message which gets sent every 10 seconds. This scheduled message will also act as a heartbeat, ensuring our connection is kept alive. Let's add a method to get the stats for a user:

/**
 * Get the room stats for a user
 * @param  {String} session UUID of the session
 */
_roomStats(session) {
  const { authorizedRooms } = this.sessions[session]
  return authorizedRooms.reduce((acc, cur) => {
    const members = this.membership[cur]
    acc[cur] = {
      online: members ? members.length : 0
    }
    return acc
  }, {})
}
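
As a worked example, here is the same reduce as a standalone function, fed sample data. The function name roomStats and the room and user names are made up for the example.

```javascript
// Same reduce as _roomStats, with its inputs passed in explicitly
function roomStats(authorizedRooms, membership) {
  return authorizedRooms.reduce((acc, room) => {
    const members = membership[room]
    acc[room] = {
      // Rooms nobody has joined yet have no membership entry, so count them as 0
      online: members ? members.length : 0
    }
    return acc
  }, {})
}

const stats = roomStats(['lobby', 'dev'], { lobby: ['alice', 'bob'], secret: ['mallory'] })
console.log(stats) // { lobby: { online: 2 }, dev: { online: 0 } }
```

Only rooms the user is authorized for appear in the result, so the stats message never reveals the existence of other rooms.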

To send this on a schedule, we'll set up a setInterval and hook into it from our _handleSession method:

/**
 * Send stats to a user
 * @param  {String} session UUID of the user
 */
_sendStats(session) {
  const stats = this._roomStats(session)
  this._send(session, 'stats', stats)
}

/**
 * Create scheduled messages
 * @param  {String} session UUID of the user
 */
_setupHeartbeat(session) {
  return [
    // Send room stats every 10 seconds
    setInterval(() => this._sendStats(session), 10000),
  ]
}

_handleSession(websocket, token) {
  let session, authorizedRooms
  try {
    ({ sub: session, rooms: authorizedRooms } = jwt.verify(token, JWT_SECRET))
  } catch (err) {
    websocket.accept()
    websocket.send(JSON.stringify({
      type: 'error',
      data: { message: 'Not authorized' },
      tz: new Date()
    }))
    websocket.close()
    return
  }

  websocket.accept()
  websocket.addEventListener('message', event => this.onMessage(session, event))
  websocket.addEventListener('close', () => this.onClose(session))
  
  const heartbeat = this._setupHeartbeat(session)
  this.sessions[session] = { websocket, heartbeat, authorizedRooms }

  // Send welcome message
  const stats = this._roomStats(session)
  this._send(session, 'hello', { session, authorizedRooms, stats })
}

We need to remember to clear this interval when the session disconnects, so lastly we need to update our onClose handler:

onClose(session) {
  const room = this.activeRooms[session]
  if (room) {
    this.leave(session, room)
  }

  const { heartbeat } = this.sessions[session]
  heartbeat.forEach(item => clearInterval(item))
  delete this.sessions[session]
}

WebsocketApi

Nice work! If you've made it this far, your class should look like this:

import jwt from 'jsonwebtoken'
import { v4 as uuid } from 'uuid'

class WebsocketApi {
  constructor() {
    this.sessions = {}
    this.membership = {}
  }

  // Inverse map of memberships
  get activeRooms() {
    const res = {}
    Object.entries(this.membership).forEach(([room, members]) => {
      members.forEach(member => {
        res[member] = room
      })
    })
    return res
  }

  /**
   * Handle a new session
   * @param  {Websocket} websocket server instance
   * @param  {String} token     JWT
   * @return {undefined}
   */
  _handleSession(websocket, token) {
    let session, authorizedRooms
    try {
      ({ sub: session, rooms: authorizedRooms } = jwt.verify(token, JWT_SECRET))
    } catch (err) {
      websocket.accept()
      websocket.send(JSON.stringify({
        type: 'error',
        data: { message: 'Not authorized' },
        tz: new Date()
      }))
      websocket.close()
      return
    }

    websocket.accept()
    websocket.addEventListener('message', event => this.onMessage(session, event))
    websocket.addEventListener('close', () => this.onClose(session))
    
    const heartbeat = this._setupHeartbeat(session)
    this.sessions[session] = { websocket, heartbeat, authorizedRooms }

    // Send welcome message
    const stats = this._roomStats(session)
    this._send(session, 'hello', { session, authorizedRooms, stats })
  }

  _setupHeartbeat(session) {
    return [
      // Send room stats every 10 seconds
      setInterval(() => this._sendStats(session), 10000),
    ]
  }

  _send(session, type, data = {}) {
    const { websocket } = this.sessions[session]
    websocket.send(JSON.stringify({ type, data, tz: new Date() }))
  }

  _sendErr(session, message) {
    this._send(session, 'error', { message })
  }

  /**
   * Send a message to all room members
   * @param  {String} room           UUID of the room
   * @param  {String} type           Message type
   * @param  {Object} data           Message data
   */
  _sendToRoom(room, type, data, excludeSession) {
    const members = this.membership[room]
    members.forEach(member => {
      if (excludeSession && excludeSession === member) return
      this._send(member, type, data)
    })
  }

  /**
   * Send stats to a user
   * @param  {String} session UUID of the user
   */
  _sendStats(session) {
    const stats = this._roomStats(session)
    this._send(session, 'stats', stats)
  }

  _authorized(session, room) {
    const { authorizedRooms } = this.sessions[session]
    return authorizedRooms.includes(room)
  }

  _inRoom(session, room) {
    const userRoom = this.activeRooms[session]
    return userRoom && userRoom === room
  }

  /**
   * Get the room stats for a user
   * @param  {String} session UUID of the session
   */
  _roomStats(session) {
    const { authorizedRooms } = this.sessions[session]
    return authorizedRooms.reduce((acc, cur) => {
      const members = this.membership[cur]
      acc[cur] = {
        online: members ? members.length : 0
      }
      return acc
    }, {})
  }

  /**
   * Handle a new connection
   * @param  {String} token JWT
   * @return {Response} Upgrade connection to websocket
   */
  onConnection(token) {
    const [client, server] = Object.values(new WebSocketPair())
    this._handleSession(server, token)

    return new Response(null, {
      status: 101,
      webSocket: client
    })
  }

  /**
   * Handle connection closed
   * @param  {String} session UUID of the user
   */
  onClose(session) {
    const room = this.activeRooms[session]
    if (room) {
      this.leave(session, room)
    }

    const { heartbeat } = this.sessions[session]
    heartbeat.forEach(item => clearInterval(item))
    delete this.sessions[session]
  }

  /**
   * Handle an incoming message
   * @param  {String} session UUID of the user
   * @param  {String} options.data: raw message contents
   */
  async onMessage(session, { data: raw }) {
    let type, data
    try {
      ({ type, data } = JSON.parse(raw))
    } catch (err) {
      return this._sendErr(session, 'Invalid message format')
    }

    switch (type) {
      case 'join':
        return this.join(session, data.room)
      case 'message':
        return this.message(session, data.room, data.message)
      case 'typing':
        return this.typing(session, data.room)
      default:
        return this._sendErr(session, 'Unknown message received')
    }
  }

  /**
   * Join a room
   * @param  {String} session UUID of user
   * @param  {String} room UUID of room
   */
  join(session, room) {
    if (!this._authorized(session, room)) {
      return this._sendErr(session, 'Not authorized')
    }

    const userRoom = this.activeRooms[session]
    if (userRoom && userRoom === room) {
      // User already in this room, return success without adding them again
      return this._send(session, 'joined', { room })
    } else if (userRoom) {
      // User in a different room, leave it
      this.leave(session, userRoom)
    }

    this.membership[room] = this.membership[room] || []
    this.membership[room].push(session)
    this._sendToRoom(room, 'joined', { session, room })
  }

  /**
   * Leave a room
   * @param  {String} session UUID of user
   * @param  {String} room UUID of room
   */
  leave(session, room) {
    if (!this._authorized(session, room)) {
      return this._sendErr(session, 'Not authorized')
    }

    if (!this._inRoom(session, room)) {
      return this._sendErr(session, 'Not in room')
    }

    const index = this.membership[room].indexOf(session)
    this.membership[room].splice(index, 1)
    this._sendToRoom(room, 'left', { session, room })
  }

  message(session, room, message) {
    if (!this._authorized(session, room)) {
      return this._sendErr(session, 'Not authorized')
    }

    if (!this._inRoom(session, room)) {
      return this._sendErr(session, 'Not in room')
    }

    this._sendToRoom(room, 'message', {
      id: uuid(),
      from: session,
      message,
      room,
    })
  }

  typing(session, room) {
    if (!this._authorized(session, room)) {
      return this._sendErr(session, 'Not authorized')
    }

    if (!this._inRoom(session, room)) {
      return this._sendErr(session, 'Not in room')
    }

    // Send to all users in room except for current user
    this._sendToRoom(room, 'typing', { session }, session)
  }
}

export default WebsocketApi

The last step is to actually use the class, and accept your first connection. This is where I admit that the title is minor clickbait - you'll need a couple more lines to finish implementation.

Create a new file, index.js, as below:

import WebsocketApi from './WebsocketApi'

const ws = new WebsocketApi()

addEventListener('fetch', event => {
  event.respondWith(handleRequest(event.request))
})

async function handleRequest(request) {
  const tokenHeader = request.headers.get('Authorization')
  const upgradeHeader = request.headers.get('Upgrade')
  if (upgradeHeader !== 'websocket') {
    return new Response('Expected websocket', { status: 400 })
  }
  if (!tokenHeader) {
    return new Response('Expected authorization header', { status: 401 })
  }

  const [,token] = tokenHeader.split(' ')
  return ws.onConnection(token)
}
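
Note that tokenHeader.split(' ') returns the second word of any header value, including a Basic credential, and leaves it to jwt.verify to reject it. That is safe, since verification will fail, but a stricter parse rejects such requests earlier. A sketch follows, with bearerToken as a hypothetical helper:

```javascript
// Hypothetical helper: extract the token from an "Authorization: Bearer <token>" header
function bearerToken(header) {
  if (typeof header !== 'string') return null
  // Scheme is matched case-insensitively; the token must be a single run of non-space characters
  const match = /^Bearer\s+(\S+)$/i.exec(header.trim())
  return match ? match[1] : null
}

console.log(bearerToken('Bearer abc.def.ghi')) // abc.def.ghi
console.log(bearerToken('Basic dXNlcjpwYXNz')) // null
console.log(bearerToken(null))                 // null
```

In handleRequest, a null result could be answered with the same 401 response as a missing header.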

That's it!

Deploy your worker with wrangler publish and get experimenting!