Commit c43b221ba14d52bab66a39600ac72c74b12c762f
reduce flumeview-level && return live state of subs, needs refactor
Kieran committed on 5/25/2018, 1:28:26 AMParent: 9881ae44565d5783671b359a9bf7c0a0dbdabd49
Files changed
difficult/ex2/index.js | added |
difficult/ex2/index.test.js | added |
package.json | changed |
difficult/ex2/index.js | |||
---|---|---|---|
@@ -1,0 +1,110 @@ | |||
1 … | +const flumeView = require('flumeview-level') | ||
2 … | +const pull = require('pull-stream') | ||
3 … | +const defer = require('pull-defer') | ||
4 … | +const get = require('lodash/get') | ||
5 … | +const NAME = 'channels' | ||
6 … | +const VERSION = 1 | ||
7 … | + | ||
8 … | +const getAuthor = (msg) => get(msg, 'value.author') | ||
9 … | +const getContent = (msg) => get(msg, 'value.content') | ||
10 … | +const getTimestamp = (msg) => get(msg, 'value.timestamp') | ||
11 … | + | ||
12 … | +module.exports = { | ||
13 … | + name: NAME, | ||
14 … | + version: VERSION, | ||
15 … | + manifest: { | ||
16 … | + read: 'source', | ||
17 … | + subscriptions: 'source', | ||
18 … | + getUserSubscriptions: 'async', | ||
19 … | + popular: 'async' | ||
20 … | + }, | ||
21 … | + init: function (server, config) { | ||
22 … | + const view = server._flumeUse( | ||
23 … | + NAME, | ||
24 … | + flumeView(VERSION, map) | ||
25 … | + ) | ||
26 … | + | ||
27 … | + return { read: view.read, subscriptions, popular } | ||
28 … | + | ||
29 … | + function map (msg, seq) { | ||
30 … | + const content = getContent(msg) | ||
31 … | + if (isChannelSubscription()) { | ||
32 … | + return [ | ||
33 … | + [content.channel, getAuthor(msg), getTimestamp(msg), content.subscribed] | ||
34 … | + ] | ||
35 … | + } | ||
36 … | + | ||
37 … | + function isChannelSubscription () { | ||
38 … | + return content.channel && | ||
39 … | + (typeof content.subscribed !== undefined) | ||
40 … | + } | ||
41 … | + } | ||
42 … | + | ||
43 … | + function subscriptions (opts) { | ||
44 … | + const query = Object.assign({}, { | ||
45 … | + live: true, | ||
46 … | + keys: true, | ||
47 … | + values: false, | ||
48 … | + seq: false | ||
49 … | + }, opts) | ||
50 … | + | ||
51 … | + return view.read(query) | ||
52 … | + } | ||
53 … | + | ||
54 … | + function getUserSubscriptions (feedId, cb) { | ||
55 … | + pull( | ||
56 … | + view.read({ keys: false, values: true }), | ||
57 … | + pull.collect((err, msgs) => { | ||
58 … | + | ||
59 … | + }) | ||
60 … | + ) | ||
61 … | + } | ||
62 … | + | ||
63 … | + function popular (opts, cb) { | ||
64 … | + if (typeof opts === 'function') return popular({}, opts) | ||
65 … | + // opts = Object.assign({}, opts, { keys: true, values: false, seq: false }) | ||
66 … | + } | ||
67 … | + | ||
68 … | + } | ||
69 … | +} | ||
70 … | + | ||
71 … | + // function subscriptions (opts) { | ||
72 … | + // const query = Object.assign({}, { | ||
73 … | + // live: true, | ||
74 … | + // keys: true, | ||
75 … | + // values: false, | ||
76 … | + // seq: false | ||
77 … | + // }, opts) | ||
78 … | + | ||
79 … | + // var source = defer.source(query) | ||
80 … | + | ||
81 … | + // // Do a { live: false } request to get collection of current subscribed channels | ||
82 … | + // var getOpts = { live: false, keys: false, values: false, seq: false } | ||
83 … | + | ||
84 … | + // getSubscribedChannels(getOpts, (err, msgs) => { | ||
85 … | + // if (err) throw err | ||
86 … | + | ||
87 … | + // source.resolve( | ||
88 … | + // // pull stream without a sink returns a source | ||
89 … | + // pull( | ||
90 … | + // pull.values(msgs), | ||
91 … | + // pull.filter(msg => !msg.sync) | ||
92 … | + // ) | ||
93 … | + // ) | ||
94 … | + // }) | ||
95 … | + | ||
96 … | + // // Return collection as source to a pull stream | ||
97 … | + // return source | ||
98 … | + // } | ||
99 … | + | ||
100 … | + // function getSubscribedChannels (opts, cb) { | ||
101 … | + // // reduced state of subscriptions to be up to date | ||
102 … | + // pull( | ||
103 … | + // view.read(opts), | ||
104 … | + // pull.collect((err, msgs) => { | ||
105 … | + // if (err) return cb(err) | ||
106 … | + // // return an up to date collection | ||
107 … | + // cb(null, msgs) | ||
108 … | + // }) | ||
109 … | + // ) | ||
110 … | + // } |
difficult/ex2/index.test.js | ||
---|---|---|
@@ -1,0 +1,121 @@ | ||
1 … | +const test = require('tape') | |
2 … | +const Server = require('scuttle-testbot') | |
3 … | +const pull = require('pull-stream') | |
4 … | + | |
5 … | +test('[Difficult] Exercise 2 - Get a live stream list of channels a given user is subscribed to', assert => { | |
6 … | + Server.use(require('./index')) | |
7 … | + const server = Server() | |
8 … | + // A channel can be subscribed to and unsubscribed from, | |
9 … | + // therefore we need to draw the current state of a user's relationship with a given channel | |
10 … | + | |
11 … | + assert.plan(1) | |
12 … | + | |
13 … | + // Construct two independent identities to ensure we narrow the field | |
14 … | + const grace = server.createFeed() | |
15 … | + const elvis = server.createFeed() | |
16 … | + | |
17 … | + // Pre-populate before setting up live stream | |
18 … | + grace.publish({ type: 'channel', channel: 'mycology', subscribed: true }, (err, msg) => { | |
19 … | + grace.publish({ type: 'channel', channel: 'mycology', subscribed: false }, (err, msg) => { | |
20 … | + | |
21 … | + var collection | |
22 … | + | |
23 … | + pull( | |
24 … | + server.channels.read({ values: false, keys: true, live: true }), | |
25 … | + pull.filter(msg => !msg.sync), | |
26 … | + pull.filter(msg => msg.key !== undefined), // no idea why { key: undefined, seq: undefined } is coming through | |
27 … | + pull.drain(msg => { | |
28 … | + collection = reduce(collection, msg) | |
29 … | + | |
30 … | + if (msg.key[0] === 'ruby') { | |
31 … | + result = {} | |
32 … | + result[grace.id] = [ | |
33 … | + { channel: 'mycology', subscribed: true }, | |
34 … | + { channel: 'ruby', subscribed: true } | |
35 … | + ] | |
36 … | + result[elvis.id] = [ | |
37 … | + { channel: 'mycology', subscribed: true }, | |
38 … | + { channel: 'scuttlebutt', subscribed: false } | |
39 … | + ] | |
40 … | + | |
41 … | + assert.deepEqual(collection, result) | |
42 … | + server.close() | |
43 … | + } | |
44 … | + | |
45 … | + function reduce (collection, msg) { | |
46 … | + collection = collection || {} | |
47 … | + const [ channel, author, timestamp, subscribed ] = msg.key | |
48 … | + collection[author] = collection[author] || [] | |
49 … | + var channelIndex = collection[author].findIndex(sub => sub.channel === channel) | |
50 … | + if (channelIndex !== -1) collection[author][channelIndex].subscribed = subscribed | |
51 … | + else collection[author].push({ channel, subscribed }) | |
52 … | + return collection | |
53 … | + } | |
54 … | + | |
55 … | + }) | |
56 … | + ) | |
57 … | + | |
58 … | + grace.publish({ type: 'channel', channel: 'mycology', subscribed: true }, (err, msg) => { | |
59 … | + elvis.publish({ type: 'channel', channel: 'mycology', subscribed: true }, (err, msg) => { | |
60 … | + elvis.publish({ type: 'channel', channel: 'scuttlebutt', subscribed: true }, (err, msg) => { | |
61 … | + elvis.publish({ type: 'channel', channel: 'scuttlebutt', subscribed: false }, (err, msg) => { | |
62 … | + grace.publish({ type: 'channel', channel: 'ruby', subscribed: true }, () => { | |
63 … | + // these should now stream through to live pull stream above | |
64 … | + }) | |
65 … | + }) | |
66 … | + }) | |
67 … | + }) | |
68 … | + }) | |
69 … | + }) | |
70 … | + }) | |
71 … | +}) | |
72 … | + | |
73 … | +// test('get a list of channels ordered by how many people subscribed to each', assert => { | |
74 … | +// Server.use(require('./index')) | |
75 … | +// const server = Server() | |
76 … | + | |
77 … | +// assert.plan(1) | |
78 … | + | |
79 … | +// var grace = server.createFeed() | |
80 … | +// var elvis = server.createFeed() | |
81 … | + | |
82 … | + | |
83 … | +// function cb (msg) { | |
84 … | +// server.channels.popular | |
85 … | +// } | |
86 … | + | |
87 … | +// server.close() | |
88 … | +// assert.end() | |
89 … | +// }) | |
90 … | + | |
91 … | +// (source) get a list of channels a given user is subscribed to | |
92 … | +// this needs to be a live stream in some way | |
93 … | +// probably the reduced state of channels you're currently subscribed to, then { sync: true } followed by live updates from now on... (to discuss) | |
94 … | +// (async) a list of all channels | |
95 … | +// ordered by how many subscribed to each | |
96 … | + | |
97 … | +// grace.publish({ type: 'post', channel: 'mycology', text: 'fly agarics all over' }, (err, msg) => { | |
98 … | +// elvis.publish({ type: 'post', channel: 'mycology', text: 'lets talk about clathrus archerii' }, (err, msg) => { | |
99 … | +// elvis.publish({ type: 'post', channel: 'mycology', text: 'no lets talk about stinkhorns' }, (err, msg) => { | |
100 … | +// elvis.publish({ type: 'post', channel: 'mycology', text: 'id like to talk about mycelium' }, (err, msg) => { | |
101 … | + // // Get current state | |
102 … | + // pull( | |
103 … | + // server.channels.subscriptions({ live: false }), | |
104 … | + // pull.collect((err, collection) => { | |
105 … | + // // Stream new messages | |
106 … | + // pull( | |
107 … | + // server.channels.subscriptions({ live: true }), // defaults to { live: true } | |
108 … | + // pull.filter(msg => !msg.sync), | |
109 … | + // pull.drain(msg => { | |
110 … | + // console.log(msg) | |
111 … | + // collection.reduce((state, msg) => { | |
112 … | + | |
113 … | + // }, {}) | |
114 … | + // if (msg.key[0] === 'ruby') { | |
115 … | + // assert.end() | |
116 … | + // server.close() | |
117 … | + // } | |
118 … | + // }) | |
119 … | + // ) | |
120 … | + // }) | |
121 … | + // ) |
package.json | ||
---|---|---|
@@ -18,8 +18,9 @@ | ||
18 | 18 … | "homepage": "https://github.com/ssbc/ssb-testing-guide#readme", |
19 | 19 … | "dependencies": { |
20 | 20 … | "flumeview-level": "^3.0.4", |
21 | 21 … | "flumeview-reduce": "^1.3.13", |
22 … | + "lodash": "^4.17.10", | |
22 | 23 … | "pull-stream": "^3.6.8", |
23 | 24 … | "scuttle-testbot": "^1.1.6", |
24 | 25 … | "ssb-backlinks": "^0.7.1", |
25 | 26 … | "ssb-sort": "^1.1.0", |
Built with git-ssb-web