git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: e08d5a8b3a1f4197ef31f35de3e758bb2600f214

Files: e08d5a8b3a1f4197ef31f35de3e758bb2600f214 / modules / feed / pull / summary.js

5552 bytesRaw
1var pull = require('pull-stream')
2var pullPushable = require('pull-pushable')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6
7exports.gives = nest({
8 'feed.pull': [ 'summary' ]
9})
10
11exports.create = function () {
12 return nest({
13 'feed.pull': { summary }
14 })
15}
16
17function summary (source, opts, cb) {
18 var bumpFilter = opts && opts.bumpFilter
19 var windowSize = opts && opts.windowSize || 1000
20 var last = null
21 var returned = false
22 var done = false
23 return pullNext(() => {
24 if (!done) {
25 var next = {reverse: true, limit: windowSize, live: false}
26 if (last) {
27 next.lt = last.timestamp || last.value.sequence
28 }
29 var pushable = pullPushable()
30 pull(
31 source(next),
32 pull.collect((err, values) => {
33 if (err) throw err
34 if (!values.length) {
35 done = true
36 pushable.end()
37 if (!returned) cb && cb()
38 returned = true
39 } else {
40 var fromTime = last && last.timestamp || Date.now()
41 last = values[values.length - 1]
42 groupMessages(values, fromTime, bumpFilter, (err, result) => {
43 if (err) throw err
44 result.forEach(v => pushable.push(v))
45 pushable.end()
46 if (!returned) cb && cb()
47 returned = true
48 })
49 }
50 })
51 )
52 }
53 return pushable
54 })
55}
56
57function groupMessages (messages, fromTime, bumpFilter, cb) {
58 var follows = {}
59 var messageUpdates = {}
60 reverseForEach(messages, function (msg) {
61 if (!msg.value) return
62 var c = msg.value.content
63 if (c.type === 'contact') {
64 updateContact(msg, follows)
65 } else if (c.type === 'vote') {
66 if (c.vote && c.vote.link) {
67 // only show digs of posts added in the current window
68 // and only for the main post
69 const group = messageUpdates[c.vote.link]
70 if (group) {
71 if (c.vote.value > 0) {
72 group.lastUpdateType = 'dig'
73 group.digs.add(msg.value.author)
74 // only bump when filter passes
75 if (!bumpFilter || bumpFilter(msg, group)) {
76 group.updated = msg.timestamp
77 }
78 } else {
79 group.digs.delete(msg.value.author)
80 if (group.lastUpdateType === 'dig' && !group.digs.size && !group.replies.length) {
81 group.lastUpdateType = 'reply'
82 }
83 }
84 }
85 }
86 } else {
87 if (c.root || (c.type === 'git-update' && c.repo)) {
88 const group = ensureMessage(c.root || c.repo, messageUpdates)
89 group.fromTime = fromTime
90 group.lastUpdateType = 'reply'
91 group.repliesFrom.add(msg.value.author)
92 SortedArray.add(group.replies, msg, compareUserTimestamp)
93 //group.replies.push(msg)
94 group.channel = group.channel || msg.value.content.channel
95
96 // only bump when filter passes
97 if (!bumpFilter || bumpFilter(msg, group)) {
98 group.updated = msg.timestamp || msg.value.sequence
99 }
100 } else {
101 const group = ensureMessage(msg.key, messageUpdates)
102 group.fromTime = fromTime
103 group.lastUpdateType = 'post'
104 group.updated = msg.timestamp || msg.value.sequence
105 group.author = msg.value.author
106 group.channel = msg.value.content.channel
107 group.message = msg
108 group.boxed = typeof msg.value.content === 'string'
109 }
110 }
111 }, () => {
112 var result = []
113 Object.keys(follows).forEach((key) => {
114 if (follows[key].updated) {
115 SortedArray.add(result, follows[key], compareUpdated)
116 }
117 })
118 Object.keys(messageUpdates).forEach((key) => {
119 if (messageUpdates[key].updated) {
120 SortedArray.add(result, messageUpdates[key], compareUpdated)
121 }
122 })
123 cb(null, result)
124 })
125}
126
127function compareUpdated (a, b) {
128 return b.updated - a.updated
129}
130
131function reverseForEach (items, fn, cb) {
132 var i = items.length - 1
133 nextBatch()
134
135 function nextBatch () {
136 var start = Date.now()
137 while (i >= 0) {
138 fn(items[i], i)
139 i -= 1
140 if (Date.now() - start > 10) break
141 }
142
143 if (i > 0) {
144 setImmediate(nextBatch)
145 } else {
146 cb && cb()
147 }
148 }
149}
150
151function updateContact (msg, groups) {
152 var c = msg.value.content
153 var id = msg.value.author
154 var group = groups[id]
155 if (c.contact) {
156 if (c.following) {
157 if (!group) {
158 group = groups[id] = {
159 type: 'follow',
160 lastUpdateType: null,
161 contacts: new Set(),
162 updated: 0,
163 author: id,
164 id: id
165 }
166 }
167 group.contacts.add(c.contact)
168 group.updated = msg.timestamp || msg.value.sequence
169 } else {
170 if (group) {
171 group.contacts.delete(c.contact)
172 if (!group.contacts.size) {
173 delete groups[id]
174 }
175 }
176 }
177 }
178}
179
180function ensureMessage (id, groups) {
181 var group = groups[id]
182 if (!group) {
183 group = groups[id] = {
184 type: 'message',
185 repliesFrom: new Set(),
186 replies: [],
187 message: null,
188 messageId: id,
189 digs: new Set(),
190 updated: 0
191 }
192 }
193 return group
194}
195
196function compareUserTimestamp (a, b) {
197 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
198 if (isClose) {
199 // recieved close together, use provided timestamps
200 return a.value.timestamp - b.value.timestamp
201 } else {
202 return a.timestamp - b.timestamp
203 }
204}
205

Built with git-ssb-web