git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: 46d6da03cf4ba28bbedeafd78648e0111d0bc3f7

Files: 46d6da03cf4ba28bbedeafd78648e0111d0bc3f7 / modules / feed / pull / summary.js

5620 bytesRaw
1var pull = require('pull-stream')
2var pullDefer = require('pull-defer')
3var pullNext = require('pull-next')
4var SortedArray = require('sorted-array-functions')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7
8exports.gives = nest({
9 'feed.pull': [ 'summary' ]
10})
11
12exports.create = function () {
13 return nest({
14 'feed.pull': { summary }
15 })
16}
17
18function summary (source, opts, cb) {
19 var bumpFilter = opts && opts.bumpFilter
20 var windowSize = opts && opts.windowSize || 1000
21 var last = null
22 var returned = false
23 var done = false
24 return pullNext(() => {
25 if (!done) {
26 var next = {reverse: true, limit: windowSize, live: false}
27 if (last) {
28 next.lt = last.timestamp || last.value.sequence
29 }
30 var deferred = pullDefer.source()
31 pull(
32 source(next),
33 pull.collect((err, values) => {
34 if (err) throw err
35 if (!values.length) {
36 done = true
37 deferred.resolve(pull.values([]))
38 if (!returned) cb && cb()
39 returned = true
40 } else {
41 var fromTime = last && last.timestamp || Date.now()
42 last = values[values.length - 1]
43 groupMessages(values, fromTime, bumpFilter, (err, result) => {
44 if (err) throw err
45 deferred.resolve(
46 pull.values(result)
47 )
48 if (!returned) cb && cb()
49 returned = true
50 })
51 }
52 })
53 )
54 }
55 return deferred
56 })
57}
58
59function groupMessages (messages, fromTime, bumpFilter, cb) {
60 var follows = {}
61 var messageUpdates = {}
62 reverseForEach(messages, function (msg) {
63 if (!msg.value) return
64 var c = msg.value.content
65 if (c.type === 'contact') {
66 updateContact(msg, follows)
67 } else if (c.type === 'vote') {
68 if (c.vote && c.vote.link) {
69 // only show likes of posts added in the current window
70 // and only for the main post
71 const group = messageUpdates[c.vote.link]
72 if (group) {
73 if (c.vote.value > 0) {
74 group.lastUpdateType = 'like'
75 group.likes.add(msg.value.author)
76 // only bump when filter passes
77 if (!bumpFilter || bumpFilter(msg, group)) {
78 group.updated = msg.timestamp
79 }
80 } else {
81 group.likes.delete(msg.value.author)
82 if (group.lastUpdateType === 'like' && !group.likes.size && !group.replies.length) {
83 group.lastUpdateType = 'reply'
84 }
85 }
86 }
87 }
88 } else {
89 if (c.root || (c.type === 'git-update' && c.repo)) {
90 const group = ensureMessage(c.root || c.repo, messageUpdates)
91 group.fromTime = fromTime
92 group.lastUpdateType = 'reply'
93 group.repliesFrom.add(msg.value.author)
94 SortedArray.add(group.replies, msg, compareUserTimestamp)
95 //group.replies.push(msg)
96 group.channel = group.channel || msg.value.content.channel
97
98 // only bump when filter passes
99 if (!bumpFilter || bumpFilter(msg, group)) {
100 group.updated = msg.timestamp || msg.value.sequence
101 }
102 } else {
103 const group = ensureMessage(msg.key, messageUpdates)
104 group.fromTime = fromTime
105 group.lastUpdateType = 'post'
106 group.updated = msg.timestamp || msg.value.sequence
107 group.author = msg.value.author
108 group.channel = msg.value.content.channel
109 group.message = msg
110 group.boxed = typeof msg.value.content === 'string'
111 }
112 }
113 }, () => {
114 var result = []
115 Object.keys(follows).forEach((key) => {
116 if (follows[key].updated) {
117 SortedArray.add(result, follows[key], compareUpdated)
118 }
119 })
120 Object.keys(messageUpdates).forEach((key) => {
121 if (messageUpdates[key].updated) {
122 SortedArray.add(result, messageUpdates[key], compareUpdated)
123 }
124 })
125 cb(null, result)
126 })
127}
128
129function compareUpdated (a, b) {
130 return b.updated - a.updated
131}
132
133function reverseForEach (items, fn, cb) {
134 var i = items.length - 1
135 nextBatch()
136
137 function nextBatch () {
138 var start = Date.now()
139 while (i >= 0) {
140 fn(items[i], i)
141 i -= 1
142 if (Date.now() - start > 10) break
143 }
144
145 if (i > 0) {
146 setImmediate(nextBatch)
147 } else {
148 cb && cb()
149 }
150 }
151}
152
153function updateContact (msg, groups) {
154 var c = msg.value.content
155 var id = msg.value.author
156 var group = groups[id]
157 if (ref.isFeed(c.contact)) {
158 if (c.following) {
159 if (!group) {
160 group = groups[id] = {
161 type: 'follow',
162 lastUpdateType: null,
163 contacts: new Set(),
164 updated: 0,
165 author: id,
166 id: id
167 }
168 }
169 group.contacts.add(c.contact)
170 group.updated = msg.timestamp || msg.value.sequence
171 } else {
172 if (group) {
173 group.contacts.delete(c.contact)
174 if (!group.contacts.size) {
175 delete groups[id]
176 }
177 }
178 }
179 }
180}
181
182function ensureMessage (id, groups) {
183 var group = groups[id]
184 if (!group) {
185 group = groups[id] = {
186 type: 'message',
187 repliesFrom: new Set(),
188 replies: [],
189 message: null,
190 messageId: id,
191 likes: new Set(),
192 updated: 0
193 }
194 }
195 return group
196}
197
198function compareUserTimestamp (a, b) {
199 var isClose = !a.timestamp || !b.timestamp || Math.abs(a.timestamp - b.timestamp) < (10 * 60e3)
200 if (isClose) {
201 // recieved close together, use provided timestamps
202 return a.value.timestamp - b.value.timestamp
203 } else {
204 return a.timestamp - b.timestamp
205 }
206}
207

Built with git-ssb-web