git ssb

10+

Matt McKegg / patchwork



Tree: 7e11036395d46d3c6fbcc56c03eef30bbba196be

Files: 7e11036395d46d3c6fbcc56c03eef30bbba196be / modules / feed / pull / summary.js

7748 bytesRaw
1var pull = require('pull-stream')
2var pullDefer = require('pull-defer')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7var sustained = require('../../../lib/sustained')
8var Value = require('mutant/value')
9
10exports.gives = nest({
11 'feed.pull': [ 'summary' ]
12})
13
14exports.create = function () {
15 return nest({
16 'feed.pull': { summary }
17 })
18}
19
20function summary (source, opts, cb) {
21 var bumpFilter = opts && opts.bumpFilter
22 var windowSize = opts && opts.windowSize || 1000
23 var prioritized = opts && opts.prioritized || {}
24
25 var loading = Value(true)
26
27 var last = null
28 var returned = false
29 var done = false
30
31 var result = pullNext(() => {
32 if (!done) {
33 loading.set(true)
34 var next = {reverse: true, limit: windowSize, live: false}
35 if (last) {
36 next.lt = last
37 }
38 var deferred = pullDefer.source()
39 pull(
40 source(next),
41 pull.collect((err, values) => {
42 loading.set(false)
43 if (err) throw err
44 if (!values.length) {
45 done = true
46 deferred.resolve(pull.values([]))
47 if (!returned) cb && cb()
48 returned = true
49 } else {
50 var fromTime = last && last.timestamp || Date.now()
51 last = values[values.length - 1]
52 groupMessages(values, fromTime, {bumpFilter, prioritized}, (err, result) => {
53 if (err) throw err
54 deferred.resolve(
55 pull.values(result)
56 )
57 if (!returned) cb && cb()
58 returned = true
59 })
60 }
61 })
62 )
63 }
64 return deferred
65 })
66
67 // switch to loading state immediately, only revert after no loading for > 200 ms
68 result.loading = sustained(loading, 500, x => x)
69
70 return result
71}
72
73function groupMessages (messages, fromTime, opts, cb) {
74 var subscribes = {}
75 var follows = {}
76 var messageUpdates = {}
77 reverseForEach(messages, function (msg) {
78 if (!msg.value) return
79 var c = msg.value.content
80 if (c.type === 'contact') {
81 updateContact(msg, follows, opts)
82 } else if (c.type === 'channel') {
83 updateChannel(msg, subscribes, opts)
84 } else if (c.type === 'vote') {
85 if (c.vote && c.vote.link) {
86 // only show likes of posts added in the current window
87 // and only for the main post
88 const group = messageUpdates[c.vote.link]
89 if (group) {
90 if (c.vote.value > 0) {
91 group.likes.add(msg.value.author)
92 group.relatedMessages.push(msg)
93 } else {
94 group.likes.delete(msg.value.author)
95 group.relatedMessages.push(msg)
96 }
97 }
98 }
99 } else {
100 if (c.root) {
101 const group = ensureMessage(c.root, messageUpdates)
102 group.fromTime = fromTime
103 group.repliesFrom.add(msg.value.author)
104 SortedArray.add(group.replies, msg, compareUserTimestamp)
105 group.channel = group.channel || msg.value.content.channel
106 group.relatedMessages.push(msg)
107 } else {
108 const group = ensureMessage(msg.key, messageUpdates)
109 group.fromTime = fromTime
110 group.lastUpdateType = 'post'
111 group.updated = msg.timestamp || msg.value.sequence
112 group.author = msg.value.author
113 group.channel = msg.value.content.channel
114 group.message = msg
115 group.boxed = typeof msg.value.content === 'string'
116 }
117 }
118 }, () => {
119 var result = []
120 Object.keys(follows).forEach((key) => {
121 bumpIfNeeded(follows[key], opts)
122 if (follows[key].updated) {
123 SortedArray.add(result, follows[key], compareUpdated)
124 }
125 })
126 Object.keys(subscribes).forEach((key) => {
127 bumpIfNeeded(subscribes[key], opts)
128 if (subscribes[key].updated) {
129 SortedArray.add(result, subscribes[key], compareUpdated)
130 }
131 })
132 Object.keys(messageUpdates).forEach((key) => {
133 bumpIfNeeded(messageUpdates[key], opts)
134 if (messageUpdates[key].updated) {
135 SortedArray.add(result, messageUpdates[key], compareUpdated)
136 }
137 })
138 cb(null, result)
139 })
140}
141
142function bumpIfNeeded (group, {bumpFilter, prioritized}) {
143 group.relatedMessages.forEach(msg => {
144 if (prioritized[msg.key] && group.priority < prioritized[msg.key]) {
145 group.priority = prioritized[msg.key]
146 }
147
148 var shouldBump = !bumpFilter || bumpFilter(msg, group)
149
150 // only bump when filter passes
151 var newUpdated = msg.timestamp || msg.value.sequence
152 if (!group.updated || (shouldBump && newUpdated > group.updated)) {
153 group.updated = newUpdated
154 if (msg.value.content.type === 'vote') {
155 if (group.likes.size) {
156 group.lastUpdateType = 'like'
157 } else if (group.repliesFrom.size) {
158 group.lastUpdateType = 'reply'
159 } else if (group.message) {
160 group.lastUpdateType = 'post'
161 }
162 }
163
164 if (msg.value.content.type === 'post') {
165 if (msg.value.content.root) {
166 group.lastUpdateType = 'reply'
167 } else {
168 group.lastUpdateType = 'post'
169 }
170 }
171 }
172 })
173}
174
175function compareUpdated (a, b) {
176 // highest priority first
177 // then most recent date
178 return b.priority - a.priority || b.updated - a.updated
179}
180
181function reverseForEach (items, fn, cb) {
182 var i = items.length - 1
183 nextBatch()
184
185 function nextBatch () {
186 var start = Date.now()
187 while (i >= 0) {
188 fn(items[i], i)
189 i -= 1
190 if (Date.now() - start > 10) break
191 }
192
193 if (i > 0) {
194 setImmediate(nextBatch)
195 } else {
196 cb && cb()
197 }
198 }
199}
200
201function updateContact (msg, groups, opts) {
202 var c = msg.value.content
203 var id = msg.value.author
204 var group = groups[id]
205 if (ref.isFeed(c.contact)) {
206 if (c.following) {
207 if (!group) {
208 group = groups[id] = {
209 type: 'follow',
210 priority: 0,
211 relatedMessages: [],
212 lastUpdateType: null,
213 contacts: new Set(),
214 updated: 0,
215 author: id,
216 id: id
217 }
218 }
219 group.contacts.add(c.contact)
220 group.relatedMessages.push(msg)
221 } else {
222 if (group) {
223 group.contacts.delete(c.contact)
224 if (!group.contacts.size) {
225 delete groups[id]
226 }
227 }
228 }
229 }
230}
231
232function updateChannel (msg, groups, opts) {
233 var c = msg.value.content
234 var channel = c.channel
235 var group = groups[channel]
236 if (typeof channel === 'string') {
237 if (c.subscribed) {
238 if (!group) {
239 group = groups[channel] = {
240 type: 'subscribe',
241 priority: 0,
242 relatedMessages: [],
243 lastUpdateType: null,
244 subscribers: new Set(),
245 updated: 0,
246 channel
247 }
248 }
249 group.subscribers.add(msg.value.author)
250 group.relatedMessages.push(msg)
251 } else {
252 if (group) {
253 group.subscribers.delete(msg.value.author)
254 if (!group.subscribers.size) {
255 delete groups[channel]
256 }
257 }
258 }
259 }
260}
261
262function ensureMessage (id, groups) {
263 var group = groups[id]
264 if (!group) {
265 group = groups[id] = {
266 type: 'message',
267 priority: 0,
268 repliesFrom: new Set(),
269 relatedMessages: [],
270 replies: [],
271 message: null,
272 messageId: id,
273 likes: new Set(),
274 updated: 0
275 }
276 }
277 return group
278}
279
280function compareUserTimestamp (a, b) {
281 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
282 if (isClose) {
283 // recieved close together, use provided timestamps
284 return a.value.timestamp - b.value.timestamp
285 } else {
286 return a.timestamp - b.timestamp
287 }
288}
289

Built with git-ssb-web