Last active
April 29, 2023 22:07
-
-
Save ethereumdegen/53eecf33a7da4ed11ccfee9d3279f8b0 to your computer and use it in GitHub Desktop.
A singleton websocket server and client implementation with app-wide pub and sub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Websocket Client | |
let connect = await connectSocketClient( beamServerUrl ) // i.e. the connect() function exported below
//once the server connects us , try to join a room | |
subscribeToMessageType('init', async (message:string):Promise<string|undefined>=>{ | |
let join = await joinRooms(['pendingtx']) | |
return undefined | |
}) | |
//we can listen for messages from the server and do things | |
subscribeToMessageType('pendingtx', async (message:string)=>{ | |
//do stuff here !! | |
return JSON.stringify({type:"pong",data:"hi"}) | |
}) | |
// we can send messages to the server | |
emitMessage( JSON.stringify({ type:'hello', data:'hi' }) )
*/ | |
import { io , Socket} from "socket.io-client"; | |
// ---- Module-level singleton state shared by every export of this client ----

/** The active socket.io client; stays undefined until connect() has been called. */
let socket: Socket<any> | undefined;

/** Identifier received from the server in its "init" event; undefined before that. */
let socketUid: string | undefined;

// Let dotenv load its configuration before the environment variable below is read.
require('dotenv').config();

/** Shared secret sent in the "authenticate" event on connect; nothing is sent when it is unset. */
const WEBSOCKET_ACCESS_KEY = process.env.WEBSOCKET_ACCESS_KEY;

/** Set to true by the socket's "connect" event and back to false by its "disconnect" event. */
export let isConnected = false;
/**
 * Registry of message handlers keyed by message type.
 * Handlers of one type are stored in registration order.
 */
const messageSubscriptions: Map<string, Array<(msg: string) => Promise<string | undefined>>> = new Map();

/**
 * Lets another module subscribe to messages of a given type.
 *
 * @param messageType - Value of the message's `type` field the handler is interested in.
 * @param handler - Callback receiving the raw message string. A non-empty string it resolves
 *   to is sent back to the server by the dispatcher; `undefined` sends nothing.
 */
export function subscribeToMessageType(
  messageType: string,
  handler: (msg: string) => Promise<string | undefined>
) {
  const registered = messageSubscriptions.get(messageType);
  if (registered === undefined) {
    // First subscriber for this type: start a new handler list.
    messageSubscriptions.set(messageType, [handler]);
    return;
  }
  registered.push(handler);
}
/**
 * Drops every handler registered for `messageType`.
 *
 * The type's entry is replaced with a fresh empty list rather than removed from the registry.
 *
 * @param messageType - Message type whose handlers are discarded.
 */
export function unsubscribeAllToMessageType(messageType: string): void {
  const noHandlers: Array<(msg: string) => Promise<string | undefined>> = [];
  messageSubscriptions.set(messageType, noHandlers);
}
/**
 * Dispatches one raw JSON message to the handlers subscribed to its `type`.
 *
 * Handlers run one after another in registration order. Whenever a handler resolves to a
 * non-empty string and a socket exists, that string is emitted to the server on the
 * "message" event.
 *
 * @param message - JSON text of the message; its parsed `type` field selects the handlers.
 * @throws Whatever `JSON.parse` throws for invalid JSON, and any rejection of a handler.
 */
export async function handleReceivedMessage(message: string): Promise<void> {
  console.log("received: %s", message);

  const payload = JSON.parse(message);
  const subscribers = messageSubscriptions.get(payload.type);

  if (subscribers) {
    for (const subscriber of subscribers) {
      const reply = await subscriber(message);
      // Nothing to send back, or nowhere to send it.
      if (!reply || !socket) continue;
      socket.emit("message", reply);
    }
  }
}
/**
 * Sends a raw message string to the server on the "message" event.
 *
 * When connect() has not created a socket yet, nothing is sent. The failure is reported with
 * the same "socket not initialized" error that joinRooms() and leaveRooms() log, instead of
 * the message being dropped silently.
 *
 * @param message - Text to send; the usage notes of this module send JSON with a `type` field.
 */
export async function emitMessage(message: string): Promise<void> {
  if (!socket) {
    console.error("socket not initialized");
    return;
  }
  socket.emit("message", message);
}
/**
 * Creates the module's singleton socket.io client and wires up its event listeners.
 *
 * - "connect": marks the client connected and, when WEBSOCKET_ACCESS_KEY is set, sends it in
 *   an "authenticate" event.
 * - "init": stores the id sent by the server and dispatches a synthetic
 *   `{type:"init", data:<id>}` message to the subscribed handlers.
 * - "message": dispatches the received text to the subscribed handlers.
 * - "event": logs the payload.
 * - "disconnect": marks the client disconnected.
 *
 * Calling connect() again replaces the singleton: the previous socket is detached from its
 * listeners and disconnected first, so it cannot keep delivering messages to the handlers in
 * parallel with the new one.
 *
 * @param serverUrl - URL handed to socket.io's `io()`.
 * @returns The new socket. The promise resolves right away; it does not wait for the
 *   connection to be established.
 */
export async function connect(serverUrl: string): Promise<any> {
  if (socket) {
    // Remove our listeners before disconnecting so the old socket cannot fire into the
    // module state while the replacement is being set up.
    socket.off();
    socket.disconnect();
    isConnected = false;
  }

  const client = io(serverUrl);
  socket = client;

  client.on("connect", () => {
    console.log("connected");
    isConnected = true;
    if (WEBSOCKET_ACCESS_KEY) {
      client.emit("authenticate", WEBSOCKET_ACCESS_KEY);
    }
  });

  client.on("init", async (data: string) => {
    socketUid = data;
    try {
      await handleReceivedMessage(JSON.stringify({ type: "init", data: socketUid }));
    } catch (e) {
      // Same policy as the "message" listener: a failing handler is logged, never left as
      // an unhandled promise rejection.
      console.error(e);
    }
  });

  client.on("message", async (data: string) => {
    try {
      await handleReceivedMessage(data);
    } catch (e) {
      console.error(e);
    }
  });

  client.on("event", (data: string) => {
    console.log(data);
  });

  client.on("disconnect", () => {
    console.log("disconnected");
    isConnected = false;
  });

  return client;
}
/**
 * Asks the server to add this client to the given rooms.
 *
 * The room list is sent as JSON text on the "join" event. Without a socket the request is
 * not sent and an error is logged.
 *
 * @param roomArray - Names of the rooms to join.
 */
export async function joinRooms(roomArray: string[]) {
  if (!socket) {
    console.error("socket not initialized");
    return;
  }
  const serializedRooms = JSON.stringify(roomArray);
  socket.emit("join", serializedRooms);
}
/**
 * Asks the server to remove this client from the given rooms.
 *
 * The room list is sent as JSON text on the "leave" event. Without a socket the request is
 * not sent and an error is logged.
 *
 * @param roomArray - Names of the rooms to leave.
 */
export async function leaveRooms(roomArray: string[]) {
  if (!socket) {
    console.error("socket not initialized");
    return;
  }
  const serializedRooms = JSON.stringify(roomArray);
  socket.emit("leave", serializedRooms);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Websocket Server | |
startWebsocketServer() // i.e. the start() function exported below
// we can listen for client messages | |
subscribeToMessageType('add_pending_tx', async (message:any) : Promise<string|undefined>=> { | |
let txData = JSON.parse(message) | |
let txDataStringified = JSON.stringify( txData.data ) | |
//push to redis | |
pushTxDataToRedis(txDataStringified) | |
return undefined | |
}) | |
//we can broadcast to a room | |
broadcastToRoom(roomName, JSON.stringify({ | |
type: roomName, | |
data: txDataStringified | |
})) | |
https://socket.io/docs/v4/namespaces/ | |
https://socket.io/docs/v3/rooms/ | |
*/ | |
import { Server } from "socket.io"; | |
require('dotenv').config() | |
/** Shared secret a client must present in its "authenticate" event; read from the environment. */
const WEBSOCKET_ACCESS_KEY = process.env.WEBSOCKET_ACCESS_KEY;

/**
 * Whether clients have to authenticate before protected events are served.
 *
 * The earlier expression compared `typeof WEBSOCKET_ACCESS_KEY` (always a string, e.g.
 * "undefined") with the value `undefined`, so it was true even when no key was configured
 * and every client was disconnected on its first message. An empty key counts as "not
 * configured": the per-connection checks test the stored key for truthiness, which "" could
 * never satisfy.
 */
const accessKeyRequired = WEBSOCKET_ACCESS_KEY !== undefined && WEBSOCKET_ACCESS_KEY !== '';
/** Port passed to `server.listen` when start() is called. */
const PORT = 8010;

/** The socket.io server instance shared by every export of this module; no options are set. */
const server = new Server({});
/** Shape of a message after its JSON text has been parsed. */
export interface ISocketPayload {
  /** Message type. */
  type: string;
  /** Optional payload of the message; its shape is not constrained here. */
  data?: any;
}
/** Per-connection record describing a connected user. */
interface UserDefinition {
  /** Identifier of the user. */
  userId: string;
  /** Access key associated with the user, or undefined when there is none. */
  accessKey: string | undefined;
}
/**
 * Registry of inbound-message handlers keyed by message type.
 * Handlers of one type are stored in registration order.
 */
const messageSubscriptions: Map<string, Array<(msg: string) => Promise<string | undefined>>> = new Map();

/**
 * Registers a handler for client messages of the given type.
 *
 * @param messageType - Value of the message's `type` field the handler is interested in.
 * @param handler - Callback receiving the raw message string. A non-empty string it resolves
 *   to is emitted back to the sending socket by the dispatcher; `undefined` sends nothing.
 */
export function subscribeToMessageType(
  messageType: string,
  handler: (msg: string) => Promise<string | undefined>
) {
  // Reuse the type's handler list when there is one, otherwise start a new list.
  const bucket = messageSubscriptions.get(messageType) ?? [];
  bucket.push(handler);
  messageSubscriptions.set(messageType, bucket);
}
/**
 * Drops every handler registered for `messageType`.
 *
 * The type's entry is replaced with a fresh empty list rather than removed from the registry.
 *
 * @param messageType - Message type whose handlers are discarded.
 */
export function unsubscribeAllToMessageType(messageType: string): void {
  const noHandlers: Array<(msg: string) => Promise<string | undefined>> = [];
  messageSubscriptions.set(messageType, noHandlers);
}
/**
 * Dispatches one raw client message to the handlers subscribed to its `type`.
 *
 * The text comes from a remote client and is therefore validated first. Text that is not
 * valid JSON is logged and discarded instead of throwing, because an exception here
 * surfaces as an unhandled promise rejection in the socket listener that calls this
 * function. Messages whose parsed value has no string `type` are discarded as well.
 *
 * Handlers run one after another in registration order. Whenever one resolves to a
 * non-empty string and `socket` is truthy, that string is emitted on the socket's "message"
 * event.
 *
 * @param message - JSON text received from the client.
 * @param socket - Socket of the sending client, used to emit handler replies.
 * @returns Always resolves to undefined; replies are emitted directly on `socket`.
 * @throws Any rejection of a subscribed handler.
 */
export async function handleInboundMessage(message: string, socket: any): Promise<string | undefined> {
  console.log("received: %s", message);

  let parsedMessage: unknown;
  try {
    parsedMessage = JSON.parse(message);
  } catch (e) {
    console.error("discarding message that is not valid JSON", e);
    return undefined;
  }

  // JSON.parse may yield null or a primitive; only objects can carry a `type` field.
  const type =
    typeof parsedMessage === "object" && parsedMessage !== null
      ? (parsedMessage as { type?: unknown }).type
      : undefined;
  if (typeof type !== "string") return undefined;

  const handlers = messageSubscriptions.get(type);
  if (!handlers) return undefined;

  for (const handler of handlers) {
    const response = await handler(message);
    if (response && socket) {
      socket.emit("message", response);
    }
  }
  return undefined;
}
/**
 * Broadcasts a message through the server on the "message" event.
 *
 * @param message - Text to send.
 */
export function broadcast(message: string): void {
  server.emit("message", message);
}
/**
 * Broadcasts a message on the "message" event to one room only.
 *
 * @param room - Name of the room to target.
 * @param message - Text to send.
 */
export function broadcastToRoom(room: string, message: string): void {
  const roomChannel = server.to(room);
  roomChannel.emit("message", message);
}
/**
 * Live connections keyed by socket id. `accessKey` stays undefined until the client has
 * authenticated. Entries are removed when the socket disconnects.
 */
const users = new Map<string, UserDefinition>();

/**
 * Normalizes the payload of a "join"/"leave" event into a list of room names.
 * Accepts the JSON text of a string array or of a single string; returns undefined for
 * anything else so the caller can reject it without throwing.
 */
function parseRoomList(raw: unknown): string[] | undefined {
  if (typeof raw !== "string") return undefined;
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return undefined;
  }
  if (typeof parsed === "string") return [parsed];
  if (Array.isArray(parsed) && parsed.every((room) => typeof room === "string")) {
    return parsed as string[];
  }
  return undefined;
}

/**
 * Registers the connection listeners and starts listening on PORT.
 *
 * Per connection:
 * - the socket id is sent to the client in an "init" event;
 * - "authenticate" stores the presented key when it matches WEBSOCKET_ACCESS_KEY and
 *   disconnects the socket otherwise (only when a key is required);
 * - "message" and "join" disconnect sockets that have not authenticated while a key is
 *   required; "message" otherwise forwards the text to handleInboundMessage and "join" adds
 *   the socket to the requested rooms;
 * - "rooms" replies with the rooms the socket is in; "leave" removes it from rooms.
 *
 * Everything a client sends is untrusted. Malformed payloads are logged and ignored: an
 * exception thrown inside a listener is caught by nothing and would take the process down.
 *
 * @returns The socket.io server instance.
 */
export function start() {
  server.on("connection", (socket) => {
    const userId = socket.id;
    // Listeners close over this record, so they keep working after the registry entry is
    // removed on disconnect.
    const user: UserDefinition = { userId, accessKey: undefined };
    users.set(userId, user);

    const isAuthorized = (): boolean => !accessKeyRequired || Boolean(user.accessKey);

    socket.emit("init", userId);

    socket.on("authenticate", (accessKey: unknown) => {
      if (!accessKeyRequired) return;
      if (typeof accessKey === "string" && accessKey === WEBSOCKET_ACCESS_KEY) {
        user.accessKey = accessKey;
        console.log('authed user for socket');
      } else {
        socket.disconnect();
      }
    });

    socket.on("message", async (message: unknown) => {
      if (!isAuthorized()) {
        socket.disconnect();
        return;
      }
      console.log('got msg', message);
      if (typeof message !== "string") {
        // Handlers expect JSON text; socket.io would also deliver objects or binary.
        console.error("ignoring message that is not a string");
        return;
      }
      try {
        const response = await handleInboundMessage(message, socket);
        if (response) {
          socket.emit("message", response);
        }
      } catch (e) {
        // A failing handler must not become an unhandled promise rejection.
        console.error(e);
      }
    });

    socket.on("rooms", () => {
      // socket.rooms is a Set, which socket.io does not serialize; send an array instead.
      socket.emit("rooms", Array.from(socket.rooms));
    });

    //join and leave rooms
    socket.on("join", (roomArrayStringified: unknown) => {
      if (!isAuthorized()) {
        socket.disconnect();
        return;
      }
      const roomArray = parseRoomList(roomArrayStringified);
      if (!roomArray) {
        console.error("ignoring malformed join request");
        return;
      }
      socket.join(roomArray);
      console.log("joined rooms", roomArray);
    });

    socket.on("leave", (roomArrayStringified: unknown) => {
      const roomArray = parseRoomList(roomArrayStringified);
      if (!roomArray) {
        console.error("ignoring malformed leave request");
        return;
      }
      // Unlike join(), leave() takes a single room name, so leave the rooms one by one.
      for (const room of roomArray) {
        socket.leave(room);
      }
      console.log("leaving rooms", roomArray);
    });

    socket.on("disconnect", () => {
      users.delete(userId);
    });
  });

  server.listen(PORT);

  // "connection_error" is emitted by the underlying engine, which only exists once the
  // server has been attached by listen().
  server.engine?.on(
    "connection_error",
    (err: { req: unknown; code: number; message: string; context: unknown }) => {
      console.log(err.req); // the request object
      console.log(err.code); // the error code, for example 1
      console.log(err.message); // the error message, for example "Session ID unknown"
      console.log(err.context); // some additional error context
    }
  );

  // The key itself is a secret and must not end up in the logs; report only whether one is
  // enforced.
  console.log(
    `WebSocket server is running on ws://localhost:${PORT} (access key ${accessKeyRequired ? 'required' : 'not required'})`
  );
  return server;
}
export default server ; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment