git ssb

10+

Matt McKegg / patchwork



Tree: 61a89f2614a9e41e661b0fca3162cd220b9c1b8e

Files: 61a89f2614a9e41e661b0fca3162cd220b9c1b8e / lib / feed-summary.js

4982 bytes · Raw
1var pull = require('pull-stream')
2var pullPushable = require('pull-pushable')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5
6module.exports = FeedSummary
7
/**
 * Build a pull-stream source of summarised feed groups, read one window of
 * messages at a time.
 *
 * Each time the returned stream needs more data, `source` is called with
 * `{reverse: true, limit: windowSize, live: false}` (plus `lt`, taken from the
 * last message of the previous window). The window is collected, grouped by
 * `groupMessages`, and the resulting groups are emitted. An empty window marks
 * the end of the feed.
 *
 * @param {Function} source - called with a query object; must return a
 *   pull-stream source of messages.
 * @param {Object} [opts]
 * @param {Function} [opts.bumpFilter] - forwarded to `groupMessages`.
 * @param {number} [opts.windowSize=1000] - messages requested per window.
 * @param {Function} [cb] - called at most once, when the first window has
 *   been handled; receives the error if that window failed.
 * @returns {Function} the stream produced by `pullNext`.
 */
function FeedSummary (source, opts, cb) {
  const bumpFilter = opts && opts.bumpFilter
  const windowSize = (opts && opts.windowSize) || 1000
  let last = null
  let returned = false
  let done = false

  // Report the outcome of the first window to the caller exactly once.
  function notify (err) {
    if (returned) return
    returned = true
    if (!cb) return
    if (err) cb(err)
    else cb()
  }

  return pullNext(() => {
    // Returning nothing here signals that there are no further windows.
    if (done) return

    const query = {reverse: true, limit: windowSize, live: false}
    if (last) {
      query.lt = last.timestamp || last.value.sequence
    }

    const pushable = pullPushable()

    // End the current window with an error instead of throwing from inside a
    // stream callback (which would be uncaught) or leaving the pushable open
    // (which would stall the consumer forever).
    function fail (err) {
      done = true
      pushable.end(err)
      notify(err)
    }

    pull(
      source(query),
      pull.collect((err, values) => {
        if (err) return fail(err)

        if (!values.length) {
          done = true
          pushable.end()
          notify()
          return
        }

        const fromTime = (last && last.timestamp) || Date.now()
        last = values[values.length - 1]
        groupMessages(values, fromTime, bumpFilter, (err, result) => {
          if (err) return fail(err)
          result.forEach(v => pushable.push(v))
          pushable.end()
          notify()
        })
      })
    )

    return pushable
  })
}
47
/**
 * Collapse one window of messages into summary groups.
 *
 * Messages are visited from the last array element to the first. Three kinds
 * of message are distinguished by `msg.value.content`:
 *   - `type: 'contact'` updates a per-author follow group (`updateContact`);
 *   - `type: 'vote'` adds/removes a dig on a message group that already exists
 *     in this window;
 *   - anything else is a reply (when `content.root` is set) or a top-level
 *     post, and creates/updates a message group (`ensureMessage`).
 *
 * @param {Array<Object>} messages - messages of the current window.
 * @param {number} fromTime - stored on every touched message group.
 * @param {Function} [bumpFilter] - `(msg, group) => boolean`; when given, a
 *   reply or dig only changes `group.updated` if it returns truthy.
 * @param {Function} cb - `cb(null, result)` where `result` holds every follow
 *   and message group with a truthy `updated`, inserted with
 *   `SortedArray.add(..., compareUpdated)`.
 */
function groupMessages (messages, fromTime, bumpFilter, cb) {
  var follows = {}
  var messageUpdates = {}

  reverseForEach(messages, function (msg) {
    if (!msg.value) return
    var c = msg.value.content
    // Nothing to classify without content; avoids a TypeError on `c.type`.
    if (!c) return

    if (c.type === 'contact') {
      updateContact(msg, follows)
    } else if (c.type === 'vote') {
      if (c.vote && c.vote.link) {
        // only show digs of posts added in the current window
        // and only for the main post
        const group = messageUpdates[c.vote.link]
        if (group) {
          if (c.vote.value > 0) {
            group.lastUpdateType = 'dig'
            group.digs.add(msg.value.author)
            // only bump when filter passes
            if (!bumpFilter || bumpFilter(msg, group)) {
              // Same fallback as replies and posts: without it a vote lacking
              // `timestamp` would set `updated` to undefined and the group
              // would be filtered out of the result below.
              group.updated = msg.timestamp || msg.value.sequence
            }
          } else {
            group.digs.delete(msg.value.author)
            // NOTE(review): this resets to 'reply' precisely when there are
            // no replies — confirm that is the intended fallback.
            if (group.lastUpdateType === 'dig' && !group.digs.size && !group.replies.length) {
              group.lastUpdateType = 'reply'
            }
          }
        }
      }
    } else {
      if (c.root) {
        const group = ensureMessage(c.root, messageUpdates)
        group.fromTime = fromTime
        group.lastUpdateType = 'reply'
        group.repliesFrom.add(msg.value.author)
        group.replies.push(msg)
        group.channel = group.channel || msg.value.content.channel

        // only bump when filter passes
        if (!bumpFilter || bumpFilter(msg, group)) {
          group.updated = msg.timestamp || msg.value.sequence
        }
      } else {
        const group = ensureMessage(msg.key, messageUpdates)
        group.fromTime = fromTime
        group.lastUpdateType = 'post'
        group.updated = msg.timestamp || msg.value.sequence
        group.author = msg.value.author
        group.channel = msg.value.content.channel
        group.message = msg
      }
    }
  }, () => {
    var result = []
    // Groups whose `updated` is still falsy were never bumped and are omitted.
    Object.keys(follows).forEach((key) => {
      if (follows[key].updated) {
        SortedArray.add(result, follows[key], compareUpdated)
      }
    })
    Object.keys(messageUpdates).forEach((key) => {
      if (messageUpdates[key].updated) {
        SortedArray.add(result, messageUpdates[key], compareUpdated)
      }
    })
    cb(null, result)
  })
}
116
/**
 * Comparator over the `updated` field of two groups.
 *
 * Returns a negative number when `a.updated` is the larger value, a positive
 * number when `b.updated` is larger, and 0 when they are equal — i.e. a
 * comparator-based sort places the larger `updated` first.
 */
function compareUpdated (a, b) {
  const { updated: right } = b
  const { updated: left } = a
  return right - left
}
120
/**
 * Call `fn(item, index)` for every element of `items`, from the last index
 * down to 0, working in time-boxed batches so a long list does not block the
 * event loop.
 *
 * Each batch runs for roughly 10 ms; if items remain, the next batch is
 * scheduled with `setImmediate`. When everything fits in the first batch,
 * `cb` is invoked synchronously.
 *
 * @param {Array} items - elements to visit.
 * @param {Function} fn - called as `fn(items[i], i)`.
 * @param {Function} [cb] - called with no arguments after index 0 was visited
 *   (or immediately for an empty array).
 */
function reverseForEach (items, fn, cb) {
  const BATCH_BUDGET_MS = 10
  let i = items.length - 1
  nextBatch()

  function nextBatch () {
    // The budget is per batch: measuring from a single start time would make
    // every batch after the first do no work and reschedule itself forever.
    const start = Date.now()
    while (i >= 0 && Date.now() - start < BATCH_BUDGET_MS) {
      fn(items[i], i)
      i -= 1
    }

    // `>= 0`, not `> 0`: index 0 may still be pending when the budget runs out.
    if (i >= 0) {
      setImmediate(nextBatch)
    } else {
      cb && cb()
    }
  }
}
139
/**
 * Apply one contact message to the per-author follow groups.
 *
 * Messages without `content.contact` are ignored. A truthy
 * `content.following` adds the contact to the author's group (creating the
 * group on first use) and sets `updated` to `msg.timestamp`, falling back to
 * `msg.value.sequence`. Otherwise the contact is removed from an existing
 * group, and the group itself is deleted once it has no contacts left.
 *
 * @param {Object} msg - message whose `value.author` keys the group.
 * @param {Object} groups - map of author id to follow group; mutated in place.
 */
function updateContact (msg, groups) {
  const { contact, following } = msg.value.content
  const { author } = msg.value
  if (!contact) return

  const existing = groups[author]

  if (!following) {
    if (!existing) return
    existing.contacts.delete(contact)
    if (existing.contacts.size === 0) {
      delete groups[author]
    }
    return
  }

  let entry = existing
  if (!entry) {
    entry = {
      type: 'follow',
      lastUpdateType: null,
      contacts: new Set(),
      updated: 0,
      author: author,
      id: author
    }
    groups[author] = entry
  }
  entry.contacts.add(contact)
  entry.updated = msg.timestamp || msg.value.sequence
}
168
/**
 * Fetch the message group stored under `id`, creating an empty one first if
 * none exists yet.
 *
 * A new group starts with no message, no replies or digs, and `updated: 0`.
 *
 * @param {string} id - key of the group; also stored as `messageId`.
 * @param {Object} groups - map of id to message group; mutated in place.
 * @returns {Object} the existing or newly created group.
 */
function ensureMessage (id, groups) {
  const existing = groups[id]
  if (existing) return existing

  const created = {
    type: 'message',
    repliesFrom: new Set(),
    replies: [],
    message: null,
    messageId: id,
    digs: new Set(),
    updated: 0
  }
  groups[id] = created
  return created
}
184

Built with git-ssb-web