Files: c53665fa2996b120b111466f622b638da4f2f8cf / modules / feed / pull / summary.js
5579 bytesRaw
1 | var pull = require('pull-stream') |
2 | var pullDefer = require('pull-defer') |
3 | var pullNext = require('pull-next') |
4 | var SortedArray = require('sorted-array-functions') |
5 | var nest = require('depnest') |
6 | |
7 | exports.gives = nest({ |
8 | 'feed.pull': [ 'summary' ] |
9 | }) |
10 | |
11 | exports.create = function () { |
12 | return nest({ |
13 | 'feed.pull': { summary } |
14 | }) |
15 | } |
16 | |
17 | function summary (source, opts, cb) { |
18 | var bumpFilter = opts && opts.bumpFilter |
19 | var windowSize = opts && opts.windowSize || 1000 |
20 | var last = null |
21 | var returned = false |
22 | var done = false |
23 | return pullNext(() => { |
24 | if (!done) { |
25 | var next = {reverse: true, limit: windowSize, live: false} |
26 | if (last) { |
27 | next.lt = last.timestamp || last.value.sequence |
28 | } |
29 | var deferred = pullDefer.source() |
30 | pull( |
31 | source(next), |
32 | pull.collect((err, values) => { |
33 | if (err) throw err |
34 | if (!values.length) { |
35 | done = true |
36 | deferred.resolve(pull.values([])) |
37 | if (!returned) cb && cb() |
38 | returned = true |
39 | } else { |
40 | var fromTime = last && last.timestamp || Date.now() |
41 | last = values[values.length - 1] |
42 | groupMessages(values, fromTime, bumpFilter, (err, result) => { |
43 | if (err) throw err |
44 | deferred.resolve( |
45 | pull.values(result) |
46 | ) |
47 | if (!returned) cb && cb() |
48 | returned = true |
49 | }) |
50 | } |
51 | }) |
52 | ) |
53 | } |
54 | return deferred |
55 | }) |
56 | } |
57 | |
58 | function groupMessages (messages, fromTime, bumpFilter, cb) { |
59 | var follows = {} |
60 | var messageUpdates = {} |
61 | reverseForEach(messages, function (msg) { |
62 | if (!msg.value) return |
63 | var c = msg.value.content |
64 | if (c.type === 'contact') { |
65 | updateContact(msg, follows) |
66 | } else if (c.type === 'vote') { |
67 | if (c.vote && c.vote.link) { |
68 | // only show likes of posts added in the current window |
69 | // and only for the main post |
70 | const group = messageUpdates[c.vote.link] |
71 | if (group) { |
72 | if (c.vote.value > 0) { |
73 | group.lastUpdateType = 'like' |
74 | group.likes.add(msg.value.author) |
75 | // only bump when filter passes |
76 | if (!bumpFilter || bumpFilter(msg, group)) { |
77 | group.updated = msg.timestamp |
78 | } |
79 | } else { |
80 | group.likes.delete(msg.value.author) |
81 | if (group.lastUpdateType === 'like' && !group.likes.size && !group.replies.length) { |
82 | group.lastUpdateType = 'reply' |
83 | } |
84 | } |
85 | } |
86 | } |
87 | } else { |
88 | if (c.root || (c.type === 'git-update' && c.repo)) { |
89 | const group = ensureMessage(c.root || c.repo, messageUpdates) |
90 | group.fromTime = fromTime |
91 | group.lastUpdateType = 'reply' |
92 | group.repliesFrom.add(msg.value.author) |
93 | SortedArray.add(group.replies, msg, compareUserTimestamp) |
94 | //group.replies.push(msg) |
95 | group.channel = group.channel || msg.value.content.channel |
96 | |
97 | // only bump when filter passes |
98 | if (!bumpFilter || bumpFilter(msg, group)) { |
99 | group.updated = msg.timestamp || msg.value.sequence |
100 | } |
101 | } else { |
102 | const group = ensureMessage(msg.key, messageUpdates) |
103 | group.fromTime = fromTime |
104 | group.lastUpdateType = 'post' |
105 | group.updated = msg.timestamp || msg.value.sequence |
106 | group.author = msg.value.author |
107 | group.channel = msg.value.content.channel |
108 | group.message = msg |
109 | group.boxed = typeof msg.value.content === 'string' |
110 | } |
111 | } |
112 | }, () => { |
113 | var result = [] |
114 | Object.keys(follows).forEach((key) => { |
115 | if (follows[key].updated) { |
116 | SortedArray.add(result, follows[key], compareUpdated) |
117 | } |
118 | }) |
119 | Object.keys(messageUpdates).forEach((key) => { |
120 | if (messageUpdates[key].updated) { |
121 | SortedArray.add(result, messageUpdates[key], compareUpdated) |
122 | } |
123 | }) |
124 | cb(null, result) |
125 | }) |
126 | } |
127 | |
128 | function compareUpdated (a, b) { |
129 | return b.updated - a.updated |
130 | } |
131 | |
132 | function reverseForEach (items, fn, cb) { |
133 | var i = items.length - 1 |
134 | nextBatch() |
135 | |
136 | function nextBatch () { |
137 | var start = Date.now() |
138 | while (i >= 0) { |
139 | fn(items[i], i) |
140 | i -= 1 |
141 | if (Date.now() - start > 10) break |
142 | } |
143 | |
144 | if (i > 0) { |
145 | setImmediate(nextBatch) |
146 | } else { |
147 | cb && cb() |
148 | } |
149 | } |
150 | } |
151 | |
152 | function updateContact (msg, groups) { |
153 | var c = msg.value.content |
154 | var id = msg.value.author |
155 | var group = groups[id] |
156 | if (c.contact) { |
157 | if (c.following) { |
158 | if (!group) { |
159 | group = groups[id] = { |
160 | type: 'follow', |
161 | lastUpdateType: null, |
162 | contacts: new Set(), |
163 | updated: 0, |
164 | author: id, |
165 | id: id |
166 | } |
167 | } |
168 | group.contacts.add(c.contact) |
169 | group.updated = msg.timestamp || msg.value.sequence |
170 | } else { |
171 | if (group) { |
172 | group.contacts.delete(c.contact) |
173 | if (!group.contacts.size) { |
174 | delete groups[id] |
175 | } |
176 | } |
177 | } |
178 | } |
179 | } |
180 | |
181 | function ensureMessage (id, groups) { |
182 | var group = groups[id] |
183 | if (!group) { |
184 | group = groups[id] = { |
185 | type: 'message', |
186 | repliesFrom: new Set(), |
187 | replies: [], |
188 | message: null, |
189 | messageId: id, |
190 | likes: new Set(), |
191 | updated: 0 |
192 | } |
193 | } |
194 | return group |
195 | } |
196 | |
197 | function compareUserTimestamp (a, b) { |
198 | var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3) |
199 | if (isClose) { |
200 | // recieved close together, use provided timestamps |
201 | return a.value.timestamp - b.value.timestamp |
202 | } else { |
203 | return a.timestamp - b.timestamp |
204 | } |
205 | } |
206 |
Built with git-ssb-web