git ssb

10+

Matt McKegg / patchwork



Tree: 4e87fb7f772146e90bc4f79a30bed7079d748a46

Files: 4e87fb7f772146e90bc4f79a30bed7079d748a46 / lib / feed-summary.js

5356 bytesRaw
1var pull = require('pull-stream')
2var pullPushable = require('pull-pushable')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5
6module.exports = FeedSummary
7
8function FeedSummary (source, opts, cb) {
9 var bumpFilter = opts && opts.bumpFilter
10 var windowSize = opts && opts.windowSize || 1000
11 var last = null
12 var returned = false
13 var done = false
14 return pullNext(() => {
15 if (!done) {
16 var next = {reverse: true, limit: windowSize, live: false}
17 if (last) {
18 next.lt = last.timestamp || last.value.sequence
19 }
20 var pushable = pullPushable()
21 pull(
22 source(next),
23 pull.collect((err, values) => {
24 if (err) throw err
25 if (!values.length) {
26 done = true
27 pushable.end()
28 if (!returned) cb && cb()
29 returned = true
30 } else {
31 var fromTime = last && last.timestamp || Date.now()
32 last = values[values.length - 1]
33 groupMessages(values, fromTime, bumpFilter, (err, result) => {
34 if (err) throw err
35 result.forEach(v => pushable.push(v))
36 pushable.end()
37 if (!returned) cb && cb()
38 returned = true
39 })
40 }
41 })
42 )
43 }
44 return pushable
45 })
46}
47
48function groupMessages (messages, fromTime, bumpFilter, cb) {
49 var follows = {}
50 var messageUpdates = {}
51 reverseForEach(messages, function (msg) {
52 if (!msg.value) return
53 var c = msg.value.content
54 if (c.type === 'contact') {
55 updateContact(msg, follows)
56 } else if (c.type === 'vote') {
57 if (c.vote && c.vote.link) {
58 // only show digs of posts added in the current window
59 // and only for the main post
60 const group = messageUpdates[c.vote.link]
61 if (group) {
62 if (c.vote.value > 0) {
63 group.lastUpdateType = 'dig'
64 group.digs.add(msg.value.author)
65 // only bump when filter passes
66 if (!bumpFilter || bumpFilter(msg, group)) {
67 group.updated = msg.timestamp
68 }
69 } else {
70 group.digs.delete(msg.value.author)
71 if (group.lastUpdateType === 'dig' && !group.digs.size && !group.replies.length) {
72 group.lastUpdateType = 'reply'
73 }
74 }
75 }
76 }
77 } else {
78 if (c.root || (c.type === 'git-update' && c.repo)) {
79 const group = ensureMessage(c.root || c.repo, messageUpdates)
80 group.fromTime = fromTime
81 group.lastUpdateType = 'reply'
82 group.repliesFrom.add(msg.value.author)
83 SortedArray.add(group.replies, msg, compareUserTimestamp)
84 //group.replies.push(msg)
85 group.channel = group.channel || msg.value.content.channel
86
87 // only bump when filter passes
88 if (!bumpFilter || bumpFilter(msg, group)) {
89 group.updated = msg.timestamp || msg.value.sequence
90 }
91 } else {
92 const group = ensureMessage(msg.key, messageUpdates)
93 group.fromTime = fromTime
94 group.lastUpdateType = 'post'
95 group.updated = msg.timestamp || msg.value.sequence
96 group.author = msg.value.author
97 group.channel = msg.value.content.channel
98 group.message = msg
99 }
100 }
101 }, () => {
102 var result = []
103 Object.keys(follows).forEach((key) => {
104 if (follows[key].updated) {
105 SortedArray.add(result, follows[key], compareUpdated)
106 }
107 })
108 Object.keys(messageUpdates).forEach((key) => {
109 if (messageUpdates[key].updated) {
110 SortedArray.add(result, messageUpdates[key], compareUpdated)
111 }
112 })
113 cb(null, result)
114 })
115}
116
117function compareUpdated (a, b) {
118 return b.updated - a.updated
119}
120
121function reverseForEach (items, fn, cb) {
122 var i = items.length - 1
123 nextBatch()
124
125 function nextBatch () {
126 var start = Date.now()
127 while (i >= 0) {
128 fn(items[i], i)
129 i -= 1
130 if (Date.now() - start > 10) break
131 }
132
133 if (i > 0) {
134 setImmediate(nextBatch)
135 } else {
136 cb && cb()
137 }
138 }
139}
140
141function updateContact (msg, groups) {
142 var c = msg.value.content
143 var id = msg.value.author
144 var group = groups[id]
145 if (c.contact) {
146 if (c.following) {
147 if (!group) {
148 group = groups[id] = {
149 type: 'follow',
150 lastUpdateType: null,
151 contacts: new Set(),
152 updated: 0,
153 author: id,
154 id: id
155 }
156 }
157 group.contacts.add(c.contact)
158 group.updated = msg.timestamp || msg.value.sequence
159 } else {
160 if (group) {
161 group.contacts.delete(c.contact)
162 if (!group.contacts.size) {
163 delete groups[id]
164 }
165 }
166 }
167 }
168}
169
170function ensureMessage (id, groups) {
171 var group = groups[id]
172 if (!group) {
173 group = groups[id] = {
174 type: 'message',
175 repliesFrom: new Set(),
176 replies: [],
177 message: null,
178 messageId: id,
179 digs: new Set(),
180 updated: 0
181 }
182 }
183 return group
184}
185
186function compareUserTimestamp (a, b) {
187 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
188 if (isClose) {
189 // recieved close together, use provided timestamps
190 return a.value.timestamp - b.value.timestamp
191 } else {
192 return a.timestamp - b.timestamp
193 }
194}
195

Built with git-ssb-web