Skip to content

Instantly share code, notes, and snippets.

@andrew-delph
Last active September 12, 2024 00:17
Show Gist options
  • Save andrew-delph/05a45f7f4293a85755df5fbd15ad26c1 to your computer and use it in GitHub Desktop.
Save andrew-delph/05a45f7f4293a85755df5fbd15ad26c1 to your computer and use it in GitHub Desktop.
A wrapper for the socket io websocket package to give socketio with promises.

A socket.io wrapper for k6/experimental/websockets package with promises. This workings on v4 of socket.io

See the list of current functions in K6SocketIoBase.ts

A simple test that waits for a message establish the ends an event myping, waits for the ack, sends another myping and waits for the ack again. It then closes the socket.

I will eventually make an example of this with the babel configs so its easy to setup.

export enum responseType {
open,
close,
ping,
pong,
message,
upgrade,
noop,
}
export enum responseCode {
connect,
disconnect,
event,
ack,
error,
}
import { responseCode, responseType } from './constants';
import { checkResponse, getArrayFromRequest, getCallbackId } from './socket.io';
import { uuidv4 as uuid } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js';
import { setTimeout, clearTimeout } from 'k6/experimental/timers';
import { check } from 'k6';
export abstract class K6SocketIoBase {
socket: any;
callbackCount = 0;
connected = false;
onConnect: (() => void) | undefined;
ackCallbackMap: Record<string, (data: any) => void> = {};
eventMessageHandleMap: Record<
string,
(data: any, callback?: (data: any) => void) => void
> = {};
waitingEventMap: Record<string, (data: any) => void> = {};
url: string;
max_time: number;
params: any;
constructor(url: string, params: any = {}, max_time: number = 0) {
this.url = url;
this.params = params;
this.max_time = max_time;
}
abstract connect(): void;
abstract on(event: string, callback: (data: any) => void): void;
abstract parseMessage(message: any): string;
setSocket(socket: any): void {
this.socket = socket;
this.on(`message`, (msg) => {
this.handleMessage(this.parseMessage(msg));
});
let max_time_timeout: number;
if (this.max_time != 0) {
max_time_timeout = setTimeout(() => {
this.close();
}, this.max_time);
}
this.on(`error`, (error) => {
console.log(`error.`);
check(false, { error: (r) => r });
this.socket.close();
});
this.on(`close`, () => {
clearTimeout(max_time_timeout);
this.failWaitingEvents();
});
}
listen() {
this.on(`open`, () => {});
}
close() {
this.socket.close();
}
setOnConnect(callback: () => void) {
this.onConnect = callback;
}
setOnError(callback: () => void) {
this.on(`error`, callback);
}
handleMessage(msg: string) {
const response = checkResponse(msg);
const type = response.type;
const code = response.code;
if (type == responseType.open) {
this.socket.send(`40`);
return;
}
switch (code) {
case responseCode.connect: {
if (this.onConnect != null) this.onConnect();
this.connected = true;
break;
}
case responseCode.ack: {
const msgObject = getArrayFromRequest(msg);
const callbackId = getCallbackId(msg);
const callback = this.ackCallbackMap[callbackId];
if (callback != undefined) {
delete this.ackCallbackMap[callbackId];
callback(msgObject);
}
break;
}
case responseCode.event: {
const msgObject = getArrayFromRequest(msg);
const event = msgObject[0];
const message = msgObject[1];
const callbackId = getCallbackId(msg);
const callback = !Number.isNaN(callbackId)
? (data: any) => {
this.sendAck(callbackId, data);
}
: undefined;
const eventMessageHandle = this.eventMessageHandleMap[event];
if (eventMessageHandle != undefined) {
eventMessageHandle(message, callback);
} else {
if (event == `message` || event == `activeCount`) break;
console.log(`no eventMessageHandle:`, event);
}
break;
}
}
}
setEventMessageHandle(event: any, handler: any) {
this.eventMessageHandleMap[event] = handler;
}
send(event: string, data: any, callback: any) {
if (callback == null) {
this.socket.send(
`${responseType.message}${
responseCode.event
}["${event}",${JSON.stringify(data)}]`,
);
} else {
this.callbackCount++;
this.ackCallbackMap[this.callbackCount] = callback;
this.socket.send(
`${responseType.message}${responseCode.event}${
this.callbackCount
}["${event}",${JSON.stringify(data)}]`,
);
}
}
sendAck(callbackId: number, data: any) {
this.socket.send(
`${responseType.message}${responseCode.ack}${callbackId}[${JSON.stringify(
data,
)}]`,
);
}
expectMessage(event: string, timeout = 0) {
const startTime = Date.now();
const waitingEventId: string = uuid();
const wrapper = this;
return new Promise((resolve, reject) => {
wrapper.waitingEventMap[waitingEventId] = reject;
const eventMessageHandle = (data: any, callback: any) => {
const elapsed = Date.now() - startTime;
const isSuccess = elapsed < timeout;
delete wrapper.waitingEventMap[waitingEventId];
if (isSuccess || timeout == 0) {
resolve({ data, callback, elapsed });
} else {
reject(`timeout reached for ${event}`);
}
};
wrapper.eventMessageHandleMap[event] = eventMessageHandle;
});
}
sendWithAck(event: string, data: any, timeout = 0) {
const startTime = Date.now();
const waitingEventId = uuid();
const wrapper = this;
return new Promise(function (resolve, reject) {
wrapper.waitingEventMap[waitingEventId] = reject;
wrapper.send(event, data, (callbackData: any) => {
const elapsed = Date.now() - startTime;
const isSuccess = elapsed < timeout;
delete wrapper.waitingEventMap[waitingEventId];
if (isSuccess || timeout == 0) {
resolve({ data: callbackData, elapsed });
} else {
reject(`timeout reached`);
}
});
});
}
failWaitingEvents() {
for (const waitingEvent of Object.values(this.waitingEventMap)) {
waitingEvent(`failed wait event.`);
}
}
}
import { K6SocketIoBase } from './K6SocketIoBase';
import { WebSocket } from 'k6/experimental/websockets';
export class K6SocketIoExp extends K6SocketIoBase {
connect(): void {
const socketIo = this;
socketIo.setSocket(new WebSocket(this.url));
this.socket.addEventListener(`open`, () => {});
}
on(event: string, callback: (data: any) => void): void {
this.socket.addEventListener(event, callback);
}
parseMessage(message: any): string {
// console.log(`message.data`, message.data);
return message.data;
}
}
import { K6SocketIoExp } from '../libs/K6SocketIoExp';
export const options = {
vus: 100,
duration: `1m`,
};
export default function () {
const domain = `localhost:8888`;
const url = `${`ws`}://${domain}/socket.io/?EIO=4&transport=websocket`;
const socket = new K6SocketIoExp(url);
socket.setOnConnect(() => {
socket
.expectMessage(`established`)
.catch((error) => {
return Promise.reject(error);
})
.then((data: any) => {
return socket.sendWithAck(`myping`, {});
})
.catch((error) => {
return Promise.reject(error);
})
.then((data: any) => {
return socket.sendWithAck(`myping`, {});
})
.catch(() => {})
.finally(() => {
socket.close();
});
});
socket.connect();
}
export interface soResponse {
type: number;
code: number;
}
export function checkResponse(response: string): soResponse {
return { type: parseInt(response[0]), code: parseInt(response[1]) };
}
export function getCallbackId(response: string): number {
return parseInt(response.slice(2));
}
export function getArrayFromRequest(response: string): string[] {
const match = /\[.+\]/;
const parsedResponse = response.match(match);
return parsedResponse ? JSON.parse(parsedResponse[0]) : [];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment