git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: afa59d8bd5fdb35e2fc6204c459fed1a20ae6b03

Files: afa59d8bd5fdb35e2fc6204c459fed1a20ae6b03 / modules / feed / pull / summary.js

6559 bytesRaw
1var pull = require('pull-stream')
2var pullDefer = require('pull-defer')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7
8exports.gives = nest({
9 'feed.pull': [ 'summary' ]
10})
11
12exports.create = function () {
13 return nest({
14 'feed.pull': { summary }
15 })
16}
17
18function summary (source, opts, cb) {
19 var bumpFilter = opts && opts.bumpFilter
20 var windowSize = opts && opts.windowSize || 1000
21 var last = null
22 var returned = false
23 var done = false
24 return pullNext(() => {
25 if (!done) {
26 var next = {reverse: true, limit: windowSize, live: false}
27 if (last) {
28 next.lt = last.timestamp || last.value.sequence
29 }
30 var deferred = pullDefer.source()
31 pull(
32 source(next),
33 pull.collect((err, values) => {
34 if (err) throw err
35 if (!values.length) {
36 done = true
37 deferred.resolve(pull.values([]))
38 if (!returned) cb && cb()
39 returned = true
40 } else {
41 var fromTime = last && last.timestamp || Date.now()
42 last = values[values.length - 1]
43 groupMessages(values, fromTime, bumpFilter, (err, result) => {
44 if (err) throw err
45 deferred.resolve(
46 pull.values(result)
47 )
48 if (!returned) cb && cb()
49 returned = true
50 })
51 }
52 })
53 )
54 }
55 return deferred
56 })
57}
58
59function groupMessages (messages, fromTime, bumpFilter, cb) {
60 var subscribes = {}
61 var follows = {}
62 var messageUpdates = {}
63 reverseForEach(messages, function (msg) {
64 if (!msg.value) return
65 var c = msg.value.content
66 if (c.type === 'contact') {
67 updateContact(msg, follows)
68 } else if (c.type === 'channel') {
69 updateChannel(msg, subscribes)
70 } else if (c.type === 'vote') {
71 if (c.vote && c.vote.link) {
72 // only show likes of posts added in the current window
73 // and only for the main post
74 const group = messageUpdates[c.vote.link]
75 if (group) {
76 if (c.vote.value > 0) {
77 group.lastUpdateType = 'like'
78 group.likes.add(msg.value.author)
79 bumpIfNeeded(group, msg, bumpFilter)
80 } else {
81 group.likes.delete(msg.value.author)
82 if (group.lastUpdateType === 'like' && !group.likes.size && !group.replies.length) {
83 group.lastUpdateType = 'reply'
84 }
85 }
86 }
87 }
88 } else {
89 if (c.root || (c.type === 'git-update' && c.repo)) {
90 const group = ensureMessage(c.root || c.repo, messageUpdates)
91 group.fromTime = fromTime
92 group.lastUpdateType = 'reply'
93 group.repliesFrom.add(msg.value.author)
94 SortedArray.add(group.replies, msg, compareUserTimestamp)
95 group.channel = group.channel || msg.value.content.channel
96 bumpIfNeeded(group, msg, bumpFilter)
97 } else {
98 const group = ensureMessage(msg.key, messageUpdates)
99 group.fromTime = fromTime
100 group.lastUpdateType = 'post'
101 group.updated = msg.timestamp || msg.value.sequence
102 group.author = msg.value.author
103 group.channel = msg.value.content.channel
104 group.message = msg
105 group.boxed = typeof msg.value.content === 'string'
106 }
107 }
108 }, () => {
109 var result = []
110 Object.keys(follows).forEach((key) => {
111 if (follows[key].updated) {
112 SortedArray.add(result, follows[key], compareUpdated)
113 }
114 })
115 Object.keys(subscribes).forEach((key) => {
116 if (subscribes[key].updated) {
117 SortedArray.add(result, subscribes[key], compareUpdated)
118 }
119 })
120 Object.keys(messageUpdates).forEach((key) => {
121 if (messageUpdates[key].updated) {
122 SortedArray.add(result, messageUpdates[key], compareUpdated)
123 }
124 })
125 cb(null, result)
126 })
127}
128
129function bumpIfNeeded (group, msg, bumpFilter) {
130 // only bump when filter passes
131 var newUpdated = msg.timestamp || msg.value.sequence
132 if (!group.updated || ((!bumpFilter || bumpFilter(msg, group)) && newUpdated > group.updated)) {
133 group.updated = newUpdated
134 }
135}
136
137function compareUpdated (a, b) {
138 return b.updated - a.updated
139}
140
141function reverseForEach (items, fn, cb) {
142 var i = items.length - 1
143 nextBatch()
144
145 function nextBatch () {
146 var start = Date.now()
147 while (i >= 0) {
148 fn(items[i], i)
149 i -= 1
150 if (Date.now() - start > 10) break
151 }
152
153 if (i > 0) {
154 setImmediate(nextBatch)
155 } else {
156 cb && cb()
157 }
158 }
159}
160
161function updateContact (msg, groups) {
162 var c = msg.value.content
163 var id = msg.value.author
164 var group = groups[id]
165 if (ref.isFeed(c.contact)) {
166 if (c.following) {
167 if (!group) {
168 group = groups[id] = {
169 type: 'follow',
170 lastUpdateType: null,
171 contacts: new Set(),
172 updated: 0,
173 author: id,
174 id: id
175 }
176 }
177 group.contacts.add(c.contact)
178 group.updated = msg.timestamp || msg.value.sequence
179 } else {
180 if (group) {
181 group.contacts.delete(c.contact)
182 if (!group.contacts.size) {
183 delete groups[id]
184 }
185 }
186 }
187 }
188}
189
190function updateChannel (msg, groups) {
191 var c = msg.value.content
192 var id = msg.value.author
193 var group = groups[id]
194 if (typeof c.channel === 'string') {
195 if (c.subscribed) {
196 if (!group) {
197 group = groups[id] = {
198 type: 'subscribe',
199 lastUpdateType: null,
200 channels: new Set(),
201 updated: 0,
202 author: id,
203 id: id
204 }
205 }
206 group.channels.add(c.channel)
207 group.updated = msg.timestamp || msg.value.sequence
208 } else {
209 if (group) {
210 group.channels.delete(c.channel)
211 if (!group.channels.size) {
212 delete groups[id]
213 }
214 }
215 }
216 }
217}
218
219function ensureMessage (id, groups) {
220 var group = groups[id]
221 if (!group) {
222 group = groups[id] = {
223 type: 'message',
224 repliesFrom: new Set(),
225 replies: [],
226 message: null,
227 messageId: id,
228 likes: new Set(),
229 updated: 0
230 }
231 }
232 return group
233}
234
235function compareUserTimestamp (a, b) {
236 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
237 if (isClose) {
238 // recieved close together, use provided timestamps
239 return a.value.timestamp - b.value.timestamp
240 } else {
241 return a.timestamp - b.timestamp
242 }
243}
244

Built with git-ssb-web