git ssb

10+

Matt McKegg / patchwork



Tree: 3043037e5920603ef66d72b8f0ee962925650cca

Files: 3043037e5920603ef66d72b8f0ee962925650cca / modules / feed / pull / summary.js

7452 bytesRaw
1var pull = require('pull-stream')
2var pullDefer = require('pull-defer')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7
8exports.gives = nest({
9 'feed.pull': [ 'summary' ]
10})
11
12exports.create = function () {
13 return nest({
14 'feed.pull': { summary }
15 })
16}
17
18function summary (source, opts, cb) {
19 var bumpFilter = opts && opts.bumpFilter
20 var windowSize = opts && opts.windowSize || 1000
21 var prioritized = opts && opts.prioritized || {}
22 var last = null
23 var returned = false
24 var done = false
25 return pullNext(() => {
26 if (!done) {
27 var next = {reverse: true, limit: windowSize, live: false}
28 if (last) {
29 next.lt = last.timestamp || last.value.sequence
30 }
31 var deferred = pullDefer.source()
32 pull(
33 source(next),
34 pull.collect((err, values) => {
35 if (err) throw err
36 if (!values.length) {
37 done = true
38 deferred.resolve(pull.values([]))
39 if (!returned) cb && cb()
40 returned = true
41 } else {
42 var fromTime = last && last.timestamp || Date.now()
43 last = values[values.length - 1]
44 groupMessages(values, fromTime, {bumpFilter, prioritized}, (err, result) => {
45 if (err) throw err
46 deferred.resolve(
47 pull.values(result)
48 )
49 if (!returned) cb && cb()
50 returned = true
51 })
52 }
53 })
54 )
55 }
56 return deferred
57 })
58}
59
60function groupMessages (messages, fromTime, opts, cb) {
61 var subscribes = {}
62 var follows = {}
63 var messageUpdates = {}
64 reverseForEach(messages, function (msg) {
65 if (!msg.value) return
66 var c = msg.value.content
67 if (c.type === 'contact') {
68 updateContact(msg, follows, opts)
69 } else if (c.type === 'channel') {
70 updateChannel(msg, subscribes, opts)
71 } else if (c.type === 'vote') {
72 if (c.vote && c.vote.link) {
73 // only show likes of posts added in the current window
74 // and only for the main post
75 const group = messageUpdates[c.vote.link]
76 if (group) {
77 if (c.vote.value > 0) {
78 group.likes.add(msg.value.author)
79 group.relatedMessages.push(msg)
80 } else {
81 group.likes.delete(msg.value.author)
82 group.relatedMessages.push(msg)
83 }
84 }
85 }
86 } else {
87 if (c.root) {
88 const group = ensureMessage(c.root, messageUpdates)
89 group.fromTime = fromTime
90 group.repliesFrom.add(msg.value.author)
91 SortedArray.add(group.replies, msg, compareUserTimestamp)
92 group.channel = group.channel || msg.value.content.channel
93 group.relatedMessages.push(msg)
94 } else {
95 const group = ensureMessage(msg.key, messageUpdates)
96 group.fromTime = fromTime
97 group.lastUpdateType = 'post'
98 group.updated = msg.timestamp || msg.value.sequence
99 group.author = msg.value.author
100 group.channel = msg.value.content.channel
101 group.message = msg
102 group.boxed = typeof msg.value.content === 'string'
103 }
104 }
105 }, () => {
106 var result = []
107 Object.keys(follows).forEach((key) => {
108 bumpIfNeeded(follows[key], opts)
109 if (follows[key].updated) {
110 SortedArray.add(result, follows[key], compareUpdated)
111 }
112 })
113 Object.keys(subscribes).forEach((key) => {
114 bumpIfNeeded(subscribes[key], opts)
115 if (subscribes[key].updated) {
116 SortedArray.add(result, subscribes[key], compareUpdated)
117 }
118 })
119 Object.keys(messageUpdates).forEach((key) => {
120 bumpIfNeeded(messageUpdates[key], opts)
121 if (messageUpdates[key].updated) {
122 SortedArray.add(result, messageUpdates[key], compareUpdated)
123 }
124 })
125 cb(null, result)
126 })
127}
128
129function bumpIfNeeded (group, {bumpFilter, prioritized}) {
130 group.relatedMessages.forEach(msg => {
131 if (prioritized[msg.key] && group.priority < prioritized[msg.key]) {
132 group.priority = prioritized[msg.key]
133 }
134
135 var shouldBump = !bumpFilter || bumpFilter(msg, group)
136
137 // only bump when filter passes
138 var newUpdated = msg.timestamp || msg.value.sequence
139 if (!group.updated || (shouldBump && newUpdated > group.updated)) {
140 group.updated = newUpdated
141 if (msg.value.content.type === 'vote') {
142 if (group.likes.size) {
143 group.lastUpdateType = 'like'
144 } else if (group.repliesFrom.size) {
145 group.lastUpdateType = 'reply'
146 } else if (group.message) {
147 group.lastUpdateType = 'post'
148 }
149 }
150
151 if (msg.value.content.type === 'post') {
152 if (msg.value.content.root) {
153 group.lastUpdateType = 'reply'
154 } else {
155 group.lastUpdateType = 'post'
156 }
157 }
158 }
159 })
160}
161
162function compareUpdated (a, b) {
163 // highest priority first
164 // then most recent date
165 return b.priority - a.priority || b.updated - a.updated
166}
167
168function reverseForEach (items, fn, cb) {
169 var i = items.length - 1
170 nextBatch()
171
172 function nextBatch () {
173 var start = Date.now()
174 while (i >= 0) {
175 fn(items[i], i)
176 i -= 1
177 if (Date.now() - start > 10) break
178 }
179
180 if (i > 0) {
181 setImmediate(nextBatch)
182 } else {
183 cb && cb()
184 }
185 }
186}
187
188function updateContact (msg, groups, opts) {
189 var c = msg.value.content
190 var id = msg.value.author
191 var group = groups[id]
192 if (ref.isFeed(c.contact)) {
193 if (c.following) {
194 if (!group) {
195 group = groups[id] = {
196 type: 'follow',
197 priority: 0,
198 relatedMessages: [],
199 lastUpdateType: null,
200 contacts: new Set(),
201 updated: 0,
202 author: id,
203 id: id
204 }
205 }
206 group.contacts.add(c.contact)
207 group.relatedMessages.push(msg)
208 } else {
209 if (group) {
210 group.contacts.delete(c.contact)
211 if (!group.contacts.size) {
212 delete groups[id]
213 }
214 }
215 }
216 }
217}
218
219function updateChannel (msg, groups, opts) {
220 var c = msg.value.content
221 var channel = c.channel
222 var group = groups[channel]
223 if (typeof channel === 'string') {
224 if (c.subscribed) {
225 if (!group) {
226 group = groups[channel] = {
227 type: 'subscribe',
228 priority: 0,
229 relatedMessages: [],
230 lastUpdateType: null,
231 subscribers: new Set(),
232 updated: 0,
233 channel
234 }
235 }
236 group.subscribers.add(msg.value.author)
237 group.relatedMessages.push(msg)
238 } else {
239 if (group) {
240 group.subscribers.delete(msg.value.author)
241 if (!group.subscribers.size) {
242 delete groups[channel]
243 }
244 }
245 }
246 }
247}
248
249function ensureMessage (id, groups) {
250 var group = groups[id]
251 if (!group) {
252 group = groups[id] = {
253 type: 'message',
254 priority: 0,
255 repliesFrom: new Set(),
256 relatedMessages: [],
257 replies: [],
258 message: null,
259 messageId: id,
260 likes: new Set(),
261 updated: 0
262 }
263 }
264 return group
265}
266
267function compareUserTimestamp (a, b) {
268 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
269 if (isClose) {
270 // recieved close together, use provided timestamps
271 return a.value.timestamp - b.value.timestamp
272 } else {
273 return a.timestamp - b.timestamp
274 }
275}
276

Built with git-ssb-web