Files: e3335737af22bf6b4840a98dd264acc819735c37 / index.js
1832 bytesRaw
1 | const FlumeView = require('flumeview-reduce') |
2 | const get = require('lodash/get') |
3 | const set = require('lodash/set') |
4 | |
// Version number handed to FlumeView() as its first argument when the
// 'channel' view is registered in init().
// NOTE(review): presumably flumeview-reduce rebuilds the persisted view when
// this value changes, so it should be bumped whenever map/reduce change the
// shape or meaning of the stored state — confirm against flumeview-reduce.
const FLUME_VIEW_VERSION = 1.1
6 | |
7 | module.exports = { |
8 | name: 'channel', |
9 | version: require('./package.json').version, |
10 | manifest: { |
11 | get: 'async', |
12 | stream: 'source', |
13 | subscriptions: 'async', |
14 | reduce: 'sync' |
15 | }, |
16 | init: (server, config) => { |
17 | const view = server._flumeUse( |
18 | 'channel', |
19 | FlumeView(FLUME_VIEW_VERSION, reduce, map, null, initialState()) |
20 | ) |
21 | |
22 | return { |
23 | get: view.get, |
24 | subscriptions: view.get, |
25 | stream: view.stream, |
26 | reduce // TODO - remove this when flumeview offers something similar |
27 | } |
28 | } |
29 | } |
30 | |
/**
 * Build the starting state for the channel view.
 *
 * A new object is created on every call so callers never share (and never
 * accidentally mutate) a common initial state.
 *
 * @returns {Object} an empty object; `reduce` fills it with one key per channel.
 */
function initialState () {
  return {}
}
34 | |
/**
 * Map step of the view: turn a raw message into a compact subscription record.
 *
 * Only messages whose `value.content.type` is 'channel' are considered. A
 * leading '#' is stripped from the channel name.
 *
 * @param {Object} msg - message as stored in the log; `value.author`,
 *   `value.timestamp` and `value.content` are read.
 * @returns {?{author: *, timestamp: *, channel: string, subscribed: *}}
 *   the subscription record, or null when the message is not a channel
 *   message or is malformed (channel is not a non-empty string, or
 *   `subscribed` is missing).
 */
function map (msg) {
  if (get(msg, 'value.content.type') !== 'channel') return null

  const { author, timestamp } = msg.value
  const rawChannel = get(msg, 'value.content.channel')
  const subscribed = get(msg, 'value.content.subscribed')

  // Validate the raw value before touching it: a non-string channel would
  // throw on `.replace`, and a defaulted '' would hide a missing channel.
  const channel = typeof rawChannel === 'string'
    ? rawChannel.replace(/^#/, '')
    : ''

  // Compare against `undefined` itself; `typeof x` yields a string, so
  // `typeof x === undefined` can never be true.
  if (channel === '' || subscribed === undefined) {
    console.log('Malformed channel subscription', msg)
    return null
  }

  return {
    author,
    timestamp,
    channel,
    subscribed
  }
}
55 | |
/**
 * Reduce step of the view: fold one subscription record into the state.
 *
 * State shape: `{ [channel]: [[author, timestamp], ...] }` — for every channel,
 * the list of authors currently subscribed together with the timestamp of the
 * message that subscribed them.
 *
 * The state object is updated in place and also returned.
 *
 * NOTE(review): earlier versions stored the wall-clock time of indexing
 * instead of the message timestamp. Already persisted state will still hold
 * those values; presumably bumping the view version forces a rebuild — confirm.
 *
 * @param {Object} soFar - accumulated state (mutated).
 * @param {?{author: *, timestamp: *, channel: string, subscribed: *}} newSub -
 *   record produced by `map`; a null/undefined record leaves the state as is.
 * @returns {Object} the updated state (same object as `soFar`).
 */
function reduce (soFar, newSub) {
  if (newSub == null) return soFar

  const { author, timestamp, channel, subscribed } = newSub

  // Only trust own properties: a channel called 'constructor' or '__proto__'
  // must not resolve to something inherited from Object.prototype.
  const stored = Object.prototype.hasOwnProperty.call(soFar, channel)
    ? soFar[channel]
    : undefined
  const channelSubs = Array.isArray(stored) ? stored : []
  const current = channelSubs.find(entry => entry[0] === author)

  // if current recorded statement was more recent than this 'newSub', ignore newSub
  if (current && current[1] > timestamp) return soFar

  // drop every existing entry for this author, then re-add if still subscribed
  const updatedSubs = channelSubs.filter(entry => entry[0] !== author)

  // Record the message's own timestamp (not the time of indexing) so the
  // recency check above compares like with like and the result is
  // deterministic across rebuilds.
  if (subscribed) updatedSubs.push([author, timestamp])

  // defineProperty always creates/updates an own data property, even for the
  // key '__proto__' (plain assignment would replace the object's prototype).
  Object.defineProperty(soFar, channel, {
    value: updatedSubs,
    writable: true,
    enumerable: true,
    configurable: true
  })

  return soFar
}
75 | |
76 |
Built with git-ssb-web