git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: c53665fa2996b120b111466f622b638da4f2f8cf

Files: c53665fa2996b120b111466f622b638da4f2f8cf / modules / feed / pull / summary.js

5579 bytesRaw
1var pull = require('pull-stream')
2var pullDefer = require('pull-defer')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6
7exports.gives = nest({
8 'feed.pull': [ 'summary' ]
9})
10
11exports.create = function () {
12 return nest({
13 'feed.pull': { summary }
14 })
15}
16
17function summary (source, opts, cb) {
18 var bumpFilter = opts && opts.bumpFilter
19 var windowSize = opts && opts.windowSize || 1000
20 var last = null
21 var returned = false
22 var done = false
23 return pullNext(() => {
24 if (!done) {
25 var next = {reverse: true, limit: windowSize, live: false}
26 if (last) {
27 next.lt = last.timestamp || last.value.sequence
28 }
29 var deferred = pullDefer.source()
30 pull(
31 source(next),
32 pull.collect((err, values) => {
33 if (err) throw err
34 if (!values.length) {
35 done = true
36 deferred.resolve(pull.values([]))
37 if (!returned) cb && cb()
38 returned = true
39 } else {
40 var fromTime = last && last.timestamp || Date.now()
41 last = values[values.length - 1]
42 groupMessages(values, fromTime, bumpFilter, (err, result) => {
43 if (err) throw err
44 deferred.resolve(
45 pull.values(result)
46 )
47 if (!returned) cb && cb()
48 returned = true
49 })
50 }
51 })
52 )
53 }
54 return deferred
55 })
56}
57
58function groupMessages (messages, fromTime, bumpFilter, cb) {
59 var follows = {}
60 var messageUpdates = {}
61 reverseForEach(messages, function (msg) {
62 if (!msg.value) return
63 var c = msg.value.content
64 if (c.type === 'contact') {
65 updateContact(msg, follows)
66 } else if (c.type === 'vote') {
67 if (c.vote && c.vote.link) {
68 // only show likes of posts added in the current window
69 // and only for the main post
70 const group = messageUpdates[c.vote.link]
71 if (group) {
72 if (c.vote.value > 0) {
73 group.lastUpdateType = 'like'
74 group.likes.add(msg.value.author)
75 // only bump when filter passes
76 if (!bumpFilter || bumpFilter(msg, group)) {
77 group.updated = msg.timestamp
78 }
79 } else {
80 group.likes.delete(msg.value.author)
81 if (group.lastUpdateType === 'like' && !group.likes.size && !group.replies.length) {
82 group.lastUpdateType = 'reply'
83 }
84 }
85 }
86 }
87 } else {
88 if (c.root || (c.type === 'git-update' && c.repo)) {
89 const group = ensureMessage(c.root || c.repo, messageUpdates)
90 group.fromTime = fromTime
91 group.lastUpdateType = 'reply'
92 group.repliesFrom.add(msg.value.author)
93 SortedArray.add(group.replies, msg, compareUserTimestamp)
94 //group.replies.push(msg)
95 group.channel = group.channel || msg.value.content.channel
96
97 // only bump when filter passes
98 if (!bumpFilter || bumpFilter(msg, group)) {
99 group.updated = msg.timestamp || msg.value.sequence
100 }
101 } else {
102 const group = ensureMessage(msg.key, messageUpdates)
103 group.fromTime = fromTime
104 group.lastUpdateType = 'post'
105 group.updated = msg.timestamp || msg.value.sequence
106 group.author = msg.value.author
107 group.channel = msg.value.content.channel
108 group.message = msg
109 group.boxed = typeof msg.value.content === 'string'
110 }
111 }
112 }, () => {
113 var result = []
114 Object.keys(follows).forEach((key) => {
115 if (follows[key].updated) {
116 SortedArray.add(result, follows[key], compareUpdated)
117 }
118 })
119 Object.keys(messageUpdates).forEach((key) => {
120 if (messageUpdates[key].updated) {
121 SortedArray.add(result, messageUpdates[key], compareUpdated)
122 }
123 })
124 cb(null, result)
125 })
126}
127
128function compareUpdated (a, b) {
129 return b.updated - a.updated
130}
131
132function reverseForEach (items, fn, cb) {
133 var i = items.length - 1
134 nextBatch()
135
136 function nextBatch () {
137 var start = Date.now()
138 while (i >= 0) {
139 fn(items[i], i)
140 i -= 1
141 if (Date.now() - start > 10) break
142 }
143
144 if (i > 0) {
145 setImmediate(nextBatch)
146 } else {
147 cb && cb()
148 }
149 }
150}
151
152function updateContact (msg, groups) {
153 var c = msg.value.content
154 var id = msg.value.author
155 var group = groups[id]
156 if (c.contact) {
157 if (c.following) {
158 if (!group) {
159 group = groups[id] = {
160 type: 'follow',
161 lastUpdateType: null,
162 contacts: new Set(),
163 updated: 0,
164 author: id,
165 id: id
166 }
167 }
168 group.contacts.add(c.contact)
169 group.updated = msg.timestamp || msg.value.sequence
170 } else {
171 if (group) {
172 group.contacts.delete(c.contact)
173 if (!group.contacts.size) {
174 delete groups[id]
175 }
176 }
177 }
178 }
179}
180
181function ensureMessage (id, groups) {
182 var group = groups[id]
183 if (!group) {
184 group = groups[id] = {
185 type: 'message',
186 repliesFrom: new Set(),
187 replies: [],
188 message: null,
189 messageId: id,
190 likes: new Set(),
191 updated: 0
192 }
193 }
194 return group
195}
196
197function compareUserTimestamp (a, b) {
198 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
199 if (isClose) {
200 // recieved close together, use provided timestamps
201 return a.value.timestamp - b.value.timestamp
202 } else {
203 return a.timestamp - b.timestamp
204 }
205}
206

Built with git-ssb-web