git ssb

1+

Daan Patchwork / patchwork



Tree: 2aba282967b6a030aeb637a4592a0572ac1451b7

Files: 2aba282967b6a030aeb637a4592a0572ac1451b7 / lib / plugins / channels.js

5642 bytes · Raw
1const FlumeReduce = require('flumeview-reduce')
2const normalizeChannel = require('ssb-ref').normalizeChannel
3const PullPushable = require('pull-pushable')
4const pull = require('pull-stream')
5const sorted = require('sorted-array-functions')
6const Abortable = require('pull-abortable')
7const collator = new Intl.Collator('default', { sensitivity: 'base', usage: 'search' })
8
9exports.manifest = {
10 get: 'async',
11 stream: 'source',
12 suggest: 'async',
13 recentStream: 'source'
14}
15
16exports.init = function (ssb) {
17 const index = ssb._flumeUse('patchwork-channels', FlumeReduce(3, reduce, map))
18 return {
19 get: index.get,
20 stream: index.stream,
21 suggest: function ({ text, limit }, cb) {
22 index.get((err, channels) => {
23 if (err) return cb(err)
24 ssb.patchwork.subscriptions2.get({ id: ssb.id }, (err, subscriptions) => {
25 if (err) return cb(err)
26 let result = []
27
28 if (typeof text === 'string' && text.trim().length) {
29 const matches = getMatches(channels, text)
30 result = sort(matches, subscriptions)
31 } else {
32 // suggest subscribed channels by default, sorted by most recent posts
33 for (const channel in subscriptions) {
34 if (subscriptions[channel].subscribed) {
35 result.push(channel)
36 }
37 result.sort((a, b) => {
38 return (channels[b] && (channels[b].updatedAt || 0)) - (channels[a] && (channels[a].updatedAt || 0))
39 })
40 }
41 }
42
43 if (limit) {
44 result = result.slice(0, limit)
45 }
46
47 // add subscribed and count attribute
48 result = result.map(id => {
49 return {
50 id,
51 subscribed: subscriptions[id] && subscriptions[id].subscribed,
52 count: channels[id] && channels[id].count
53 }
54 })
55
56 cb(null, result)
57 })
58 })
59 },
60 recentStream: function ({ limit = 10, throttle = 20e3 }) {
61 const aborter = Abortable()
62 const stream = PullPushable(() => {
63 aborter.abort()
64 })
65 let sync = false
66 const lastUpdated = []
67 let queued = false
68
69 pull(
70 index.stream({ live: true }),
71 aborter,
72 pull.drain(data => {
73 for (const channel in data) {
74 const updatedAt = data[channel].timestamp
75 if (sync) {
76 // make sure there isn't a double up!
77 const existingItemIndex = lastUpdated.findIndex(value => value[0] === channel)
78 if (existingItemIndex >= 0) {
79 lastUpdated.splice(existingItemIndex, 1)
80 }
81 }
82 sorted.add(lastUpdated, [channel, updatedAt], mostRecent)
83 }
84 if (!sync) {
85 sync = true
86 sendLatest()
87 } else {
88 queueSend()
89 }
90 })
91 )
92
93 return stream
94
95 function sendLatest () {
96 // truncate list to speed up future updates (and save memory)
97 lastUpdated.length = Math.min(lastUpdated.length, limit)
98 stream.push(lastUpdated.map(item => item[0]))
99 queued = false
100 }
101
102 function queueSend () {
103 if (!queued) {
104 queued = true
105 setTimeout(sendLatest, throttle || 20e3)
106 }
107 }
108 }
109 }
110}
111
/**
 * Comparator for `[channel, updatedAt]` pairs that orders the pair with the
 * larger `updatedAt` first.
 *
 * @param {Array} a - pair whose second element is a number
 * @param {Array} b - pair whose second element is a number
 * @returns {number} negative when `a` is newer, positive when `b` is newer
 */
function mostRecent (a, b) {
  const timeOfA = a[1]
  const timeOfB = b[1]
  return timeOfB - timeOfA
}
115
/**
 * Fold one mapped message into the channel index.
 *
 * For every channel named by an own key of `item`, increments that channel's
 * `count` and raises its `timestamp` to `item[channel].timestamp` when that
 * is larger. `result` is updated in place.
 *
 * Channel names come from message content, so lookups only consider own
 * properties: a channel called "constructor" must get its own entry instead
 * of resolving to the inherited `Object` function, and a "__proto__" key is
 * ignored so it can never reach `Object.prototype`.
 *
 * @param {object|null|undefined} result - index so far
 *   (`{ [channel]: { count, timestamp } }`); a falsy value starts a new one
 * @param {object|null|undefined} item - `{ [channel]: { timestamp } }` as
 *   produced by `map`; a falsy value leaves the index untouched
 * @returns {object} the updated index
 */
function reduce (result, item) {
  if (!result) result = {}
  if (!item) return result

  // own keys only: inherited enumerable properties are not channels
  for (const channel of Object.keys(item)) {
    if (channel === '__proto__') continue

    const seen = Object.prototype.hasOwnProperty.call(result, channel)
    const value = seen ? result[channel] : (result[channel] = { count: 0, timestamp: 0 })
    value.count += 1

    if (item[channel].timestamp > value.timestamp) {
      value.timestamp = item[channel].timestamp
    }
  }
  return result
}
133
/**
 * Turn a message into the per-channel record consumed by `reduce`.
 *
 * @param {object} msg - message with a `value` object; reads
 *   `msg.value.content`, `msg.value.timestamp` and `msg.timestamp`
 * @returns {object|undefined} `{ [channel]: { timestamp } }` for every
 *   channel reported by `getChannels`, where `timestamp` is the smaller of
 *   `msg.value.timestamp` and `msg.timestamp`; `undefined` when the message
 *   has no content, is a 'vote' or 'channel' message, or names no channel
 */
function map (msg) {
  const content = msg.value.content
  if (!content) return

  // filter out likes and subscriptions
  if (content.type === 'vote' || content.type === 'channel') return

  const channels = getChannels(msg)
  if (!channels.length) return

  const timestamp = Math.min(msg.value.timestamp, msg.timestamp)
  const result = {}
  for (const channel of channels) {
    result[channel] = { timestamp }
  }
  return result
}
152
/**
 * Collect the channel names a message refers to.
 *
 * Sources, in order:
 *  1. `content.channel`, normalized with `normalizeChannel`;
 *  2. for 'post' messages only, every entry of `content.mentions` that is an
 *     object with exactly one key, `link`, holding a string that starts with
 *     '#' (the part after '#' is normalized).
 *
 * Names that normalize to a falsy value are dropped. The same name may
 * appear more than once in the result.
 *
 * @param {object} msg - message; `msg.value.content` is read when present
 * @returns {string[]} channel names, possibly empty
 */
function getChannels (msg) {
  const result = []
  const content = msg.value && msg.value.content
  if (!content) return result

  const channel = normalizeChannel(content.channel)
  if (channel) {
    result.push(channel)
  }

  if (content.type === 'post' && Array.isArray(content.mentions)) {
    for (const mention of content.mentions) {
      // Message content is arbitrary data: a null/undefined entry would make
      // Object.keys throw, and non-objects can never be hashtag mentions.
      if (!mention || typeof mention !== 'object') continue
      if (Object.keys(mention).length !== 1) continue
      if (typeof mention.link !== 'string' || !mention.link.startsWith('#')) continue

      const tag = normalizeChannel(mention.link.slice(1))
      if (tag) {
        result.push(tag)
      }
    }
  }
  return result
}
173
/**
 * List the channel names that begin with `text`, as decided by the
 * `startsWith` helper of this file.
 *
 * @param {object} channels - object whose enumerable keys are channel names
 * @param {string} text - prefix to look for
 * @returns {string[]} matching names, in key enumeration order
 */
function getMatches (channels, text) {
  const matching = []

  for (const name in channels) {
    if (!startsWith(name, text)) continue
    matching.push(name)
  }

  return matching
}
185
/**
 * Check whether `text` begins with `startsWith`, comparing with the
 * module-level `collator` instead of exact string equality.
 *
 * @param {string} text - string to inspect
 * @param {string} startsWith - expected prefix
 * @returns {boolean} true when the first `startsWith.length` characters of
 *   `text` compare equal to `startsWith`
 */
function startsWith (text, startsWith) {
  const head = text.slice(0, startsWith.length)
  const difference = collator.compare(head, startsWith)
  return difference === 0
}
189
/**
 * Order channel names in place: names marked as subscribed come first
 * (per `compareBool`), and within each group shorter names come first.
 *
 * @param {string[]} items - channel names; sorted in place
 * @param {object} subscribed - `{ [channel]: { subscribed } }` lookup
 * @returns {string[]} the same `items` array, now sorted
 */
function sort (items, subscribed) {
  const isSubscribed = id => subscribed[id] && subscribed[id].subscribed

  return items.sort((left, right) => {
    const bySubscription = compareBool(isSubscribed(left), isSubscribed(right))
    return bySubscription || left.length - right.length
  })
}
196
/**
 * Comparator that puts truthy values before falsy ones.
 *
 * Both arguments are coerced to booleans first. Callers in this file pass
 * `x && x.subscribed`, which may be `undefined` or `false`; those must
 * compare as equal, otherwise the comparator answers "after" in both
 * directions and the sort order becomes unreliable.
 *
 * @param {*} a
 * @param {*} b
 * @returns {number} 0 when both have the same truthiness, -1 when only `a`
 *   is truthy, 1 when only `b` is truthy
 */
function compareBool (a, b) {
  const left = Boolean(a)
  const right = Boolean(b)

  if (left === right) return 0
  return left ? -1 : 1
}
206

Built with git-ssb-web