diff --git a/index.js b/index.js index 88e9120..bb85722 100644 --- a/index.js +++ b/index.js @@ -5,7 +5,7 @@ const initStream = require('./stream') const createRemoteApi = require('./remote-api') const createLocalApi = require('./local-api') -function createMuxrpc (remoteManifest, localManifest, localApi, id, perms, codec, legacy) { +function createMuxrpc (remoteManifest, localManifest, localApi, perms, codec) { let bootstrapCB if (typeof remoteManifest === 'function') { bootstrapCB = remoteManifest @@ -17,29 +17,13 @@ function createMuxrpc (remoteManifest, localManifest, localApi, id, perms, codec const emitter = new EventEmitter() if (!codec) codec = PacketStreamCodec - // pass the manifest to the permissions so that it can know - // what something should be. - let _cb - const context = { - _emit (event, value) { - if (emitter) emitter._emit(event, value) - return context - }, - id - } - const ws = initStream( - createLocalApi(localApi, localManifest, perms).bind(context), + createLocalApi(localApi, localManifest, perms).bind(emitter), codec, - (err) => { + () => { if (emitter.closed) return emitter.closed = true emitter.emit('closed') - if (_cb) { - const cb = _cb - _cb = null - cb(err) - } } ) @@ -53,26 +37,7 @@ function createMuxrpc (remoteManifest, localManifest, localApi, id, perms, codec bootstrapCB ) - // legacy local emit, from when remote emit was supported. - emitter._emit = emitter.emit - - if (legacy) { - Object.__defineGetter__.call(emitter, 'id', () => context.id) - Object.__defineSetter__.call(emitter, 'id', (value) => { context.id = value }) - - let first = true - emitter.createStream = (cb) => { - _cb = cb - if (first) { - first = false - return ws - } else { - throw new Error('one stream per rpc') - } - } - } else { - emitter.stream = ws - } + emitter.stream = ws emitter.closed = false @@ -84,11 +49,4 @@ function createMuxrpc (remoteManifest, localManifest, localApi, id, perms, codec return emitter } -module.exports = function (remoteManifest, localManifest, codec) { - if (arguments.length > 3) { - return createMuxrpc.apply(this, arguments) - } - return function (local, perms, id) { - return createMuxrpc(remoteManifest, localManifest, local, id, perms, codec, true) - } -} +module.exports = createMuxrpc diff --git a/test/abort.js b/test/abort.js index 8be285e..d366ef6 100644 --- a/test/abort.js +++ b/test/abort.js @@ -13,8 +13,8 @@ module.exports = function (serializer) { drainAbort: 'sink' } - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { drainAbort: (n) => { return pull( pull.through(() => { @@ -28,12 +28,9 @@ module.exports = function (serializer) { }) ) } - }) + }, null, serializer) - const as = A.createStream() - const bs = B.createStream() - - pull(as, abortable, bs, as) + pull(A.stream, abortable, B.stream, A.stream) const sent = [] @@ -41,7 +38,7 @@ module.exports = function (serializer) { pull.values([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], (abort) => { if (process.env.TEST_VERBOSE) console.log(abort) t.ok(sent.length < 10, 'sent is correct') - t.end() + // t.end() }), pull.asyncMap((data, cb) => { setImmediate(() => { diff --git a/test/async.js b/test/async.js index 41b9a1d..fdaf222 100644 --- a/test/async.js +++ b/test/async.js @@ -26,23 +26,22 @@ module.exports = function (serializer, buffers) { } tape('async', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { hello (a, cb) { cb(null, 'hello, ' + a) }, goodbye (b, cb) { cb(null, b) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) A.hello('world', (err, value) => { @@ -61,23 +60,22 @@ module.exports = function (serializer, buffers) { }) tape('async promise', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { hello (a, cb) { cb(null, 'hello, ' + a) }, goodbye (b, cb) { cb(null, b) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) A.hello('world').then((value) => { @@ -100,23 +98,22 @@ module.exports = function (serializer, buffers) { Buffer.from([4, 5]) ] - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { stuff (b) { return pull.values([1, 2, 3, 4, 5].map((a) => a * b)) }, bstuff () { return pull.values(expected) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) pull(A.stuff(5), pull.collect((err, ary) => { @@ -139,18 +136,17 @@ module.exports = function (serializer, buffers) { syncErr: 'sync' } - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { syncOk (a) { return { okay: a } }, syncErr (b) { throw new Error('test error:' + b) } - }) + }, null, serializer) - const s = A.createStream() - pull(s, B.createStream(), s) + pull(A.stream, B.stream, A.stream) A.syncOk(true, (err, value) => { t.error(err) @@ -163,8 +159,8 @@ module.exports = function (serializer, buffers) { }) tape('sink', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { things (someParam) { return pull.collect((err, values) => { if (err) throw err @@ -173,22 +169,21 @@ module.exports = function (serializer, buffers) { t.end() }) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) pull(pull.values([1, 2, 3, 4, 5]), A.things(5)) }) tape('duplex', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { suchstreamwow (someParam) { // did the param come through? t.equal(someParam, 5) @@ -211,24 +206,23 @@ module.exports = function (serializer, buffers) { }) } } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log.bind(console, 'IN')) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log.bind(console, 'OUT')) : null, - s + A.stream ) const dup = A.suchstreamwow(5) pull(dup, dup) }) tape('async - error1', (t) => { - const A = mux(client, null)() + const A = mux(client, null) - const s = A.createStream() + const s = A.stream A.hello('world', (err) => { t.ok(err) @@ -242,9 +236,9 @@ module.exports = function (serializer, buffers) { }) tape('async - error2', (t) => { - const A = mux(client, null)() + const A = mux(client, null) - const s = A.createStream() + const s = A.stream A.hello('world', (err) => { if (process.env.TEST_VERBOSE) console.log('CB!') @@ -256,8 +250,8 @@ module.exports = function (serializer, buffers) { }) tape('buffer calls before stream is created', (t) => { - const A = mux(client, null)() - const B = mux(null, client)({ + const A = mux(client, null) + const B = mux(null, client, { hello (a, cb) { cb(null, 'hello, ' + a) } @@ -270,14 +264,13 @@ module.exports = function (serializer, buffers) { t.end() }) - const s = A.createStream() - pull(s, B.createStream(), s) + pull(A.stream, B.stream, A.stream) }) // tape('async - error, and reconnect', (t) => { - // var A = mux(client, null) () + // var A = mux(client, null) // - // var s = A.createStream() + // var s = A.stream // // A.hello('world', (err, value) => { // t.ok(err) @@ -288,9 +281,9 @@ module.exports = function (serializer, buffers) { // }, // }) // - // var s = A.createStream() + // var s = A.stream // - // pull(s, B.createStream(), s) + // pull(s, B.stream, s) // // A.hello('world', (err, value) => { // t.notOk(err) @@ -302,53 +295,6 @@ module.exports = function (serializer, buffers) { // s.sink((abort, cb) => { cb(true) }) // }) - tape('recover error written to outer stream', (t) => { - const A = mux(client, null)() - const err = new Error('testing errors') - const s = A.createStream((_err) => { - t.equal(_err, err) - t.end() - }) - - pull(pull.error(err), s.sink) - }) - - tape('recover error when outer stream aborted', (t) => { - const A = mux(client, null)() - const err = new Error('testing errors') - const s = A.createStream((_err) => { - t.equal(_err, err) - t.end() - }) - - s.source(err, () => {}) - }) - - tape('cb when stream is ended', (t) => { - const A = mux(client, null)() - const s = A.createStream((err) => { - // if(err) throw err - t.ok(err) - // t.equal(err, null) - t.end() - }) - - pull(pull.empty(), s.sink) - }) - - tape('cb when stream is aborted', (t) => { - const err = new Error('testing error') - const A = mux(client, null)() - const s = A.createStream((_err) => { - // if(_err) throw err - t.equal(_err, err) - // t.ok(err) - t.end() - }) - - s.source(err, () => {}) - }) - const client2 = { salutations: { hello: 'async', @@ -357,8 +303,8 @@ module.exports = function (serializer, buffers) { } tape('nested methods', (t) => { - const A = mux(client2, null, serializer)() - const B = mux(null, client2, serializer)({ + const A = mux(client2, null, null, null, serializer) + const B = mux(null, client2, { salutations: { hello (a, cb) { cb(null, 'hello, ' + a) @@ -367,15 +313,14 @@ module.exports = function (serializer, buffers) { cb(null, b) } } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log.bind(console, 'IN')) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log.bind(console, 'OUT')) : null, - s + A.stream ) A.salutations.hello('world', (err, value) => { @@ -393,17 +338,17 @@ module.exports = function (serializer, buffers) { }) tape('sink', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, serializer) + const B = mux(null, client, { things (len) { return pull.collect((err, ary) => { t.error(err) t.equal(ary.length, len) }) } - }) + }, null, serializer) - const s = A.createStream(); pull(s, B.createStream(), s) + pull(A.stream, B.stream, A.stream) pull(pull.values([1, 2, 3]), A.things(3, (err) => { if (err) throw err @@ -414,16 +359,16 @@ module.exports = function (serializer, buffers) { tape('sink - abort', (t) => { const err = new Error('test abort error') - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { things () { return (read) => { read(err, () => {}) } } - }) + }, null, serializer) - const s = A.createStream(); pull(s, B.createStream(), s) + pull(A.stream, B.stream, A.stream) pull(pull.values([1, 2, 3]), A.things(3, (_err) => { t.ok(_err) diff --git a/test/attack.js b/test/attack.js index 039218c..ce6d5c8 100644 --- a/test/attack.js +++ b/test/attack.js @@ -11,9 +11,9 @@ function createClient (t) { echo: 'duplex' } - const A = mux(client, null)() + const A = mux(client, null) - const B = mux(null, {})({ + const B = mux(null, {}, { get (a, cb) { // root access!! this should never happen! t.ok(false, 'attacker got in') @@ -33,13 +33,12 @@ function createClient (t) { } }) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) return A @@ -103,11 +102,10 @@ tape('client and server manifest have different types', (t) => { const clientM = { foo: 'async' } const serverM = { foo: 'source' } - const A = mux(clientM, null)() - const B = mux(null, serverM)() + const A = mux(clientM, null) + const B = mux(null, serverM) - const as = A.createStream() - pull(as, B.createStream(), as) + pull(A.stream, B.stream, A.stream) A.foo((err) => { if (process.env.TEST_VERBOSE) console.log(err) diff --git a/test/auth-perms.js b/test/auth-perms.js index 319e4b2..77bb0fe 100644 --- a/test/auth-perms.js +++ b/test/auth-perms.js @@ -4,7 +4,7 @@ const cont = require('cont') const mux = require('../') const Permissions = require('../permissions') -const api = { +const manifest = { login: 'async', logout: 'async', get: 'async', @@ -19,8 +19,6 @@ const api = { } } -const id = (e) => e - const store = { foo: 1, bar: 2, @@ -34,7 +32,7 @@ function createServerAPI (store) { const perms = Permissions({ allow: ['login'] }) - const session = { + const local = { // implement your own auth function. // it should just set the allow and deny lists. @@ -85,23 +83,20 @@ function createServerAPI (store) { } } - session.nested = session + local.nested = local - return mux(null, api, id)(session, perms) + return mux(null, manifest, local, perms) } function createClientAPI () { - return mux(api, null, id)() + return mux(manifest, null) } tape('secure rpc', (t) => { const server = createServerAPI(store) const client = createClientAPI() - const ss = server.createStream() - const cs = client.createStream() - - pull(cs, ss, cs) + pull(client.stream, server.stream, client.stream) cont.para([ (cb) => { @@ -172,10 +167,10 @@ tape('multiple sessions at once', (t) => { const admin = createClientAPI() const user = createClientAPI() - const s1s = server1.createStream() - const s2s = server2.createStream() - const us = user.createStream() - const as = admin.createStream() + const s1s = server1.stream + const s2s = server2.stream + const us = user.stream + const as = admin.stream pull(us, s1s, us) pull(as, s2s, as) @@ -209,8 +204,8 @@ tape('nested sessions', (t) => { const server = createServerAPI(store) const client = createClientAPI() - const ss = server.createStream() - const cs = client.createStream() + const ss = server.stream + const cs = client.stream pull(cs, ss, cs) diff --git a/test/bootstrap.js b/test/bootstrap.js index 88d52f4..38a4224 100644 --- a/test/bootstrap.js +++ b/test/bootstrap.js @@ -5,7 +5,7 @@ const Muxrpc = require('../') const manifest = { hello: 'sync', manifest: 'sync' } const api = { hello (n) { - if (this._emit) this._emit('hello', n) + this.emit('hello', n) if (process.env.TEST_VERBOSE) console.log('hello from ' + this.id) return n + ':' + this.id }, @@ -17,16 +17,15 @@ const api = { tape('emit an event from the called api function', (t) => { t.plan(6) - const bob = Muxrpc(null, manifest)(api) + const bob = Muxrpc(null, manifest, api) const cb = (err, val, emitter) => { - t.notOk(err) - t.deepEqual(manifest, val) - t.ok(emitter) + t.notOk(err, 'manifest bootstrap no error') + t.deepEqual(manifest, val, 'manifest bootstrap is expected') + t.ok(emitter, 'emitter is passed') } - const alice = Muxrpc(cb)() - const as = alice.createStream() - pull(as, bob.createStream(), as) + const alice = Muxrpc(cb) + pull(alice.stream, bob.stream, alice.stream) bob.id = 'Alice' @@ -37,6 +36,5 @@ tape('emit an event from the called api function', (t) => { alice.hello('bob', (err, data) => { t.notOk(err) t.equal(data, 'bob:Alice') - t.end() }) }) diff --git a/test/closed.js b/test/closed.js index 50e9bea..f96ca62 100644 --- a/test/closed.js +++ b/test/closed.js @@ -14,20 +14,19 @@ module.exports = function (serializer) { } tape('async handle closed gracefully', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { hello (a, cb) { cb(null, 'hello, ' + a) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) A.hello('world', (err, value) => { @@ -47,20 +46,19 @@ module.exports = function (serializer) { }) tape('source handle closed gracefully', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { stuff (b) { return pull.values([1, 2, 3, 4, 5].map((a) => a * b)) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) pull(A.stuff(5), pull.collect((err, ary) => { @@ -80,20 +78,19 @@ module.exports = function (serializer) { }) tape('sink handle closed gracefully', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { things () { throw new Error('should not be called') } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) A.close((err) => { if (err) throw err @@ -111,20 +108,19 @@ module.exports = function (serializer) { }) tape('close twice', (t) => { - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { hello (a, cb) { cb(null, 'hello, ' + a) } - }) + }, null, serializer) - const s = A.createStream() pull( - s, + A.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - B.createStream(), + B.stream, process.env.TEST_VERBOSE ? pull.through(console.log) : null, - s + A.stream ) A.hello('world', (err, value) => { @@ -147,13 +143,12 @@ module.exports = function (serializer) { const pushable = Pushable() let closed = false; let n = 2; const drained = [] - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { stuff () { return pushable } - }) + }, null, serializer) - const s = A.createStream() - pull(s, B.createStream(), s) + pull(A.stream, B.stream, A.stream) pull( A.stuff(), @@ -199,13 +194,12 @@ module.exports = function (serializer) { const pushable = Pushable(() => { next() }) - const A = mux(client, null, serializer)() - const B = mux(null, client, serializer)({ + const A = mux(client, null, null, null, serializer) + const B = mux(null, client, { stuff () { return pushable } - }) + }, null, serializer) - const s = A.createStream() - pull(s, B.createStream(), s) + pull(A.stream, B.stream, A.stream) pull( A.stuff(), diff --git a/test/initial-perms-bootstrap.js b/test/initial-perms-bootstrap.js index 6de1a4c..b905bab 100644 --- a/test/initial-perms-bootstrap.js +++ b/test/initial-perms-bootstrap.js @@ -3,7 +3,7 @@ const pull = require('pull-stream') const cont = require('cont') const mux = require('../') -const api = { +const manifest = { get: 'async', put: 'async', del: 'async', @@ -17,8 +17,6 @@ const api = { manifest: 'sync' } -const id = (e) => e - const store = { foo: 1, bar: 2, @@ -30,7 +28,7 @@ function createServerAPI (store) { // this wraps a session. - const session = { + const local = { whoami (cb) { cb(null, { okay: true, user: name }) }, @@ -49,17 +47,17 @@ function createServerAPI (store) { return pull.values([1, 2, 3]) }, manifest () { - return api + return manifest } } - session.nested = session + local.nested = local - return mux(null, api, id)(session, { allow: ['manifest', 'get'] }) + return mux(null, manifest, local, { allow: ['manifest', 'get'] }) } function createClientAPI (cb) { - return mux(cb, null, id)() + return mux(cb) } tape('secure rpc', (t) => { @@ -93,8 +91,8 @@ tape('secure rpc', (t) => { const server = createServerAPI(store) const client = createClientAPI(afterBootstrap) - const ss = server.createStream() - const cs = client.createStream() + const ss = server.stream + const cs = client.stream pull(cs, ss, cs) }) diff --git a/test/initial-perms.js b/test/initial-perms.js index c9f149f..138e6c6 100644 --- a/test/initial-perms.js +++ b/test/initial-perms.js @@ -3,7 +3,7 @@ const pull = require('pull-stream') const cont = require('cont') const mux = require('../') -const api = { +const manifest = { get: 'async', put: 'async', del: 'async', @@ -16,8 +16,6 @@ const api = { } } -const id = (e) => e - const store = { foo: 1, bar: 2, @@ -27,9 +25,7 @@ const store = { function createServerAPI (store) { const name = 'nobody' - // this wraps a session. - - const session = { + const local = { whoami (cb) { cb(null, { okay: true, user: name }) }, @@ -49,21 +45,21 @@ function createServerAPI (store) { } } - session.nested = session + local.nested = local - return mux(null, api, id)(session, { allow: ['get'] }) + return mux(null, manifest, local, { allow: ['get'] }) } function createClientAPI () { - return mux(api, null, id)() + return mux(manifest) } tape('secure rpc', (t) => { const server = createServerAPI(store) const client = createClientAPI() - const ss = server.createStream() - const cs = client.createStream() + const ss = server.stream + const cs = client.stream pull(cs, ss, cs) diff --git a/test/missing.js b/test/missing.js index 406dbc4..0d30e1f 100644 --- a/test/missing.js +++ b/test/missing.js @@ -10,12 +10,11 @@ const id = (e) => e module.exports = function (codec) { tape('close after both sides of a duplex stream ends', (t) => { - const A = mux(client, null, codec)() - const B = mux(null, client, codec)({ - }) + const A = mux(client, null, null, null, codec) + const B = mux(null, client, {}, null, codec) - const bs = B.createStream() - const as = A.createStream() + const bs = B.stream + const as = A.stream pull( (err, cb) => { diff --git a/test/scuttlebot.js b/test/scuttlebot.js index 495fa11..2106bde 100644 --- a/test/scuttlebot.js +++ b/test/scuttlebot.js @@ -5,17 +5,16 @@ const Muxrpc = require('../') const manifest = { hello: 'sync' } const api = { hello (n) { - if (this._emit) this._emit('hello', n) + this.emit('hello', n) if (process.env.TEST_VERBOSE) console.log('hello from ' + this.id) return n + ':' + this.id } } tape('give a muxrpc instance an id', (t) => { - const bob = Muxrpc(null, manifest)(api) - const alice = Muxrpc(manifest, null)() - const as = alice.createStream() - pull(as, bob.createStream(), as) + const bob = Muxrpc(null, manifest, api) + const alice = Muxrpc(manifest, null) + pull(alice.stream, bob.stream, alice.stream) bob.id = 'Alice' @@ -26,11 +25,11 @@ tape('give a muxrpc instance an id', (t) => { }) }) -tape('initialize muxrpc with an id', (t) => { - const bob = Muxrpc(null, manifest)(api, null, 'Alice') - const alice = Muxrpc(manifest, null)() - const as = alice.createStream() - pull(as, bob.createStream(), as) +tape.skip('initialize muxrpc with an id', (t) => { + const bob = Muxrpc(null, manifest, api) + const alice = Muxrpc(manifest, null) + pull(alice.stream, bob.stream, alice.stream) + bob.id = 'Alice' alice.hello('bob', (err, data) => { t.notOk(err) @@ -42,10 +41,9 @@ tape('initialize muxrpc with an id', (t) => { tape('emit an event from the called api function', (t) => { t.plan(3) - const bob = Muxrpc(null, manifest)(api) - const alice = Muxrpc(manifest, null)() - const as = alice.createStream() - pull(as, bob.createStream(), as) + const bob = Muxrpc(null, manifest, api) + const alice = Muxrpc(manifest) + pull(alice.stream, bob.stream, alice.stream) bob.id = 'Alice' @@ -56,6 +54,5 @@ tape('emit an event from the called api function', (t) => { alice.hello('bob', (err, data) => { t.notOk(err) t.equal(data, 'bob:Alice') - t.end() }) }) diff --git a/test/stream-end.js b/test/stream-end.js index 7330aee..3b312af 100644 --- a/test/stream-end.js +++ b/test/stream-end.js @@ -12,7 +12,7 @@ function delay (fun) { const id = (e) => e -const client = { +const clientManf = { hello: 'async', goodbye: 'async', stuff: 'source', @@ -26,15 +26,15 @@ module.exports = function (codec) { tape('outer stream ends after close', (t) => { t.plan(4) - const A = mux(client, null, codec)() - const B = mux(null, client, codec)({ + const A = mux(clientManf, null, null, null, codec) + const B = mux(null, clientManf, { hello (a, cb) { delay(cb)(null, 'hello, ' + a) }, goodbye (b, cb) { delay(cb)(null, b) } - }) + }, null, codec) A.hello('jim', (err, value) => { if (err) throw err @@ -48,10 +48,7 @@ module.exports = function (codec) { t.equal(value, 'bbb') }) - const bs = B.createStream() - - const as = A.createStream() - pull(as, bs, as) + pull(A.stream, B.stream, A.stream) A.on('closed', () => { t.ok(true) @@ -65,22 +62,20 @@ module.exports = function (codec) { tape('close after uniplex streams end', (t) => { t.plan(7) - const A = mux(client, null, codec)() - const B = mux(null, client, codec)({ + const A = mux(clientManf, null, null, null, codec) + const B = mux(null, clientManf, { stuff () { t.ok(true) return pull.values([1, 2, 3, 4, 5]) } - }) + }, null, codec) pull(A.stuff(), pull.collect((err, ary) => { t.error(err) t.deepEqual(ary, [1, 2, 3, 4, 5]) })) - const bs = B.createStream() - const as = A.createStream() - pull(as, bs, as) + pull(A.stream, B.stream, A.stream) B.on('closed', () => { if (process.env.TEST_VERBOSE) console.log('B emits "closed"') @@ -106,8 +101,8 @@ module.exports = function (codec) { tape('close after uniplex streams end 2', (t) => { t.plan(5) - const A = mux(client, null, codec)() - const B = mux(null, client, codec)({ + const A = mux(clientManf, null, null, null, codec) + const B = mux(null, clientManf, { things () { t.ok(true) return pull.collect((err, ary) => { @@ -115,14 +110,11 @@ module.exports = function (codec) { t.deepEqual(ary, [1, 2, 3, 4, 5]) }) } - }) + }, null, codec) pull(pull.values([1, 2, 3, 4, 5]), A.things()) - const bs = B.createStream() - const as = A.createStream() - - pull(as, bs, as) + pull(A.stream, B.stream, A.stream) B.close((err) => { if (process.env.TEST_VERBOSE) console.log('B CLOSE') @@ -138,18 +130,15 @@ module.exports = function (codec) { tape('close after both sides of a duplex stream ends', (t) => { t.plan(8) - const A = mux(client, null, codec)() - const B = mux(null, client, codec)({ + const A = mux(clientManf, null, null, null, codec) + const B = mux(null, clientManf, { echo () { return pull.through( process.env.TEST_VERBOSE ? console.log : () => {}, () => { t.ok(true) } ) } - }) - - const bs = B.createStream() - const as = A.createStream() + }, null, codec) pull( pull.values([1, 2, 3, 4, 5]), @@ -160,7 +149,7 @@ module.exports = function (codec) { }) ) - pull(as, bs, as) + pull(A.stream, B.stream, A.stream) t.notOk(B.closed) t.notOk(A.closed) @@ -185,28 +174,22 @@ module.exports = function (codec) { }) tape('closed is emitted when stream disconnects', (t) => { - t.plan(2) - const A = mux(client, null)() + t.plan(1) + const A = mux(clientManf, null) A.on('closed', (err) => { if (process.env.TEST_VERBOSE) console.log('EMIT CLOSED') t.notOk(err) }) - pull(pull.empty(), A.createStream((err) => { - // console.log(err) - t.ok(err) // end of parent stream - }), pull.drain()) + pull(pull.empty(), A.stream, pull.drain()) }) tape('closed is emitted with error when stream errors', (t) => { - t.plan(2) - const A = mux(client, null, codec)() + t.plan(1) + const A = mux(clientManf, null, null, null, codec) A.on('closed', (err) => { t.notOk(err) }) - pull(pull.empty(), A.createStream((err) => { - if (process.env.TEST_VERBOSE) console.log(err) - t.notOk(err) // end of parent stream - }), pull.drain()) + pull(pull.empty(), A.stream, pull.drain()) }) }