Skip to content

Instantly share code, notes, and snippets.

@coolaj86
Last active August 16, 2024 14:52
Show Gist options
  • Save coolaj86/cea44f97d8670714470fa1f67fed503c to your computer and use it in GitHub Desktop.
Save coolaj86/cea44f97d8670714470fa1f67fed503c to your computer and use it in GitHub Desktop.
EventSocket

EventSocket.js

The Power of EventEmitter in the Palm of your Promises

(in other words: I figured out how to model goroutines in JavaScript - shhhh! don't tell!)

EventSocket

let socket = EventSocket.create({ explicit: ['_internalEvent'] });
// Event Socket Connection (Listener)
let conn = socket.listen(['*', 'error']);
for (;;) {
  let msg = await conn.accept().catch(Object);
  console.log(msg);
  if (msg instanceof Error) {
    break;
  }
}
// Event Socket Server (Emitter)
socket.resolveAll('fruit', ['apple', 'banana', 'grape']);
socket.resolveAll('_internalEvent', { housekeeping: true });

let err = new Error('no more events available');
// Note: if no conns subscribe to the error event,
//       then the error will cause all to reject.
socket.rejectAll(err);
'use strict';
let EventSocket = {};
/**
* @param {Object} eventTypes
* @param {Array<String>} eventTypes.explicit - events listed here are excluded from '*' (catchall)
*/
EventSocket.create = function (eventTypes) {
let stream = {};
stream._explicitEvents = eventTypes.explicit;
/** @type {Array<any>} */
stream._connections = [];
/**
* @param {Array<String>} events - ex: ['*', 'error'] for default events, or list by name
*/
stream.listen = function (events = null) {
let conn = EventSocket.createConnection(stream, events);
return conn;
};
stream.resolveAll = function (eventname, msg) {
for (let p of stream._connections) {
let isSubscribed = p._events.includes(eventname);
if (isSubscribed) {
p._resolve(msg);
continue;
}
let isExplicit = stream._explicitEvents.includes(eventname);
if (isExplicit) {
continue;
}
let hasCatchall = p._events.includes('*');
if (hasCatchall) {
p._resolve(msg);
}
}
};
stream.rejectAll = function (err) {
let handled = false;
for (let p of stream._connections) {
let handlesErrors = p._events.includes('error');
if (!handlesErrors) {
continue;
}
handled = true;
p._reject(err);
}
if (!handled) {
for (let p of stream._connections) {
p._reject(err);
}
}
};
return stream;
};
EventSocket.createConnection = function (stream, defaultEvents = null) {
let p = {};
stream._connections.push(p);
p._events = defaultEvents;
p.closed = false;
p._settled = false;
p._resolve = function (msg) {};
p._reject = function (err) {};
p._promise = Promise.resolve(null);
p._next = async function () {
p._settled = false;
p._promise = new Promise(function (_resolve, _reject) {
p._resolve = function (msg) {
p._close(true);
_resolve(msg);
};
p._reject = function (err) {
p._close(true);
_reject(err);
};
});
return await p._promise;
};
/**
* Accepts the next message of the given event name,
* or of any of the default event names.
* @param {String} eventname - '*' for default events, 'error' for error, or others by name
*/
p.accept = async function (eventname) {
if (p.closed) {
let err = new Error('cannot accept new events after close');
Object.assign(err, { code: 'E_ALREADY_CLOSED' });
throw err;
}
if (eventname) {
p.events = [eventname];
} else if (defaultEvents?.length) {
p.events = defaultEvents;
} else {
let err = new Error(
`call stream.listen(['*']) or conn.accept('*') for default events`,
);
Object.assign(err, { code: 'E_NO_EVENTS' });
throw err;
}
return await p._next();
};
p._close = function (_settle) {
if (p.closed) {
return;
}
p.closed = true;
let index = stream._connections.indexOf(p);
if (index >= 0) {
void stream._connections.splice(index, 1);
}
if (_settle) {
p._settled = true;
}
if (p._settled) {
return;
}
p._settled = true;
let err = new Error('promise stream closed');
Object.assign(err, { code: 'E_CLOSE' });
p._reject(err);
};
/**
* Causes `let msg = conn.accept()` to fail with E_CLOSE or E_ALREADY_CLOSED
*/
p.close = function () {
p._close(false);
};
return p;
};
@copyright AJ ONeal 2024 MPL-2.0
@coolaj86
Copy link
Author

In the initial version, this may not work as expected:

for (;;) {
    let msg = await conn.accept();
    await sleep(100);
}

If an event comes before the next await accept(), that event will not be available.

Missed events were queued in an earlier draft, but was removed intentionally.

There's a simple solution but, of course, it comes with a caveat - it will leak memory unless 'close()' is called.

A best-set-of-trade-offs approach would be to have a buffered mode, with a configurable max number of messages to buffer.

Buffered, listen() is on() and close() is off().

Unbuffered, accept() is once().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment