git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: 0c32ba4ecf1c98ab797d92daa8ee0bfa0f1f4fa5

Files: 0c32ba4ecf1c98ab797d92daa8ee0bfa0f1f4fa5 / modules / feed / pull / summary.js

7859 bytesRaw
1var pull = require('pull-stream')
2var pullDefer = require('pull-defer')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7var sustained = require('../../../lib/sustained')
8var Value = require('mutant/value')
9
10exports.gives = nest({
11 'feed.pull': [ 'summary' ]
12})
13
14exports.create = function () {
15 return nest({
16 'feed.pull': { summary }
17 })
18}
19
20function summary (source, opts, cb) {
21 var bumpFilter = opts && opts.bumpFilter
22 var windowSize = opts && opts.windowSize || 1000
23 var prioritized = opts && opts.prioritized || {}
24 var getSequence = opts && opts.getSequence
25
26 var loading = Value(true)
27
28 var last = null
29 var returned = false
30 var done = false
31
32 var result = pullNext(() => {
33 if (!done) {
34 loading.set(true)
35 var next = {reverse: true, limit: windowSize, live: false}
36 if (last) {
37 next.lt = typeof getSequence === 'function' ? getSequence(last) : last.timestamp
38 }
39 var deferred = pullDefer.source()
40 pull(
41 source(next),
42 pull.collect((err, values) => {
43 loading.set(false)
44 if (err) throw err
45 if (!values.length) {
46 done = true
47 deferred.resolve(pull.values([]))
48 if (!returned) cb && cb()
49 returned = true
50 } else {
51 var fromTime = last && last.timestamp || Date.now()
52 last = values[values.length - 1]
53 groupMessages(values, fromTime, {bumpFilter, prioritized}, (err, result) => {
54 if (err) throw err
55 deferred.resolve(
56 pull.values(result)
57 )
58 if (!returned) cb && cb()
59 returned = true
60 })
61 }
62 })
63 )
64 }
65 return deferred
66 })
67
68 // switch to loading state immediately, only revert after no loading for > 200 ms
69 result.loading = sustained(loading, 500, x => x)
70
71 return result
72}
73
74function groupMessages (messages, fromTime, opts, cb) {
75 var subscribes = {}
76 var follows = {}
77 var messageUpdates = {}
78 reverseForEach(messages, function (msg) {
79 if (!msg.value) return
80 var c = msg.value.content
81 if (c.type === 'contact') {
82 updateContact(msg, follows, opts)
83 } else if (c.type === 'channel') {
84 updateChannel(msg, subscribes, opts)
85 } else if (c.type === 'vote') {
86 if (c.vote && c.vote.link) {
87 // only show likes of posts added in the current window
88 // and only for the main post
89 const group = messageUpdates[c.vote.link]
90 if (group) {
91 if (c.vote.value > 0) {
92 group.likes.add(msg.value.author)
93 group.relatedMessages.push(msg)
94 } else {
95 group.likes.delete(msg.value.author)
96 group.relatedMessages.push(msg)
97 }
98 }
99 }
100 } else {
101 if (c.root) {
102 const group = ensureMessage(c.root, messageUpdates)
103 group.fromTime = fromTime
104 group.repliesFrom.add(msg.value.author)
105 SortedArray.add(group.replies, msg, compareUserTimestamp)
106 group.channel = group.channel || msg.value.content.channel
107 group.relatedMessages.push(msg)
108 } else {
109 const group = ensureMessage(msg.key, messageUpdates)
110 group.fromTime = fromTime
111 group.lastUpdateType = 'post'
112 group.updated = msg.timestamp || msg.value.sequence
113 group.author = msg.value.author
114 group.channel = msg.value.content.channel
115 group.message = msg
116 group.boxed = typeof msg.value.content === 'string'
117 }
118 }
119 }, () => {
120 var result = []
121 Object.keys(follows).forEach((key) => {
122 bumpIfNeeded(follows[key], opts)
123 if (follows[key].updated) {
124 SortedArray.add(result, follows[key], compareUpdated)
125 }
126 })
127 Object.keys(subscribes).forEach((key) => {
128 bumpIfNeeded(subscribes[key], opts)
129 if (subscribes[key].updated) {
130 SortedArray.add(result, subscribes[key], compareUpdated)
131 }
132 })
133 Object.keys(messageUpdates).forEach((key) => {
134 bumpIfNeeded(messageUpdates[key], opts)
135 if (messageUpdates[key].updated) {
136 SortedArray.add(result, messageUpdates[key], compareUpdated)
137 }
138 })
139 cb(null, result)
140 })
141}
142
143function bumpIfNeeded (group, {bumpFilter, prioritized}) {
144 group.relatedMessages.forEach(msg => {
145 if (prioritized[msg.key] && group.priority < prioritized[msg.key]) {
146 group.priority = prioritized[msg.key]
147 }
148
149 var shouldBump = !bumpFilter || bumpFilter(msg, group)
150
151 // only bump when filter passes
152 var newUpdated = msg.timestamp || msg.value.sequence
153 if (!group.updated || (shouldBump && newUpdated > group.updated)) {
154 group.updated = newUpdated
155 if (msg.value.content.type === 'vote') {
156 if (group.likes.size) {
157 group.lastUpdateType = 'like'
158 } else if (group.repliesFrom.size) {
159 group.lastUpdateType = 'reply'
160 } else if (group.message) {
161 group.lastUpdateType = 'post'
162 }
163 }
164
165 if (msg.value.content.type === 'post') {
166 if (msg.value.content.root) {
167 group.lastUpdateType = 'reply'
168 } else {
169 group.lastUpdateType = 'post'
170 }
171 }
172 }
173 })
174}
175
176function compareUpdated (a, b) {
177 // highest priority first
178 // then most recent date
179 return b.priority - a.priority || b.updated - a.updated
180}
181
182function reverseForEach (items, fn, cb) {
183 var i = items.length - 1
184 nextBatch()
185
186 function nextBatch () {
187 var start = Date.now()
188 while (i >= 0) {
189 fn(items[i], i)
190 i -= 1
191 if (Date.now() - start > 10) break
192 }
193
194 if (i > 0) {
195 setImmediate(nextBatch)
196 } else {
197 cb && cb()
198 }
199 }
200}
201
202function updateContact (msg, groups, opts) {
203 var c = msg.value.content
204 var id = msg.value.author
205 var group = groups[id]
206 if (ref.isFeed(c.contact)) {
207 if (c.following) {
208 if (!group) {
209 group = groups[id] = {
210 type: 'follow',
211 priority: 0,
212 relatedMessages: [],
213 lastUpdateType: null,
214 contacts: new Set(),
215 updated: 0,
216 author: id,
217 id: id
218 }
219 }
220 group.contacts.add(c.contact)
221 group.relatedMessages.push(msg)
222 } else {
223 if (group) {
224 group.contacts.delete(c.contact)
225 if (!group.contacts.size) {
226 delete groups[id]
227 }
228 }
229 }
230 }
231}
232
233function updateChannel (msg, groups, opts) {
234 var c = msg.value.content
235 var channel = c.channel
236 var group = groups[channel]
237 if (typeof channel === 'string') {
238 if (c.subscribed) {
239 if (!group) {
240 group = groups[channel] = {
241 type: 'subscribe',
242 priority: 0,
243 relatedMessages: [],
244 lastUpdateType: null,
245 subscribers: new Set(),
246 updated: 0,
247 channel
248 }
249 }
250 group.subscribers.add(msg.value.author)
251 group.relatedMessages.push(msg)
252 } else {
253 if (group) {
254 group.subscribers.delete(msg.value.author)
255 if (!group.subscribers.size) {
256 delete groups[channel]
257 }
258 }
259 }
260 }
261}
262
263function ensureMessage (id, groups) {
264 var group = groups[id]
265 if (!group) {
266 group = groups[id] = {
267 type: 'message',
268 priority: 0,
269 repliesFrom: new Set(),
270 relatedMessages: [],
271 replies: [],
272 message: null,
273 messageId: id,
274 likes: new Set(),
275 updated: 0
276 }
277 }
278 return group
279}
280
281function compareUserTimestamp (a, b) {
282 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
283 if (isClose) {
284 // recieved close together, use provided timestamps
285 return a.value.timestamp - b.value.timestamp
286 } else {
287 return a.timestamp - b.timestamp
288 }
289}
290

Built with git-ssb-web