Last active
December 13, 2023 19:45
-
-
Save benzmuircroft/2dcbbf3aa8b136d87606facc6377140c to your computer and use it in GitHub Desktop.
Handle offline events between a server and its clients on an Autodeebee: events that happened while a peer was offline are stored and handled when that peer comes back online.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
hyperdown: queue events for peers while they are offline and hand them over
when they come back online.

SERVER EXAMPLE
  const hyperdown = require('hyperdown');
  const server = await hyperdown({
    uniqueKeyPair: uniqueKeyPair,
    folderName: storageFolderName,
    isServer: true,
    onClientConsumedEvents: function(remotePublicKey, eventsArray) {
      // do something ...
    }
  });
  await server.addEvent(remotePublicKey, eventObject);

CLIENT EXAMPLE
  const hyperdown = require('hyperdown');
  const client = await hyperdown({
    uniqueKeyPair: uniqueKeyPair,
    folderName: storageFolderName,
    eventHandler: function(id, ev, cb) {
      cb(id, true); // true marks the event as consumed
    }
  });

OPTIONS
{
  uniqueKeyPair: must be unique to each peer (including the server peer) and be able to reproduce socket.remotePublicKey
  folderName: is storage and the swarm topic
  isServer: is a bool
  onClientConsumedEvents: required for server. is a function (see server example)
  eventHandler: required for clients. is a function (see client example)
}
*/

/**
 * Starts a hyperdown peer (server or client, depending on `options.isServer`).
 *
 * The peer opens a Corestore in `./<folderName>`, builds an Autobase-backed
 * database on top of it and joins a Hyperswarm topic derived from `folderName`.
 * Every peer is tracked in the `events` collection by the hex encoding of its
 * public key.
 *
 * @param {object} options
 * @param {object} options.uniqueKeyPair - a keypear Keychain (anything with `get()`) or a key pair with `publicKey`.
 * @param {string} options.folderName - storage folder name; also used to fill the 32-byte swarm topic.
 * @param {boolean} [options.isServer] - truthy to run as the server, otherwise the peer is a client.
 * @param {function} [options.onClientConsumedEvents] - server only: `(remotePublicKey, eventsArray)`, called
 *   with copies of the events a client reported as consumed.
 * @param {function} [options.eventHandler] - client only: `(id, event, cb)`; call `cb(id, true)` to mark the
 *   event consumed. `cb` rejects when `id` is not the id that was handed to the handler.
 * @returns {Promise<object>} the peer object. It always carries `db`; a server additionally carries
 *   `addEvent(userPublicKey, data)` and `onClientConsumedEvents`, a client carries `eventHandler`.
 *   When the function is invoked with an explicit receiver (`hyperdown.call(obj, options)`) that
 *   receiver is populated and returned; otherwise a fresh object is used.
 * @throws {Error} when `options` is missing or any option fails validation.
 */
async function hyperdown(options) {
  const Corestore = require('corestore');
  // Autobase has to be required explicitly; none of the other modules puts it in scope.
  // NOTE(review): assumes the `autobase` version that @lejeunerenard/autobase-manager targets
  // (constructor taking inputs/localInput/localOutput) — confirm against package.json.
  const Autobase = require('autobase');
  const AutobaseManager = require('@lejeunerenard/autobase-manager');
  // NOTE(review): the package may actually be published as 'hyperdeebee' — confirm the specifier.
  const Autodeebee = require('hyperdbee/autodeebee');
  const { DB } = require('hyperdbee');
  const Hyperswarm = require('hyperswarm');
  const Keychain = require('keypear');
  const b4a = require('b4a');
  // import() resolves to a module namespace object; the callable is its default export.
  const goodbyeModule = await import('graceful-goodbye');
  const goodbye = goodbyeModule.default ?? goodbyeModule;

  if (!options) {
    throw new Error('options object is missing');
  }
  else if (!options.uniqueKeyPair) {
    throw new Error('options.uniqueKeyPair should be a KeyChain or keyPair. see: https://github.com/holepunchto/keypear');
  }
  else if (!options.folderName || typeof options.folderName !== 'string') {
    throw new Error('options.folderName should be a string');
  }
  else if (options.isServer && (!options.onClientConsumedEvents || typeof options.onClientConsumedEvents !== 'function' || options.eventHandler)) {
    throw new Error('options.onClientConsumedEvents should be a function if you intend this to be a server');
  }
  else if (!options.isServer && (!options.eventHandler || typeof options.eventHandler !== 'function' || options.onClientConsumedEvents)) {
    throw new Error('options.eventHandler should be a function if you intend this to be a client');
  }

  let keyPair;
  if (options.uniqueKeyPair.publicKey) {
    keyPair = new Keychain(options.uniqueKeyPair).get();
  }
  else if (typeof options.uniqueKeyPair.get === 'function') {
    keyPair = options.uniqueKeyPair.get();
  }
  else {
    throw new Error('options.uniqueKeyPair should be a KeyChain or keyPair. see: https://github.com/holepunchto/keypear');
  }

  // Callbacks below must not rely on `this`: inside socket/swarm listeners it is not the peer.
  // Everything is attached to `self`, which is returned to the caller.
  const self = (this !== undefined && this !== globalThis) ? this : {};

  // ---- small helpers shared by the server and the client -------------------

  // Canonical id of a peer: the hex string of its public key. Hex survives JSON
  // and can be used as a Map key, unlike a raw Buffer.
  const toId = (key) => (typeof key === 'string' ? key : b4a.toString(key, 'hex'));

  const encode = (message) => b4a.from(JSON.stringify(message));

  // Returns the parsed message, or null for anything that is not a JSON object.
  // Unparseable chunks are ignored on purpose: the same socket is also handed to
  // store.replicate(), so not every chunk is one of our JSON messages.
  // NOTE(review): confirm that writing raw JSON on the replicated socket does not
  // disturb the replication stream's own framing.
  const decode = (chunk) => {
    try {
      const message = JSON.parse(chunk);
      return (message !== null && typeof message === 'object') ? message : null;
    }
    catch {
      return null;
    }
  };

  const clone = (value) => JSON.parse(JSON.stringify(value));

  // ---- storage and networking shared by both roles -------------------------

  const store = new Corestore(`./${options.folderName}`);
  const input = store.get({ name: 'input', sparse: false, valueEncoding: 'json' });
  const output = store.get({ name: 'output', sparse: false, valueEncoding: 'json' });
  const base = new Autobase({
    inputs: [input],
    localInput: input,
    localOutput: output
  });
  const manager = new AutobaseManager(
    base,
    (key, coreType, channel) => true, // function to filter core keys
    store.get.bind(store), // get(key) function to get a hypercore given a key
    store.storage, // Storage for managing autobase keys
    { id: options.folderName } // Options
  );
  await manager.ready();
  const db = new DB(new Autodeebee(base));
  self.db = db;

  // Looks up a peer document by id; resolves to null when there is none.
  // NOTE(review): findOne is assumed to either resolve to a falsy value or reject
  // with a "not found" error for a missing document — confirm against the db library.
  const findUser = async (id) => {
    try {
      return (await db.collection('events').findOne({ _id: id })) ?? null;
    }
    catch (err) {
      if (err && /not found/i.test(String(err.message))) {
        return null;
      }
      throw err;
    }
  };

  const swarm = new Hyperswarm({
    keyPair: keyPair
  });
  const topic = b4a.alloc(32).fill(options.folderName);

  if (options.isServer) { // --------------------------------------- server
    self.onClientConsumedEvents = options.onClientConsumedEvents;
    const clients = new Map(); // peer id (hex) -> live socket

    /**
     * Stores an event for a user and pushes it immediately when that user is connected and online.
     * @param {Buffer|string} userPublicKey - the user's public key (Buffer or hex string).
     * @param {object} data - the event payload; it is copied, not mutated.
     * @returns {Promise<number>} the hyperdownId assigned to the event.
     */
    self.addEvent = async function addEvent(userPublicKey, data) {
      if (data === null || typeof data !== 'object') {
        throw new TypeError('addEvent data should be an object');
      }
      const userId = toId(userPublicKey);
      const user = await findUser(userId); // null for a user that has no document yet
      const events = { ...(user?.events ?? {}) };
      let hyperdownId = Date.now(); // todo: improve this
      while (String(hyperdownId) in events) { // two events in the same millisecond must not overwrite each other
        hyperdownId += 1;
      }
      const event = { ...data, hyperdownId: hyperdownId };
      events[hyperdownId] = event;
      await db.collection('events').update({ _id: userId }, { events: events }, { multi: false, upsert: true });
      const socket = clients.get(userId);
      if (socket && !user?.offline) {
        socket.write(encode({ event: event }));
      }
      return hyperdownId;
    };

    swarm.on('connection', (socket) => {
      const remoteId = toId(socket.remotePublicKey);
      const stream = store.replicate(socket);
      manager.attachStream(stream); // Attach manager
      socket.write(encode({ isServer: options.isServer })); // tell the client you are the server ...
      clients.set(remoteId, socket);

      socket.on('data', async (chunk) => {
        const message = decode(chunk);
        if (!message) {
          return;
        }
        if (message.consumedEvents) {
          const user = await findUser(remoteId);
          if (!user) {
            return;
          }
          const consumedIds = new Set((user.consumed ?? []).map(String));
          const remaining = {};
          const consumedEvents = [];
          for (const [hyperdownId, event] of Object.entries(user.events ?? {})) {
            if (consumedIds.has(hyperdownId)) {
              consumedEvents.push(clone(event));
            }
            else {
              remaining[hyperdownId] = event;
            }
          }
          await db.collection('events').update({ _id: remoteId }, { events: remaining, consumed: [] }, { multi: false });
          self.onClientConsumedEvents(socket.remotePublicKey, consumedEvents); // application can handle anything it needs to ....
        }
        else if (message.goodbye) {
          // The id comes from the authenticated connection, not from the payload,
          // so a peer can only mark itself offline.
          const user = await findUser(remoteId);
          if (user && !user.offline) {
            await db.collection('events').update({ _id: remoteId }, { offline: true }, { multi: false });
          }
        }
      });

      socket.on('close', async () => {
        if (clients.get(remoteId) === socket) { // do not evict a newer connection of the same peer
          clients.delete(remoteId);
        }
        await db.collection('events').update({ _id: remoteId }, { offline: true }, { multi: false });
      });
    });

    goodbye(() => swarm.destroy());
    await swarm.join(topic);
    await swarm.flush();
  }
  else { // ---------------------------------------------------------------- client
    self.eventHandler = options.eventHandler;
    const ownId = toId(keyPair.publicKey);
    let server; // socket of the server once it has announced itself

    const notifyConsumed = () => {
      if (server) {
        server.write(encode({ consumedEvents: true }));
      }
    };

    // Validates the id the handler called back with and records the event as
    // consumed when asked to. Resolves to true when the event was recorded.
    const acknowledge = async (expectedId, id, bool) => {
      if (id !== expectedId) {
        throw new Error(`Malformed hyperdownId for event. Got: '${id}', expected: '${expectedId}'`);
      }
      if (!bool) {
        return false;
      }
      await db.collection('events').update({ _id: ownId }, { $push: { consumed: expectedId } }, { multi: false, upsert: true });
      return true;
    };

    // Hands one event to the application without its internal hyperdownId field.
    const dispatch = (hyperdownId, event, cb) => {
      const payload = { ...event };
      delete payload.hyperdownId;
      self.eventHandler(hyperdownId, payload, cb);
    };

    // when the server is ready to talk ...
    const hasServer = async (socket) => {
      if (server === socket) {
        return; // already registered; avoid stacking close listeners
      }
      server = socket;
      socket.on('close', () => {
        if (server === socket) {
          server = undefined;
        }
      });

      const existing = await findUser(ownId);
      if (!existing) {
        await db.collection('events').insert({ _id: ownId, offline: false, events: {} });
      }
      else {
        await db.collection('events').update({ _id: ownId }, { offline: false }, { multi: false });
      }

      // look up our events and consume them ...
      const found = existing?.events ?? {};
      const alreadyConsumed = new Set((existing?.consumed ?? []).map(String));
      // Events consumed earlier but not yet cleared by the server are not replayed.
      const pending = Object.keys(found).filter((hyperdownId) => !alreadyConsumed.has(hyperdownId));
      if (pending.length === 0) {
        if (alreadyConsumed.size > 0) {
          notifyConsumed(); // let the server clear what was consumed before
        }
        return;
      }

      // Events are handled one at a time: the next one is dispatched only after
      // the application has called back for the previous one.
      const next = (index) => {
        if (index >= pending.length) { // end
          notifyConsumed();
          return;
        }
        const hyperdownId = pending[index];
        dispatch(hyperdownId, found[hyperdownId], async (id, bool) => { // callback result
          await acknowledge(hyperdownId, id, bool);
          next(index + 1);
        });
      };
      next(0);
    };

    swarm.on('connection', (socket) => {
      const stream = store.replicate(socket);
      manager.attachStream(stream); // Attach manager

      socket.on('data', async (chunk) => {
        const message = decode(chunk);
        if (!message) {
          return;
        }
        if (message.isServer) {
          await hasServer(socket); // server is ready to talk !
        }
        else if (message.event) {
          const hyperdownId = String(message.event.hyperdownId);
          dispatch(hyperdownId, message.event, async (id, bool) => { // call back
            if (await acknowledge(hyperdownId, id, bool)) {
              notifyConsumed();
            }
          });
        }
      });
    });

    goodbye(async () => {
      if (server) {
        server.write(encode({ goodbye: ownId }));
      }
      await db.collection('events').update({ _id: ownId }, { offline: true }, { multi: false });
      await swarm.destroy();
    });
    await swarm.join(topic);
    await swarm.flush();
  }

  return self;
}

module.exports = hyperdown;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment