Files: 36cd0eeba48657072d1b4544e72acf7e5eead61f / lib / feed-summary.js
5307 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pullPushable = require('pull-pushable') |
3 | var pullNext = require('pull-next') |
4 | var SortedArray = require('sorted-array-functions') |
5 | |
6 | module.exports = FeedSummary |
7 | |
/**
 * Create a pull-stream source that emits summarised feed groups, reading the
 * underlying feed backwards in windows of raw messages.
 *
 * For every window, `source` is called with `{reverse: true, limit, live: false}`
 * (plus `lt` once a previous window has been read), the whole window is
 * collected, passed through `groupMessages`, and the resulting groups are
 * emitted before the next window is requested.
 *
 * @param {Function} source - called with the query options above; must return
 *   a pull-stream source of messages.
 * @param {Object} [opts]
 * @param {Function} [opts.bumpFilter] - forwarded unchanged to `groupMessages`.
 * @param {number} [opts.windowSize=1000] - raw messages requested per window.
 * @param {Function} [cb] - called at most once, when the first window has
 *   settled: with no arguments on success, or with the error on failure.
 * @returns {Function} pull-stream source of grouped summary items.
 */
function FeedSummary (source, opts, cb) {
  var bumpFilter = opts && opts.bumpFilter
  var windowSize = (opts && opts.windowSize) || 1000
  var last = null
  var returned = false
  var done = false

  // Report the outcome of the first window exactly once.
  function notify (err) {
    if (returned) return
    returned = true
    if (!cb) return
    if (err) {
      cb(err)
    } else {
      cb()
    }
  }

  return pullNext(() => {
    // Returning nothing makes pull-next end the outer stream.
    if (done) return

    var next = {reverse: true, limit: windowSize, live: false}
    if (last) {
      // Continue strictly before the oldest message of the previous window.
      next.lt = last.timestamp || last.value.sequence
    }

    var pushable = pullPushable()

    // Surface a failure through the stream instead of throwing from an async
    // callback, where no consumer could catch it. No more windows are
    // requested afterwards.
    function fail (err) {
      done = true
      pushable.end(err)
      notify(err)
    }

    pull(
      source(next),
      pull.collect((err, values) => {
        if (err) return fail(err)

        if (!values.length) {
          // An empty window means the feed is exhausted.
          done = true
          pushable.end()
          notify()
          return
        }

        var fromTime = (last && last.timestamp) || Date.now()
        last = values[values.length - 1]
        groupMessages(values, fromTime, bumpFilter, (err, result) => {
          if (err) return fail(err)
          result.forEach(v => pushable.push(v))
          pushable.end()
          notify()
        })
      })
    )

    return pushable
  })
}
47 | |
/**
 * Collapse one window of raw messages into summary groups: one "follow" group
 * per author of contact messages and one "message" group per thread root.
 *
 * Messages are visited from the end of `messages` to the start (via
 * `reverseForEach`), so with a newest-first window the oldest message is
 * applied first and later messages override earlier state.
 *
 * @param {Array} messages - window of messages (`{key, timestamp, value}`).
 * @param {number} fromTime - stored on every message group as `fromTime`.
 * @param {Function} [bumpFilter] - `(msg, group) => boolean`; when given,
 *   replies and digs only update `group.updated` if it returns truthy.
 * @param {Function} cb - called as `cb(null, result)` where `result` holds all
 *   groups with a truthy `updated`, ordered by `compareUpdated`.
 */
function groupMessages (messages, fromTime, bumpFilter, cb) {
  var follows = {}
  var messageUpdates = {}
  reverseForEach(messages, function (msg) {
    if (!msg.value) return
    var c = msg.value.content
    // Nothing can be classified without content.
    if (!c) return

    if (c.type === 'contact') {
      updateContact(msg, follows)
    } else if (c.type === 'vote') {
      if (c.vote && c.vote.link) {
        // only show digs of posts added in the current window
        // and only for the main post
        const group = messageUpdates[c.vote.link]
        if (group) {
          if (c.vote.value > 0) {
            group.lastUpdateType = 'dig'
            group.digs.add(msg.value.author)
            // only bump when filter passes
            if (!bumpFilter || bumpFilter(msg, group)) {
              // Same fallback as replies and posts: without it a vote lacking a
              // receive timestamp would reset `updated` to undefined and the
              // whole group would be dropped from the result.
              group.updated = msg.timestamp || msg.value.sequence
            }
          } else {
            group.digs.delete(msg.value.author)
            // NOTE(review): this resets to 'reply' although the condition
            // requires that there are no replies - confirm this is intended.
            if (group.lastUpdateType === 'dig' && !group.digs.size && !group.replies.length) {
              group.lastUpdateType = 'reply'
            }
          }
        }
      }
    } else {
      if (c.root) {
        // A reply: attach it to the group of its thread root.
        const group = ensureMessage(c.root, messageUpdates)
        group.fromTime = fromTime
        group.lastUpdateType = 'reply'
        group.repliesFrom.add(msg.value.author)
        SortedArray.add(group.replies, msg, compareUserTimestamp)
        group.channel = group.channel || msg.value.content.channel

        // only bump when filter passes
        if (!bumpFilter || bumpFilter(msg, group)) {
          group.updated = msg.timestamp || msg.value.sequence
        }
      } else {
        // A root message: it always bumps its own group.
        const group = ensureMessage(msg.key, messageUpdates)
        group.fromTime = fromTime
        group.lastUpdateType = 'post'
        group.updated = msg.timestamp || msg.value.sequence
        group.author = msg.value.author
        group.channel = msg.value.content.channel
        group.message = msg
      }
    }
  }, () => {
    // Merge both kinds of group into one list; groups that were never bumped
    // (updated is still falsy) are left out.
    var result = []
    Object.keys(follows).forEach((key) => {
      if (follows[key].updated) {
        SortedArray.add(result, follows[key], compareUpdated)
      }
    })
    Object.keys(messageUpdates).forEach((key) => {
      if (messageUpdates[key].updated) {
        SortedArray.add(result, messageUpdates[key], compareUpdated)
      }
    })
    cb(null, result)
  })
}
116 | |
/**
 * Comparator ordering groups by their `updated` value, largest first.
 *
 * @param {{updated: number}} a
 * @param {{updated: number}} b
 * @returns {number} positive when `b.updated` is larger than `a.updated`,
 *   negative when it is smaller, zero when both are equal.
 */
function compareUpdated (a, b) {
  const difference = b.updated - a.updated
  return difference
}
120 | |
/**
 * Visit `items` from the last element to the first, yielding to the event
 * loop with `setImmediate` whenever a batch has run for more than 10ms.
 *
 * @param {Array} items - elements to visit.
 * @param {Function} fn - called as `fn(item, index)` for every element.
 * @param {Function} [cb] - called with no arguments once every element has
 *   been visited. It runs synchronously when everything fits in the first
 *   batch.
 */
function reverseForEach (items, fn, cb) {
  var i = items.length - 1
  nextBatch()

  function nextBatch () {
    var start = Date.now()
    while (i >= 0) {
      fn(items[i], i)
      i -= 1
      // Time budget for this batch is used up; resume on a later tick.
      if (Date.now() - start > 10) break
    }

    // `i` is the index of the next unvisited element, so index 0 is still
    // pending work. Testing `i > 0` here would skip the first element when a
    // batch is interrupted right before it.
    if (i >= 0) {
      setImmediate(nextBatch)
    } else {
      cb && cb()
    }
  }
}
140 | |
/**
 * Apply one contact message to the per-author follow groups.
 *
 * A follow adds the target to the author's group (creating the group when
 * needed) and stamps `updated`; an unfollow removes the target and deletes
 * the group once it has no targets left. Messages without a `contact` target
 * are ignored.
 *
 * @param {Object} msg - message whose `value.content` is a contact record.
 * @param {Object} groups - follow groups keyed by author id; mutated in place.
 */
function updateContact (msg, groups) {
  const content = msg.value.content
  const author = msg.value.author
  const existing = groups[author]
  const target = content.contact

  if (!target) return

  if (!content.following) {
    // Unfollow: only relevant when this author already has a group.
    if (!existing) return
    existing.contacts.delete(target)
    if (!existing.contacts.size) {
      delete groups[author]
    }
    return
  }

  let entry = existing
  if (!entry) {
    entry = {
      type: 'follow',
      lastUpdateType: null,
      contacts: new Set(),
      updated: 0,
      author: author,
      id: author
    }
    groups[author] = entry
  }
  entry.contacts.add(target)
  entry.updated = msg.timestamp || msg.value.sequence
}
169 | |
/**
 * Look up the message group stored under `id`, creating an empty one first
 * when none exists yet.
 *
 * @param {string} id - key of the group; also stored as `messageId`.
 * @param {Object} groups - message groups keyed by id; mutated in place.
 * @returns {Object} the existing or newly created group.
 */
function ensureMessage (id, groups) {
  const found = groups[id]
  if (found) return found

  const created = {
    type: 'message',
    repliesFrom: new Set(),
    replies: [],
    message: null,
    messageId: id,
    digs: new Set(),
    updated: 0
  }
  groups[id] = created
  return created
}
185 | |
/**
 * Comparator for ordering messages ascending by time.
 *
 * When either message lacks a top-level `timestamp`, or the two top-level
 * timestamps are less than ten minutes apart, the timestamps stored inside
 * the messages (`value.timestamp`) decide the order. Otherwise the top-level
 * timestamps decide.
 *
 * @param {Object} a
 * @param {Object} b
 * @returns {number} negative, zero or positive in the usual comparator sense.
 */
function compareUserTimestamp (a, b) {
  const closeWindow = 10 * 60e3
  const missingTimestamp = !a.timestamp || !b.timestamp

  if (missingTimestamp || Math.abs(a.timestamp - b.timestamp) < closeWindow) {
    // received close together, use provided timestamps
    return a.value.timestamp - b.value.timestamp
  }

  return a.timestamp - b.timestamp
}
195 |
Built with git-ssb-web