Commit c56ea9814a318dd9a4f26fcf109a217e2d13658b
add a pull stream of all polls you made / participated in
mix irving committed on 6/13/2018, 6:43:50 AMParent: 54da6278f7b2e4b69c93d48ac2318edec1ca00f7
Files changed
methods.js | changed |
package-lock.json | changed |
package.json | changed |
poll/pull/all.js | changed |
poll/pull/mine.js | added |
test.js | added |
methods.js | ||
---|---|---|
@@ -19,9 +19,10 @@ | ||
19 | 19 … | }, |
20 | 20 … | pull: { |
21 | 21 … | closed: require('./poll/pull/closed'), |
22 | 22 … | open: require('./poll/pull/open'), |
23 | - all: require('./poll/pull/all') | |
23 … | + all: require('./poll/pull/all'), | |
24 … | + mine: require('./poll/pull/mine') | |
24 | 25 … | } |
25 | 26 … | }, |
26 | 27 … | position: { |
27 | 28 … | async: { |
package-lock.json | ||
---|---|---|
The diff is too large to show. Use a local git client to view these changes. Old file size: 196200 bytes New file size: 196414 bytes |
package.json | ||
---|---|---|
@@ -26,11 +26,14 @@ | ||
26 | 26 … | "dependencies": { |
27 | 27 … | "is-my-json-valid": "^2.17.1", |
28 | 28 … | "libnested": "^1.2.1", |
29 | 29 … | "lodash.clonedeep": "^4.5.0", |
30 … | + "lodash.merge": "^4.6.1", | |
30 | 31 … | "mutant": "^3.22.1", |
31 | 32 … | "pull-async": "^1.0.0", |
33 … | + "pull-merge": "^1.0.4", | |
32 | 34 … | "pull-next-query": "^1.0.0", |
35 … | + "pull-paramap": "^1.2.2", | |
33 | 36 … | "pull-stream": "^3.6.2", |
34 | 37 … | "ssb-msg-content": "^1.0.1", |
35 | 38 … | "ssb-msg-schemas": "^6.3.0", |
36 | 39 … | "ssb-poll-schema": "^1.6.1", |
poll/pull/all.js | ||
---|---|---|
@@ -1,8 +1,8 @@ | ||
1 | -var next = require('pull-next-query') | |
1 … | +const next = require('pull-next-query') | |
2 | 2 … | |
3 | 3 … | module.exports = function (server) { |
4 | - return function ClosedPollsStream (opts) { | |
4 … | + return function AllPollsStream (opts) { | |
5 | 5 … | const defaultOpts = { |
6 | 6 … | limit: 100, |
7 | 7 … | query: [{ |
8 | 8 … | $filter: { |
poll/pull/mine.js | |||
---|---|---|---|
@@ -1,0 +1,72 @@ | |||
1 … | +const pull = require('pull-stream') | ||
2 … | +const pullMerge = require('pull-merge') | ||
3 … | +const paramap = require('pull-paramap') | ||
4 … | +const next = require('pull-next-query') | ||
5 … | +const clone = require('lodash.clonedeep') | ||
6 … | +const merge = require('lodash.merge') | ||
7 … | + | ||
8 … | +module.exports = function (server) { | ||
9 … | + return function MyPollsStream (opts) { | ||
10 … | + const myKey = server.id | ||
11 … | + | ||
12 … | + const _opts = clone(opts) | ||
13 … | + const postsSeen = new Set() | ||
14 … | + | ||
15 … | + const pollStream = pull( | ||
16 … | + next(server.query.read, optsForType('poll')), | ||
17 … | + pull.filter(m => !postsSeen.has(m.key)), | ||
18 … | + pull.through(m => postsSeen.add(m.key)) | ||
19 … | + ) | ||
20 … | + | ||
21 … | + const positionStream = pull( | ||
22 … | + next(server.query.read, optsForType('position')), | ||
23 … | + pull.map(getRoot), | ||
24 … | + pull.filter(root => Boolean(root) && !postsSeen.has(root)), | ||
25 … | + pull.through(root => postsSeen.add(root)), | ||
26 … | + // pull.asyncMap((root, cb) => { | ||
27 … | + paramap((root, cb) => { | ||
28 … | + server.get(root, (err, value) => { | ||
29 … | + if (err) return console.err(err) | ||
30 … | + cb(null, { key: root, value }) | ||
31 … | + }) | ||
32 … | + }, 5) | ||
33 … | + ) | ||
34 … | + | ||
35 … | + return pullMerge( | ||
36 … | + pollStream, | ||
37 … | + positionStream, | ||
38 … | + Comparer(opts) | ||
39 … | + ) | ||
40 … | + | ||
41 … | + function optsForType (type) { | ||
42 … | + const defaultOpts = { | ||
43 … | + limit: 100, | ||
44 … | + query: [{ | ||
45 … | + $filter: { | ||
46 … | + value: { | ||
47 … | + author: myKey, | ||
48 … | + timestamp: { $gt: 0 }, | ||
49 … | + content: { type } | ||
50 … | + } | ||
51 … | + } | ||
52 … | + }] | ||
53 … | + } | ||
54 … | + | ||
55 … | + return merge({}, defaultOpts, _opts) | ||
56 … | + } | ||
57 … | + } | ||
58 … | +} | ||
59 … | + | ||
60 … | +function Comparer (opts) { | ||
61 … | + return (a, b) => { | ||
62 … | + if (opts.reverse) { | ||
63 … | + return a.value.timestamp > b.value.timestamp ? -1 : +1 | ||
64 … | + } else { | ||
65 … | + return a.value.timestamp < b.value.timestamp ? -1 : +1 | ||
66 … | + } | ||
67 … | + } | ||
68 … | +} | ||
69 … | + | ||
70 … | +function getRoot (position) { | ||
71 … | + return position.value.content.root | ||
72 … | +} |
test.js | ||
---|---|---|
@@ -1,0 +1,29 @@ | ||
1 … | +const pull = require('pull-stream') | |
2 … | +pull.merge = require('pull-merge') | |
3 … | +pull.pMap = require('pull-paramap') | |
4 … | + | |
5 … | +const fastStream = pull.values([1, 2, 3, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]) | |
6 … | +const slowStream = pull( | |
7 … | + pull.values([4, 5, 20, 21, 22, 23, 24, 25, 26, 27]), | |
8 … | + pull.through(n => console.log('pulling ', n)), | |
9 … | + pull.pMap(asyncThing, 2) | |
10 … | +) | |
11 … | + | |
12 … | +pull( | |
13 … | + pull.merge( | |
14 … | + slowStream, | |
15 … | + fastStream, | |
16 … | + comparer | |
17 … | + ), | |
18 … | + pull.asyncMap((n, cb) => setTimeout(() => cb(null, n), 500)), | |
19 … | + pull.log(() => console.log('DONE')) | |
20 … | +) | |
21 … | + | |
22 … | +function asyncThing (n, cb) { | |
23 … | + setTimeout(() => cb(null, n), 1000) // simulate some slow lookup | |
24 … | +} | |
25 … | + | |
26 … | +function comparer (a, b) { | |
27 … | + // console.log('comparing', a, b) | |
28 … | + return a < b ? -1 : 1 | |
29 … | +} |
Built with git-ssb-web