Commit ef0e10c810ad27d20f17c2d63252757f3d0f4cb9
update index to include timestamps
mix irving committed on 2/15/2018, 2:52:29 AMParent: 8c7fdd33cde838895b5c0ab9073e759d1a631da8
Files changed
index.js | changed |
package-lock.json | changed |
index.js | ||
---|---|---|
@@ -1,74 +1,78 @@ | ||
1 | 1 … | const FlumeView = require('flumeview-reduce') |
2 | 2 … | const get = require('lodash/get') |
3 | 3 … | const set = require('lodash/set') |
4 | 4 … | |
5 | -const FLUME_VIEW_VERSION = 1.0 | |
5 … | +const FLUME_VIEW_VERSION = 1.1 | |
6 | 6 … | |
7 | 7 … | module.exports = { |
8 | 8 … | name: 'channel', |
9 | 9 … | version: require('./package.json').version, |
10 | 10 … | manifest: { |
11 | 11 … | get: 'async', |
12 | 12 … | stream: 'source', |
13 | - subscriptions: 'async' | |
13 … | + subscriptions: 'async', | |
14 … | + reduce: 'sync' | |
14 | 15 … | }, |
15 | 16 … | init: (server, config) => { |
16 | 17 … | // console.log('///// CHANNELS plugin loaded /////') |
17 | 18 … | |
18 | 19 … | const view = server._flumeUse( |
19 | - 'channels', | |
20 … | + 'channel', | |
20 | 21 … | FlumeView(FLUME_VIEW_VERSION, reduce, map, null, initialState()) |
21 | 22 … | ) |
22 | 23 … | |
23 | 24 … | return { |
24 | 25 … | get: view.get, |
25 | 26 … | subscriptions: view.get, |
26 | 27 … | stream: view.stream, |
28 … | + reduce | |
27 | 29 … | } |
28 | 30 … | } |
29 | 31 … | } |
30 | 32 … | |
31 | 33 … | function initialState() { |
32 | 34 … | return {} |
33 | 35 … | } |
34 | 36 … | |
35 | - | |
36 | 37 … | function map(msg) { |
37 | 38 … | if (get(msg, 'value.content.type') !== 'channel') return null |
38 | 39 … | |
39 | 40 … | const author = msg.value.author |
40 | - const channel = get(msg, 'value.content.channel') | |
41 … | + const timestamp = msg.value.timestamp | |
42 … | + const channel = get(msg, 'value.content.channel', '').replace(/^#/, '') | |
41 | 43 … | const subscribed = get(msg, 'value.content.subscribed') |
42 | 44 … | |
43 | 45 … | if (typeof channel === undefined || typeof subscribed === undefined) { |
44 | 46 … | console.log('Malformed channel subscription', msg) |
45 | 47 … | return null |
46 | 48 … | } |
47 | 49 … | |
48 | 50 … | return { |
51 … | + author, | |
52 … | + timestamp, | |
49 | 53 … | channel, |
50 | - author, | |
51 | 54 … | subscribed |
52 | 55 … | } |
53 | 56 … | } |
54 | 57 … | |
55 | -function reduce(soFar, newSub) { | |
58 … | +function reduce (soFar, newSub) { | |
56 | 59 … | process.stdout.write('c') |
57 | - const { channel, author, subscribed } = newSub | |
60 … | + const { author, timestamp, channel, subscribed } = newSub | |
58 | 61 … | |
59 | - let channelSubs = get(soFar, [channel], []) | |
62 … | + var channelSubs = get(soFar, [channel], []) | |
63 … | + var current = channelSubs.find(entry => entry[0] === author) | |
60 | 64 … | |
61 | - channelSubs = new Set(channelSubs) | |
65 … | + // if current recorded statement was more recent than this 'newSub', ignore newSub | |
66 … | + if (current && current[1] > timestamp) return soFar | |
62 | 67 … | |
63 | - if (subscribed) { | |
64 | - channelSubs.add(author) | |
65 | - } else { | |
66 | - channelSubs.delete(author) | |
67 | - } | |
68 … | + // remove all subs entries for this author | |
69 … | + channelSubs = channelSubs.filter(entry => entry[0] !== author) | |
68 | 70 … | |
69 | - soFar[channel] = [...channelSubs] | |
71 … | + if (subscribed) channelSubs.push([author, Number(new Date())]) | |
70 | 72 … | |
73 … | + soFar[channel] = channelSubs | |
74 … | + | |
71 | 75 … | return soFar |
72 | 76 … | } |
73 | 77 … | |
74 | 78 … | // state: |
Built with git-ssb-web