Files: c56ea9814a318dd9a4f26fcf109a217e2d13658b / poll / pull / mine.js
1764 bytesRaw
1 | const pull = require('pull-stream') |
2 | const pullMerge = require('pull-merge') |
3 | const paramap = require('pull-paramap') |
4 | const next = require('pull-next-query') |
5 | const clone = require('lodash.clonedeep') |
6 | const merge = require('lodash.merge') |
7 | |
8 | module.exports = function (server) { |
9 | return function MyPollsStream (opts) { |
10 | const myKey = server.id |
11 | |
12 | const _opts = clone(opts) |
13 | const postsSeen = new Set() |
14 | |
15 | const pollStream = pull( |
16 | next(server.query.read, optsForType('poll')), |
17 | pull.filter(m => !postsSeen.has(m.key)), |
18 | pull.through(m => postsSeen.add(m.key)) |
19 | ) |
20 | |
21 | const positionStream = pull( |
22 | next(server.query.read, optsForType('position')), |
23 | pull.map(getRoot), |
24 | pull.filter(root => Boolean(root) && !postsSeen.has(root)), |
25 | pull.through(root => postsSeen.add(root)), |
26 | // pull.asyncMap((root, cb) => { |
27 | paramap((root, cb) => { |
28 | server.get(root, (err, value) => { |
29 | if (err) return console.err(err) |
30 | cb(null, { key: root, value }) |
31 | }) |
32 | }, 5) |
33 | ) |
34 | |
35 | return pullMerge( |
36 | pollStream, |
37 | positionStream, |
38 | Comparer(opts) |
39 | ) |
40 | |
41 | function optsForType (type) { |
42 | const defaultOpts = { |
43 | limit: 100, |
44 | query: [{ |
45 | $filter: { |
46 | value: { |
47 | author: myKey, |
48 | timestamp: { $gt: 0 }, |
49 | content: { type } |
50 | } |
51 | } |
52 | }] |
53 | } |
54 | |
55 | return merge({}, defaultOpts, _opts) |
56 | } |
57 | } |
58 | } |
59 | |
60 | function Comparer (opts) { |
61 | return (a, b) => { |
62 | if (opts.reverse) { |
63 | return a.value.timestamp > b.value.timestamp ? -1 : +1 |
64 | } else { |
65 | return a.value.timestamp < b.value.timestamp ? -1 : +1 |
66 | } |
67 | } |
68 | } |
69 | |
70 | function getRoot (position) { |
71 | return position.value.content.root |
72 | } |
73 |
Built with git-ssb-web