Files: c38545a2f64b06e7150cd79d548831b8b753c405 / modules / feed / pull / summary.js
7859 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pullDefer = require('pull-defer') |
3 | var pullNext = require('pull-next') |
4 | var SortedArray = require('sorted-array-functions') |
5 | var nest = require('depnest') |
6 | var ref = require('ssb-ref') |
7 | var sustained = require('../../../lib/sustained') |
8 | var Value = require('mutant/value') |
9 | |
10 | exports.gives = nest({ |
11 | 'feed.pull': [ 'summary' ] |
12 | }) |
13 | |
14 | exports.create = function () { |
15 | return nest({ |
16 | 'feed.pull': { summary } |
17 | }) |
18 | } |
19 | |
20 | function summary (source, opts, cb) { |
21 | var bumpFilter = opts && opts.bumpFilter |
22 | var windowSize = opts && opts.windowSize || 1000 |
23 | var prioritized = opts && opts.prioritized || {} |
24 | var getSequence = opts && opts.getSequence |
25 | |
26 | var loading = Value(true) |
27 | |
28 | var last = null |
29 | var returned = false |
30 | var done = false |
31 | |
32 | var result = pullNext(() => { |
33 | if (!done) { |
34 | loading.set(true) |
35 | var next = {reverse: true, limit: windowSize, live: false} |
36 | if (last) { |
37 | next.lt = typeof getSequence === 'function' ? getSequence(last) : last.timestamp |
38 | } |
39 | var deferred = pullDefer.source() |
40 | pull( |
41 | source(next), |
42 | pull.collect((err, values) => { |
43 | loading.set(false) |
44 | if (err) throw err |
45 | if (!values.length) { |
46 | done = true |
47 | deferred.resolve(pull.values([])) |
48 | if (!returned) cb && cb() |
49 | returned = true |
50 | } else { |
51 | var fromTime = last && last.timestamp || Date.now() |
52 | last = values[values.length - 1] |
53 | groupMessages(values, fromTime, {bumpFilter, prioritized}, (err, result) => { |
54 | if (err) throw err |
55 | deferred.resolve( |
56 | pull.values(result) |
57 | ) |
58 | if (!returned) cb && cb() |
59 | returned = true |
60 | }) |
61 | } |
62 | }) |
63 | ) |
64 | } |
65 | return deferred |
66 | }) |
67 | |
68 | // switch to loading state immediately, only revert after no loading for > 200 ms |
69 | result.loading = sustained(loading, 500, x => x) |
70 | |
71 | return result |
72 | } |
73 | |
74 | function groupMessages (messages, fromTime, opts, cb) { |
75 | var subscribes = {} |
76 | var follows = {} |
77 | var messageUpdates = {} |
78 | reverseForEach(messages, function (msg) { |
79 | if (!msg.value) return |
80 | var c = msg.value.content |
81 | if (c.type === 'contact') { |
82 | updateContact(msg, follows, opts) |
83 | } else if (c.type === 'channel') { |
84 | updateChannel(msg, subscribes, opts) |
85 | } else if (c.type === 'vote') { |
86 | if (c.vote && c.vote.link) { |
87 | // only show likes of posts added in the current window |
88 | // and only for the main post |
89 | const group = messageUpdates[c.vote.link] |
90 | if (group) { |
91 | if (c.vote.value > 0) { |
92 | group.likes.add(msg.value.author) |
93 | group.relatedMessages.push(msg) |
94 | } else { |
95 | group.likes.delete(msg.value.author) |
96 | group.relatedMessages.push(msg) |
97 | } |
98 | } |
99 | } |
100 | } else { |
101 | if (c.root) { |
102 | const group = ensureMessage(c.root, messageUpdates) |
103 | group.fromTime = fromTime |
104 | group.repliesFrom.add(msg.value.author) |
105 | SortedArray.add(group.replies, msg, compareUserTimestamp) |
106 | group.channel = group.channel || msg.value.content.channel |
107 | group.relatedMessages.push(msg) |
108 | } else { |
109 | const group = ensureMessage(msg.key, messageUpdates) |
110 | group.fromTime = fromTime |
111 | group.lastUpdateType = 'post' |
112 | group.updated = msg.timestamp || msg.value.sequence |
113 | group.author = msg.value.author |
114 | group.channel = msg.value.content.channel |
115 | group.message = msg |
116 | group.boxed = typeof msg.value.content === 'string' |
117 | } |
118 | } |
119 | }, () => { |
120 | var result = [] |
121 | Object.keys(follows).forEach((key) => { |
122 | bumpIfNeeded(follows[key], opts) |
123 | if (follows[key].updated) { |
124 | SortedArray.add(result, follows[key], compareUpdated) |
125 | } |
126 | }) |
127 | Object.keys(subscribes).forEach((key) => { |
128 | bumpIfNeeded(subscribes[key], opts) |
129 | if (subscribes[key].updated) { |
130 | SortedArray.add(result, subscribes[key], compareUpdated) |
131 | } |
132 | }) |
133 | Object.keys(messageUpdates).forEach((key) => { |
134 | bumpIfNeeded(messageUpdates[key], opts) |
135 | if (messageUpdates[key].updated) { |
136 | SortedArray.add(result, messageUpdates[key], compareUpdated) |
137 | } |
138 | }) |
139 | cb(null, result) |
140 | }) |
141 | } |
142 | |
143 | function bumpIfNeeded (group, {bumpFilter, prioritized}) { |
144 | group.relatedMessages.forEach(msg => { |
145 | if (prioritized[msg.key] && group.priority < prioritized[msg.key]) { |
146 | group.priority = prioritized[msg.key] |
147 | } |
148 | |
149 | var shouldBump = !bumpFilter || bumpFilter(msg, group) |
150 | |
151 | // only bump when filter passes |
152 | var newUpdated = msg.timestamp || msg.value.sequence |
153 | if (!group.updated || (shouldBump && newUpdated > group.updated)) { |
154 | group.updated = newUpdated |
155 | if (msg.value.content.type === 'vote') { |
156 | if (group.likes.size) { |
157 | group.lastUpdateType = 'like' |
158 | } else if (group.repliesFrom.size) { |
159 | group.lastUpdateType = 'reply' |
160 | } else if (group.message) { |
161 | group.lastUpdateType = 'post' |
162 | } |
163 | } |
164 | |
165 | if (msg.value.content.type === 'post') { |
166 | if (msg.value.content.root) { |
167 | group.lastUpdateType = 'reply' |
168 | } else { |
169 | group.lastUpdateType = 'post' |
170 | } |
171 | } |
172 | } |
173 | }) |
174 | } |
175 | |
176 | function compareUpdated (a, b) { |
177 | // highest priority first |
178 | // then most recent date |
179 | return b.priority - a.priority || b.updated - a.updated |
180 | } |
181 | |
182 | function reverseForEach (items, fn, cb) { |
183 | var i = items.length - 1 |
184 | nextBatch() |
185 | |
186 | function nextBatch () { |
187 | var start = Date.now() |
188 | while (i >= 0) { |
189 | fn(items[i], i) |
190 | i -= 1 |
191 | if (Date.now() - start > 10) break |
192 | } |
193 | |
194 | if (i > 0) { |
195 | setImmediate(nextBatch) |
196 | } else { |
197 | cb && cb() |
198 | } |
199 | } |
200 | } |
201 | |
202 | function updateContact (msg, groups, opts) { |
203 | var c = msg.value.content |
204 | var id = msg.value.author |
205 | var group = groups[id] |
206 | if (ref.isFeed(c.contact)) { |
207 | if (c.following) { |
208 | if (!group) { |
209 | group = groups[id] = { |
210 | type: 'follow', |
211 | priority: 0, |
212 | relatedMessages: [], |
213 | lastUpdateType: null, |
214 | contacts: new Set(), |
215 | updated: 0, |
216 | author: id, |
217 | id: id |
218 | } |
219 | } |
220 | group.contacts.add(c.contact) |
221 | group.relatedMessages.push(msg) |
222 | } else { |
223 | if (group) { |
224 | group.contacts.delete(c.contact) |
225 | if (!group.contacts.size) { |
226 | delete groups[id] |
227 | } |
228 | } |
229 | } |
230 | } |
231 | } |
232 | |
233 | function updateChannel (msg, groups, opts) { |
234 | var c = msg.value.content |
235 | var channel = c.channel |
236 | var group = groups[channel] |
237 | if (typeof channel === 'string') { |
238 | if (c.subscribed) { |
239 | if (!group) { |
240 | group = groups[channel] = { |
241 | type: 'subscribe', |
242 | priority: 0, |
243 | relatedMessages: [], |
244 | lastUpdateType: null, |
245 | subscribers: new Set(), |
246 | updated: 0, |
247 | channel |
248 | } |
249 | } |
250 | group.subscribers.add(msg.value.author) |
251 | group.relatedMessages.push(msg) |
252 | } else { |
253 | if (group) { |
254 | group.subscribers.delete(msg.value.author) |
255 | if (!group.subscribers.size) { |
256 | delete groups[channel] |
257 | } |
258 | } |
259 | } |
260 | } |
261 | } |
262 | |
263 | function ensureMessage (id, groups) { |
264 | var group = groups[id] |
265 | if (!group) { |
266 | group = groups[id] = { |
267 | type: 'message', |
268 | priority: 0, |
269 | repliesFrom: new Set(), |
270 | relatedMessages: [], |
271 | replies: [], |
272 | message: null, |
273 | messageId: id, |
274 | likes: new Set(), |
275 | updated: 0 |
276 | } |
277 | } |
278 | return group |
279 | } |
280 | |
281 | function compareUserTimestamp (a, b) { |
282 | var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3) |
283 | if (isClose) { |
284 | // recieved close together, use provided timestamps |
285 | return a.value.timestamp - b.value.timestamp |
286 | } else { |
287 | return a.timestamp - b.timestamp |
288 | } |
289 | } |
290 |
Built with git-ssb-web