Last active
October 25, 2019 08:11
-
-
Save Frando/fc029021cc0aa69a1b147fb28fb3a4ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const tape = require('tape') | |
const hypercore = require('hypercore') | |
const ram = require('random-access-memory') | |
const { runAll, replicate } = require('./lib/util') | |
const { Kappa } = require('..') | |
const createIndexer = require('../sources/hypercore-sparse') | |
// Topic names; the test below assigns them round-robin to appended blocks.
const topics = ['red', 'blue', 'green', 'yellow']
/**
 * Wire the topic view and the sparse source into a kappa instance.
 *
 * Registers the view returned by `createTopicView()` under the name `topics`,
 * then registers a source named `sparsefeed` created by `createIndexer`. The
 * source receives the feed plus an `onquery` hook that forwards topic lookups
 * to the `topics` view's `query` API.
 *
 * @param {object} kappa - Instance providing `use`, `source` and
 *   `api.topics.query` (the latter is only read when `onquery` fires).
 * @param {object} feed - Feed handed to the source through its options.
 */
function initSparse (kappa, feed) {
  kappa.use('topics', createTopicView())
  kappa.source('sparsefeed', createIndexer, {
    feed,
    onquery (topic, cb) {
      kappa.api.topics.query(topic, cb)
    }
  })
}
/**
 * Build a view object that indexes messages by their `value.topic`.
 *
 * The returned object has a `map(msgs, next)` function and an `api` with
 * three methods. Each api method takes the kappa instance as its first
 * argument and reads `this.ready` or `this.name`, so it must be invoked with
 * a `this` that provides them.
 *
 * - `query(kappa, topic, cb)`: once ready, calls `cb(null, feeds)` where
 *   `feeds` maps each message key to the list of seqs recorded for `topic`,
 *   or calls `cb()` with no arguments when the topic has no entries.
 * - `all(kappa, cb)`: once ready, calls `cb(null, topics)` with the live
 *   index (topic -> array of `{ seq, key }`).
 * - `requestTopic(kappa, topic)`: forwards `topic` to every flow of this view
 *   whose source exposes `requestTopic`.
 *
 * @returns {{map: Function, api: object}} The view definition.
 */
function createTopicView () {
  // Null-prototype dictionary: topic names come from message payloads, so a
  // topic such as "constructor" or "toString" must not resolve to an inherited
  // Object.prototype member (a plain {} made map() and query() throw).
  const topics = Object.create(null)

  return {
    map (msgs, next) {
      for (const msg of msgs) {
        const topic = msg.value && msg.value.topic
        // Messages without a value or without a topic are not indexed.
        if (!topic) continue
        if (!topics[topic]) topics[topic] = []
        topics[topic].push({ seq: msg.seq, key: msg.key })
      }
      next()
    },
    api: {
      query (kappa, topic, cb) {
        this.ready(() => {
          const entries = topics[topic]
          if (!entries) return cb()
          // Group the recorded seqs by the key of the message they came from.
          const feeds = {}
          for (const { key, seq } of entries) {
            if (!feeds[key]) feeds[key] = []
            feeds[key].push(seq)
          }
          cb(null, feeds)
        })
      },
      all (kappa, cb) {
        this.ready(() => cb(null, topics))
      },
      requestTopic (kappa, topic) {
        for (const flow of kappa.flowsByView(this.name)) {
          // Only some sources support topic requests.
          if (flow.source.requestTopic) {
            flow.source.requestTopic(topic)
          }
        }
      }
    }
  }
}
// Takes a local feed and a second feed created from the local feed's key
// through replication, a topic request and a later append, printing the state
// of both sides after each stage.
// NOTE(review): `tape.only` keeps every other test registered in the same
// process from running; confirm that is still wanted.
tape.only('sparse', async t => {
  let local, remote
  let kloc, krem
  const opts = {
    valueEncoding: 'json',
    sparse: true
  }

  await runAll([
    cb => {
      local = hypercore(ram, opts)
      local.ready(cb)
    },
    cb => {
      // Uses local.key, hence placed after the local.ready step
      // (assumes runAll runs the steps in order; TODO confirm).
      remote = hypercore(ram, local.key, opts)
      remote.ready(cb)
    },
    cb => {
      kloc = new Kappa()
      krem = new Kappa()
      initSparse(kloc, local)
      initSparse(krem, remote)
      cb()
    },
    cb => {
      // 101 blocks (i = 0..100), topics assigned round-robin.
      const batch = []
      for (let i = 0; i <= 100; i++) {
        batch.push({ topic: topics[i % 4], i })
      }
      local.append(batch, cb)
    },
    cb => replicate(local, remote, cb),
    cb => logAll(cb, 'First replication done'),
    cb => {
      krem.api.topics.requestTopic('red')
      // Continue on the next tick instead of waiting for a completion signal.
      setImmediate(cb)
    },
    cb => logAll(cb, 'Queried for "red"'),
    cb => {
      local.append([
        { topic: 'red' },
        { topic: 'blue' },
        { topic: 'red' }
      ], cb)
    },
    cb => setImmediate(cb),
    cb => logAll(cb, 'Appended')
  ])

  t.end()

  /**
   * Print an underlined heading followed by a snapshot of both feeds, then
   * continue with `cb`. A failure while logging is passed to `cb` instead of
   * becoming an unhandled rejection that would leave the step pending.
   *
   * @param {Function} cb - Step callback; receives the error on failure.
   * @param {string} msg - Heading text.
   */
  async function logAll (cb, msg) {
    console.log(`\n${msg}\n${'-'.repeat(msg.length)}\n`)
    try {
      await runAll([
        cb => log('local', local, kloc, cb),
        cb => log('remote', remote, krem, cb)
      ])
    } catch (err) {
      return cb(err)
    }
    cb()
  }
})
/**
 * Print a summary of one feed and of the topics indexed by its kappa view.
 *
 * Fetches the index through `kappa.api.topics.all`, then logs the name, the
 * short key, `feed.length`, the value of `feed.downloaded()` and one
 * `topic: count` pair per indexed topic.
 *
 * @param {string} name - Label printed on the first line.
 * @param {object} feed - Feed providing `key`, `length` and `downloaded()`.
 * @param {object} kappa - Instance providing `api.topics.all(cb)`.
 * @param {Function} cb - Called with no arguments after logging, or with the
 *   error when the lookup fails.
 */
function log (name, feed, kappa, cb) {
  kappa.api.topics.all((err, res) => {
    // Without this guard a failed lookup leaves `res` undefined and
    // Object.entries() throws, hiding the real error from the caller.
    if (err) return cb(err)
    const report = [
      ` ${name}`,
      `Key: ${shortkey(feed)}`,
      `Length: ${feed.length}`,
      `Downloaded: ${feed.downloaded()}`,
      `Topics: ${logTopics(res)}`
    ].join('\n')
    console.log(report)
    cb()
  })

  // Render the index as "topic: count " pairs (trailing space included).
  function logTopics (topics) {
    let str = ''
    for (const [key, value] of Object.entries(topics || {})) {
      str += `${key}: ${value.length} `
    }
    return str
  }
}
/**
 * Abbreviate a feed's key for display: the first six hex characters of
 * `feed.key` followed by "..".
 *
 * @param {object} feed - Object whose `key` supports `toString('hex')`.
 * @returns {string} The abbreviated key.
 */
function shortkey (feed) {
  const label = `${feed.key.toString('hex').substring(0, 6)}..`
  // The label is also stored on the feed; no reader of `_label` is visible
  // here, so the assignment is kept for compatibility.
  feed._label = label
  return label
}
/**
 * Right-pad a string with spaces until it is at least `i` characters long.
 * Strings that are already long enough are returned unchanged.
 *
 * @param {string} str - Text to pad.
 * @param {number} i - Minimum width.
 * @returns {string} The padded text.
 */
function pad (str, i) {
  const missing = i - str.length
  return missing > 0 ? str + ' '.repeat(missing) : str
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
After: first replication
----------------------
local
Key: 49b93d..
Length: 101
Downloaded: 101
Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
Key: 49b93d..
Length: 101
Downloaded: 1
Topics: red: 1
After: Remote issued query "red"
-----------------
local
Key: 49b93d..
Length: 101
Downloaded: 101
Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
Key: 49b93d..
Length: 101
Downloaded: 26
Topics: red: 26
After: Local appended blocks
--------
local
Key: 49b93d..
Length: 104
Downloaded: 104
Topics: red: 28 blue: 26 green: 25 yellow: 25
remote
Key: 49b93d..
Length: 104
Downloaded: 29
Topics: red: 28 blue: 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const tape = require('tape') | |
const hypercore = require('hypercore') | |
const ram = require('random-access-memory') | |
const { runAll, replicate } = require('./lib/util') | |
const { Kappa } = require('..') | |
const createIndexer = require('../sources/hypercore-sparse') | |
// Topic names; the test below assigns them round-robin to appended blocks.
const topics = ['red', 'blue', 'green', 'yellow']
/**
 * Wire the topic view and the sparse source into a kappa instance.
 *
 * Registers the view returned by `createTopicView()` under the name `topics`,
 * then registers a source named `sparsefeed` created by `createIndexer`. The
 * source receives the feed plus an `onquery` hook that forwards topic lookups
 * to the `topics` view's `query` API.
 *
 * @param {object} kappa - Instance providing `use`, `source` and
 *   `api.topics.query` (the latter is only read when `onquery` fires).
 * @param {object} feed - Feed handed to the source through its options.
 */
function initSparse (kappa, feed) {
  kappa.use('topics', createTopicView())
  kappa.source('sparsefeed', createIndexer, {
    feed,
    onquery (topic, cb) {
      kappa.api.topics.query(topic, cb)
    }
  })
}
/**
 * Build a view object that indexes messages by their `value.topic`.
 *
 * The returned object has a `map(msgs, next)` function and an `api` with
 * three methods. Each api method takes the kappa instance as its first
 * argument and reads `this.ready` or `this.name`, so it must be invoked with
 * a `this` that provides them.
 *
 * - `query(kappa, topic, cb)`: once ready, calls `cb(null, feeds)` where
 *   `feeds` maps each message key to the list of seqs recorded for `topic`,
 *   or calls `cb()` with no arguments when the topic has no entries.
 * - `all(kappa, cb)`: once ready, calls `cb(null, topics)` with the live
 *   index (topic -> array of `{ seq, key }`).
 * - `requestTopic(kappa, topic)`: forwards `topic` to every flow of this view
 *   whose source exposes `requestTopic`.
 *
 * @returns {{map: Function, api: object}} The view definition.
 */
function createTopicView () {
  // Null-prototype dictionary: topic names come from message payloads, so a
  // topic such as "constructor" or "toString" must not resolve to an inherited
  // Object.prototype member (a plain {} made map() and query() throw).
  const topics = Object.create(null)

  return {
    map (msgs, next) {
      for (const msg of msgs) {
        const topic = msg.value && msg.value.topic
        // Messages without a value or without a topic are not indexed.
        if (!topic) continue
        if (!topics[topic]) topics[topic] = []
        topics[topic].push({ seq: msg.seq, key: msg.key })
      }
      next()
    },
    api: {
      query (kappa, topic, cb) {
        this.ready(() => {
          const entries = topics[topic]
          if (!entries) return cb()
          // Group the recorded seqs by the key of the message they came from.
          const feeds = {}
          for (const { key, seq } of entries) {
            if (!feeds[key]) feeds[key] = []
            feeds[key].push(seq)
          }
          cb(null, feeds)
        })
      },
      all (kappa, cb) {
        this.ready(() => cb(null, topics))
      },
      requestTopic (kappa, topic) {
        for (const flow of kappa.flowsByView(this.name)) {
          // Only some sources support topic requests.
          if (flow.source.requestTopic) {
            flow.source.requestTopic(topic)
          }
        }
      }
    }
  }
}
// Takes a local feed and a second feed created from the local feed's key
// through replication, a topic request and a later append, printing the state
// of both sides after each stage.
// NOTE(review): `tape.only` keeps every other test registered in the same
// process from running; confirm that is still wanted.
tape.only('sparse', async t => {
  let local, remote
  let kloc, krem
  const opts = {
    valueEncoding: 'json',
    sparse: true
  }

  await runAll([
    cb => {
      local = hypercore(ram, opts)
      local.ready(cb)
    },
    cb => {
      // Uses local.key, hence placed after the local.ready step
      // (assumes runAll runs the steps in order; TODO confirm).
      remote = hypercore(ram, local.key, opts)
      remote.ready(cb)
    },
    cb => {
      kloc = new Kappa()
      krem = new Kappa()
      initSparse(kloc, local)
      initSparse(krem, remote)
      cb()
    },
    cb => {
      // 101 blocks (i = 0..100), topics assigned round-robin.
      const batch = []
      for (let i = 0; i <= 100; i++) {
        batch.push({ topic: topics[i % 4], i })
      }
      local.append(batch, cb)
    },
    cb => replicate(local, remote, cb),
    cb => logAll(cb, 'First replication done'),
    cb => {
      krem.api.topics.requestTopic('red')
      // Continue on the next tick instead of waiting for a completion signal.
      setImmediate(cb)
    },
    cb => logAll(cb, 'Queried for "red"'),
    cb => {
      local.append([
        { topic: 'red' },
        { topic: 'blue' },
        { topic: 'red' }
      ], cb)
    },
    cb => setImmediate(cb),
    cb => logAll(cb, 'Appended')
  ])

  t.end()

  /**
   * Print an underlined heading followed by a snapshot of both feeds, then
   * continue with `cb`. A failure while logging is passed to `cb` instead of
   * becoming an unhandled rejection that would leave the step pending.
   *
   * @param {Function} cb - Step callback; receives the error on failure.
   * @param {string} msg - Heading text.
   */
  async function logAll (cb, msg) {
    console.log(`\n${msg}\n${'-'.repeat(msg.length)}\n`)
    try {
      await runAll([
        cb => log('local', local, kloc, cb),
        cb => log('remote', remote, krem, cb)
      ])
    } catch (err) {
      return cb(err)
    }
    cb()
  }
})
/**
 * Print a summary of one feed and of the topics indexed by its kappa view.
 *
 * Fetches the index through `kappa.api.topics.all`, then logs the name, the
 * short key, `feed.length`, the value of `feed.downloaded()` and one
 * `topic: count` pair per indexed topic.
 *
 * @param {string} name - Label printed on the first line.
 * @param {object} feed - Feed providing `key`, `length` and `downloaded()`.
 * @param {object} kappa - Instance providing `api.topics.all(cb)`.
 * @param {Function} cb - Called with no arguments after logging, or with the
 *   error when the lookup fails.
 */
function log (name, feed, kappa, cb) {
  kappa.api.topics.all((err, res) => {
    // Without this guard a failed lookup leaves `res` undefined and
    // Object.entries() throws, hiding the real error from the caller.
    if (err) return cb(err)
    const report = [
      ` ${name}`,
      `Key: ${shortkey(feed)}`,
      `Length: ${feed.length}`,
      `Downloaded: ${feed.downloaded()}`,
      `Topics: ${logTopics(res)}`
    ].join('\n')
    console.log(report)
    cb()
  })

  // Render the index as "topic: count " pairs (trailing space included).
  function logTopics (topics) {
    let str = ''
    for (const [key, value] of Object.entries(topics || {})) {
      str += `${key}: ${value.length} `
    }
    return str
  }
}
/**
 * Abbreviate a feed's key for display: the first six hex characters of
 * `feed.key` followed by "..".
 *
 * @param {object} feed - Object whose `key` supports `toString('hex')`.
 * @returns {string} The abbreviated key.
 */
function shortkey (feed) {
  const label = `${feed.key.toString('hex').substring(0, 6)}..`
  // The label is also stored on the feed; no reader of `_label` is visible
  // here, so the assignment is kept for compatibility.
  feed._label = label
  return label
}
/**
 * Right-pad a string with spaces until it is at least `i` characters long.
 * Strings that are already long enough are returned unchanged.
 *
 * @param {string} str - Text to pad.
 * @param {number} i - Minimum width.
 * @returns {string} The padded text.
 */
function pad (str, i) {
  const missing = i - str.length
  return missing > 0 ? str + ' '.repeat(missing) : str
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment