// Gist created January 18, 2017 18:39
-
-
Save daviddias/f6a095ce4adc87345c6eb0dae32f5ef7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* eslint-env mocha */ | |
/* eslint max-nested-callbacks: ["error", 8] */ | |
'use strict' | |
const chai = require('chai') | |
chai.use(require('chai-checkmark')) | |
const expect = chai.expect | |
const pair = require('pull-pair/duplex') | |
const pull = require('pull-stream') | |
const parallel = require('async/parallel') | |
const series = require('async/series') | |
const Tcp = require('libp2p-tcp') | |
const multiaddr = require('multiaddr') | |
// Loopback TCP multiaddr shared by the listener (`listen(mh, ...)`) and the
// dialer (`dial(mh, ...)`) in the tcp close test.
const mh = multiaddr('/ip4/127.0.0.1/tcp/10000')
/**
 * Pipes an empty source into `stream` and reports when the pipeline ends.
 *
 * @param {Object} stream - Duplex placed between `pull.empty()` and the
 *   `pull.onEnd` sink.
 * @param {Function} callback - Invoked as `callback(err)` with whatever end
 *   reason `pull.onEnd` reports, so callers can assert on it.
 */
function closeAndWait (stream, callback) {
  pull(
    pull.empty(),
    stream,
    // Forward the end reason: dropping it makes any caller-side
    // `expect(err)` assertion impossible to fail.
    pull.onEnd((err) => callback(err))
  )
}
module.exports = (common) => { | |
describe.only('close', () => { | |
let muxer | |
beforeEach((done) => { | |
common.setup((err, _muxer) => { | |
if (err) return done(err) | |
muxer = _muxer | |
done() | |
}) | |
}) | |
it('closing underlying socket closes streams (tcp)', (done) => { | |
expect(2).checks(done) | |
const tcp = new Tcp() | |
const tcpListener = tcp.createListener((conn) => { | |
const listener = muxer.listener(conn) | |
listener.on('stream', (stream) => { | |
pull(stream, stream) | |
}) | |
}) | |
tcpListener.listen(mh, () => { | |
const dialerConn = tcp.dial(mh, setTimeout(() => tcpListener.close(), 2000)) | |
const dialerMuxer = muxer.dialer(dialerConn) | |
const s1 = dialerMuxer.newStream(() => { | |
pull( | |
s1, | |
pull.onEnd((err) => { | |
// console.log('s1', err) | |
expect(err).to.exist.mark() | |
}) | |
) | |
}) | |
const s2 = dialerMuxer.newStream(() => { | |
pull( | |
s2, | |
pull.onEnd((err) => { | |
// console.log('s2', err) | |
expect(err).to.exist.mark() | |
}) | |
) | |
}) | |
}) | |
}) | |
it('closing one of the muxed streams doesn\'t close others', (done) => { | |
const p = pair() | |
const dialer = muxer.dialer(p[0]) | |
const listener = muxer.listener(p[1]) | |
let counter = 0 | |
function check () { | |
if (++counter === 6){ | |
done() | |
} | |
} | |
const conns = [] | |
listener.on('stream', (stream) => { | |
check() | |
pull(stream, stream) | |
}) | |
for (let i = 0; i < 5; i++) { | |
conns.push(dialer.newStream()) | |
} | |
conns.forEach((conn, i) => { | |
if (i === 1) { | |
closeAndWait(conn, (err) => { | |
expect(err).to.not.exist | |
console.log('Happen once') | |
check() | |
}) | |
} else { | |
pull( | |
conn, | |
pull.onEnd(() => { | |
throw new Error('should not end') | |
}) | |
) | |
} | |
}) | |
}) | |
it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => { | |
const p = pair() | |
const dialer = muxer.dial(p[0]) | |
const listener = muxer.listen(p[1]) | |
expect(15).checks(done) | |
const conns = [] | |
const count = [] | |
for (let i = 0; i < 5; i++) { | |
count.push(i) | |
} | |
series(count.map((i) => (cb) => { | |
parallel([ | |
(cb) => listener.once('stream', (stream) => { | |
console.log('pipe') | |
expect(stream).to.exist.mark() | |
pull(stream, stream) | |
cb() | |
}), | |
(cb) => conns.push(dialer.newStream(cb)) | |
], cb) | |
}), (err) => { | |
if (err) return done(err) | |
conns.forEach((conn, i) => { | |
pull( | |
pull.values([Buffer('hello')]), | |
pull.asyncMap((val, cb) => { | |
setTimeout(() => { | |
cb(null, val) | |
}, i * 10) | |
}), | |
pull.through((val) => console.log('send', val)), | |
conn, | |
pull.through((val) => console.log('recv', val)), | |
pull.collect((err, data) => { | |
console.log('end', i) | |
expect(err).to.not.exist.mark() | |
expect(data).to.be.eql([Buffer('hello')]).mark() | |
}) | |
) | |
}) | |
listener.on('close', () => { | |
console.log('closed listener') | |
}) | |
dialer.end(() => { | |
console.log('CLOSED') | |
}) | |
}) | |
}) | |
}) | |
} |
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.