Files: 42ca26e1634f856a891a9af0e592511ac7c54dba / sbot / subscriptions.js
1712 bytesRaw
1 | var FlumeReduce = require('flumeview-reduce') |
2 | var normalizeChannel = require('ssb-ref').normalizeChannel |
3 | var FlatMap = require('pull-flatmap') |
4 | var pull = require('pull-stream') |
5 | |
6 | module.exports = function (ssb, config) { |
7 | var index = ssb._flumeUse('patchwork-subscriptions', FlumeReduce(3, reduce, map)) |
8 | return { |
9 | stream: function (opts) { |
10 | var channel = normalizeChannel(opts.channel) |
11 | return pull( |
12 | index.stream({live: opts.live}), |
13 | FlatMap(items => { |
14 | var result = [] |
15 | |
16 | if (items) { |
17 | Object.keys(items).forEach(key => { |
18 | var parts = getParts(key) |
19 | if (parts && (!channel || parts[1] === channel)) { |
20 | result.push({from: parts[0], to: parts[1], value: items[key][1], ts: items[key][0]}) |
21 | } |
22 | }) |
23 | } |
24 | |
25 | return result |
26 | }) |
27 | ) |
28 | }, |
29 | get: index.get |
30 | } |
31 | } |
32 | |
33 | function reduce (result, item) { |
34 | if (!result) result = {} |
35 | if (item) { |
36 | for (var key in item) { |
37 | if (!result[key] || result[key][0] < item[key][0]) { |
38 | result[key] = item[key] |
39 | } |
40 | } |
41 | } |
42 | return result |
43 | } |
44 | |
45 | function getParts (value) { |
46 | var splitIndex = value.indexOf(':') |
47 | if (splitIndex > 50) { // HACK: yup |
48 | return [value.slice(0, splitIndex), value.slice(splitIndex + 1)] |
49 | } |
50 | } |
51 | |
52 | function map (msg) { |
53 | if (msg.value.content && msg.value.content.type === 'channel') { |
54 | if (typeof msg.value.content.subscribed === 'boolean') { |
55 | var channel = normalizeChannel(msg.value.content.channel) |
56 | if (channel) { |
57 | var key = `${msg.value.author}:${channel}` |
58 | return { |
59 | [key]: [msg.timestamp, msg.value.content.subscribed] |
60 | } |
61 | } |
62 | } |
63 | } |
64 | } |
65 |
Built with git-ssb-web