git ssb

0+

Piet / ssb-loomio



Tree: c56ea9814a318dd9a4f26fcf109a217e2d13658b

Files: c56ea9814a318dd9a4f26fcf109a217e2d13658b / poll / pull / mine.js

1764 bytesRaw
1const pull = require('pull-stream')
2const pullMerge = require('pull-merge')
3const paramap = require('pull-paramap')
4const next = require('pull-next-query')
5const clone = require('lodash.clonedeep')
6const merge = require('lodash.merge')
7
8module.exports = function (server) {
9 return function MyPollsStream (opts) {
10 const myKey = server.id
11
12 const _opts = clone(opts)
13 const postsSeen = new Set()
14
15 const pollStream = pull(
16 next(server.query.read, optsForType('poll')),
17 pull.filter(m => !postsSeen.has(m.key)),
18 pull.through(m => postsSeen.add(m.key))
19 )
20
21 const positionStream = pull(
22 next(server.query.read, optsForType('position')),
23 pull.map(getRoot),
24 pull.filter(root => Boolean(root) && !postsSeen.has(root)),
25 pull.through(root => postsSeen.add(root)),
26 // pull.asyncMap((root, cb) => {
27 paramap((root, cb) => {
28 server.get(root, (err, value) => {
29 if (err) return console.err(err)
30 cb(null, { key: root, value })
31 })
32 }, 5)
33 )
34
35 return pullMerge(
36 pollStream,
37 positionStream,
38 Comparer(opts)
39 )
40
41 function optsForType (type) {
42 const defaultOpts = {
43 limit: 100,
44 query: [{
45 $filter: {
46 value: {
47 author: myKey,
48 timestamp: { $gt: 0 },
49 content: { type }
50 }
51 }
52 }]
53 }
54
55 return merge({}, defaultOpts, _opts)
56 }
57 }
58}
59
60function Comparer (opts) {
61 return (a, b) => {
62 if (opts.reverse) {
63 return a.value.timestamp > b.value.timestamp ? -1 : +1
64 } else {
65 return a.value.timestamp < b.value.timestamp ? -1 : +1
66 }
67 }
68}
69
70function getRoot (position) {
71 return position.value.content.root
72}
73

Built with git-ssb-web