Files: 8a1eb0ef17ecc818851e316893ea845d9442655f / modules / feed / pull / summary.js
6559 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pullDefer = require('pull-defer') |
3 | var pullNext = require('pull-next') |
4 | var SortedArray = require('sorted-array-functions') |
5 | var nest = require('depnest') |
6 | var ref = require('ssb-ref') |
7 | |
8 | exports.gives = nest({ |
9 | 'feed.pull': [ 'summary' ] |
10 | }) |
11 | |
12 | exports.create = function () { |
13 | return nest({ |
14 | 'feed.pull': { summary } |
15 | }) |
16 | } |
17 | |
18 | function summary (source, opts, cb) { |
19 | var bumpFilter = opts && opts.bumpFilter |
20 | var windowSize = opts && opts.windowSize || 1000 |
21 | var last = null |
22 | var returned = false |
23 | var done = false |
24 | return pullNext(() => { |
25 | if (!done) { |
26 | var next = {reverse: true, limit: windowSize, live: false} |
27 | if (last) { |
28 | next.lt = last.timestamp || last.value.sequence |
29 | } |
30 | var deferred = pullDefer.source() |
31 | pull( |
32 | source(next), |
33 | pull.collect((err, values) => { |
34 | if (err) throw err |
35 | if (!values.length) { |
36 | done = true |
37 | deferred.resolve(pull.values([])) |
38 | if (!returned) cb && cb() |
39 | returned = true |
40 | } else { |
41 | var fromTime = last && last.timestamp || Date.now() |
42 | last = values[values.length - 1] |
43 | groupMessages(values, fromTime, bumpFilter, (err, result) => { |
44 | if (err) throw err |
45 | deferred.resolve( |
46 | pull.values(result) |
47 | ) |
48 | if (!returned) cb && cb() |
49 | returned = true |
50 | }) |
51 | } |
52 | }) |
53 | ) |
54 | } |
55 | return deferred |
56 | }) |
57 | } |
58 | |
59 | function groupMessages (messages, fromTime, bumpFilter, cb) { |
60 | var subscribes = {} |
61 | var follows = {} |
62 | var messageUpdates = {} |
63 | reverseForEach(messages, function (msg) { |
64 | if (!msg.value) return |
65 | var c = msg.value.content |
66 | if (c.type === 'contact') { |
67 | updateContact(msg, follows) |
68 | } else if (c.type === 'channel') { |
69 | updateChannel(msg, subscribes) |
70 | } else if (c.type === 'vote') { |
71 | if (c.vote && c.vote.link) { |
72 | // only show likes of posts added in the current window |
73 | // and only for the main post |
74 | const group = messageUpdates[c.vote.link] |
75 | if (group) { |
76 | if (c.vote.value > 0) { |
77 | group.lastUpdateType = 'like' |
78 | group.likes.add(msg.value.author) |
79 | bumpIfNeeded(group, msg, bumpFilter) |
80 | } else { |
81 | group.likes.delete(msg.value.author) |
82 | if (group.lastUpdateType === 'like' && !group.likes.size && !group.replies.length) { |
83 | group.lastUpdateType = 'reply' |
84 | } |
85 | } |
86 | } |
87 | } |
88 | } else { |
89 | if (c.root || (c.type === 'git-update' && c.repo)) { |
90 | const group = ensureMessage(c.root || c.repo, messageUpdates) |
91 | group.fromTime = fromTime |
92 | group.lastUpdateType = 'reply' |
93 | group.repliesFrom.add(msg.value.author) |
94 | SortedArray.add(group.replies, msg, compareUserTimestamp) |
95 | group.channel = group.channel || msg.value.content.channel |
96 | bumpIfNeeded(group, msg, bumpFilter) |
97 | } else { |
98 | const group = ensureMessage(msg.key, messageUpdates) |
99 | group.fromTime = fromTime |
100 | group.lastUpdateType = 'post' |
101 | group.updated = msg.timestamp || msg.value.sequence |
102 | group.author = msg.value.author |
103 | group.channel = msg.value.content.channel |
104 | group.message = msg |
105 | group.boxed = typeof msg.value.content === 'string' |
106 | } |
107 | } |
108 | }, () => { |
109 | var result = [] |
110 | Object.keys(follows).forEach((key) => { |
111 | if (follows[key].updated) { |
112 | SortedArray.add(result, follows[key], compareUpdated) |
113 | } |
114 | }) |
115 | Object.keys(subscribes).forEach((key) => { |
116 | if (subscribes[key].updated) { |
117 | SortedArray.add(result, subscribes[key], compareUpdated) |
118 | } |
119 | }) |
120 | Object.keys(messageUpdates).forEach((key) => { |
121 | if (messageUpdates[key].updated) { |
122 | SortedArray.add(result, messageUpdates[key], compareUpdated) |
123 | } |
124 | }) |
125 | cb(null, result) |
126 | }) |
127 | } |
128 | |
129 | function bumpIfNeeded (group, msg, bumpFilter) { |
130 | // only bump when filter passes |
131 | var newUpdated = msg.timestamp || msg.value.sequence |
132 | if (!group.updated || ((!bumpFilter || bumpFilter(msg, group)) && newUpdated > group.updated)) { |
133 | group.updated = newUpdated |
134 | } |
135 | } |
136 | |
137 | function compareUpdated (a, b) { |
138 | return b.updated - a.updated |
139 | } |
140 | |
141 | function reverseForEach (items, fn, cb) { |
142 | var i = items.length - 1 |
143 | nextBatch() |
144 | |
145 | function nextBatch () { |
146 | var start = Date.now() |
147 | while (i >= 0) { |
148 | fn(items[i], i) |
149 | i -= 1 |
150 | if (Date.now() - start > 10) break |
151 | } |
152 | |
153 | if (i > 0) { |
154 | setImmediate(nextBatch) |
155 | } else { |
156 | cb && cb() |
157 | } |
158 | } |
159 | } |
160 | |
161 | function updateContact (msg, groups) { |
162 | var c = msg.value.content |
163 | var id = msg.value.author |
164 | var group = groups[id] |
165 | if (ref.isFeed(c.contact)) { |
166 | if (c.following) { |
167 | if (!group) { |
168 | group = groups[id] = { |
169 | type: 'follow', |
170 | lastUpdateType: null, |
171 | contacts: new Set(), |
172 | updated: 0, |
173 | author: id, |
174 | id: id |
175 | } |
176 | } |
177 | group.contacts.add(c.contact) |
178 | group.updated = msg.timestamp || msg.value.sequence |
179 | } else { |
180 | if (group) { |
181 | group.contacts.delete(c.contact) |
182 | if (!group.contacts.size) { |
183 | delete groups[id] |
184 | } |
185 | } |
186 | } |
187 | } |
188 | } |
189 | |
190 | function updateChannel (msg, groups) { |
191 | var c = msg.value.content |
192 | var id = msg.value.author |
193 | var group = groups[id] |
194 | if (typeof c.channel === 'string') { |
195 | if (c.subscribed) { |
196 | if (!group) { |
197 | group = groups[id] = { |
198 | type: 'subscribe', |
199 | lastUpdateType: null, |
200 | channels: new Set(), |
201 | updated: 0, |
202 | author: id, |
203 | id: id |
204 | } |
205 | } |
206 | group.channels.add(c.channel) |
207 | group.updated = msg.timestamp || msg.value.sequence |
208 | } else { |
209 | if (group) { |
210 | group.channels.delete(c.channel) |
211 | if (!group.channels.size) { |
212 | delete groups[id] |
213 | } |
214 | } |
215 | } |
216 | } |
217 | } |
218 | |
219 | function ensureMessage (id, groups) { |
220 | var group = groups[id] |
221 | if (!group) { |
222 | group = groups[id] = { |
223 | type: 'message', |
224 | repliesFrom: new Set(), |
225 | replies: [], |
226 | message: null, |
227 | messageId: id, |
228 | likes: new Set(), |
229 | updated: 0 |
230 | } |
231 | } |
232 | return group |
233 | } |
234 | |
235 | function compareUserTimestamp (a, b) { |
236 | var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3) |
237 | if (isClose) { |
238 | // recieved close together, use provided timestamps |
239 | return a.value.timestamp - b.value.timestamp |
240 | } else { |
241 | return a.timestamp - b.timestamp |
242 | } |
243 | } |
244 |
Built with git-ssb-web