Files: 58ab0241031aa549a35cce1e678c27065ae66221 / lib / plugins / channels.js
5642 bytesRaw
1 | const FlumeReduce = require('flumeview-reduce') |
2 | const normalizeChannel = require('ssb-ref').normalizeChannel |
3 | const PullPushable = require('pull-pushable') |
4 | const pull = require('pull-stream') |
5 | const sorted = require('sorted-array-functions') |
6 | const Abortable = require('pull-abortable') |
7 | const collator = new Intl.Collator('default', { sensitivity: 'base', usage: 'search' }) |
8 | |
9 | exports.manifest = { |
10 | get: 'async', |
11 | stream: 'source', |
12 | suggest: 'async', |
13 | recentStream: 'source' |
14 | } |
15 | |
16 | exports.init = function (ssb) { |
17 | const index = ssb._flumeUse('patchwork-channels', FlumeReduce(3, reduce, map)) |
18 | return { |
19 | get: index.get, |
20 | stream: index.stream, |
21 | suggest: function ({ text, limit }, cb) { |
22 | index.get((err, channels) => { |
23 | if (err) return cb(err) |
24 | ssb.patchwork.subscriptions2.get({ id: ssb.id }, (err, subscriptions) => { |
25 | if (err) return cb(err) |
26 | let result = [] |
27 | |
28 | if (typeof text === 'string' && text.trim().length) { |
29 | const matches = getMatches(channels, text) |
30 | result = sort(matches, subscriptions) |
31 | } else { |
32 | // suggest subscribed channels by default, sorted by most recent posts |
33 | for (const channel in subscriptions) { |
34 | if (subscriptions[channel].subscribed) { |
35 | result.push(channel) |
36 | } |
37 | result.sort((a, b) => { |
38 | return (channels[b] && (channels[b].updatedAt || 0)) - (channels[a] && (channels[a].updatedAt || 0)) |
39 | }) |
40 | } |
41 | } |
42 | |
43 | if (limit) { |
44 | result = result.slice(0, limit) |
45 | } |
46 | |
47 | // add subscribed and count attribute |
48 | result = result.map(id => { |
49 | return { |
50 | id, |
51 | subscribed: subscriptions[id] && subscriptions[id].subscribed, |
52 | count: channels[id] && channels[id].count |
53 | } |
54 | }) |
55 | |
56 | cb(null, result) |
57 | }) |
58 | }) |
59 | }, |
60 | recentStream: function ({ limit = 10, throttle = 20e3 }) { |
61 | const aborter = Abortable() |
62 | const stream = PullPushable(() => { |
63 | aborter.abort() |
64 | }) |
65 | let sync = false |
66 | const lastUpdated = [] |
67 | let queued = false |
68 | |
69 | pull( |
70 | index.stream({ live: true }), |
71 | aborter, |
72 | pull.drain(data => { |
73 | for (const channel in data) { |
74 | const updatedAt = data[channel].timestamp |
75 | if (sync) { |
76 | // make sure there isn't a double up! |
77 | const existingItemIndex = lastUpdated.findIndex(value => value[0] === channel) |
78 | if (existingItemIndex >= 0) { |
79 | lastUpdated.splice(existingItemIndex, 1) |
80 | } |
81 | } |
82 | sorted.add(lastUpdated, [channel, updatedAt], mostRecent) |
83 | } |
84 | if (!sync) { |
85 | sync = true |
86 | sendLatest() |
87 | } else { |
88 | queueSend() |
89 | } |
90 | }) |
91 | ) |
92 | |
93 | return stream |
94 | |
95 | function sendLatest () { |
96 | // truncate list to speed up future updates (and save memory) |
97 | lastUpdated.length = Math.min(lastUpdated.length, limit) |
98 | stream.push(lastUpdated.map(item => item[0])) |
99 | queued = false |
100 | } |
101 | |
102 | function queueSend () { |
103 | if (!queued) { |
104 | queued = true |
105 | setTimeout(sendLatest, throttle || 20e3) |
106 | } |
107 | } |
108 | } |
109 | } |
110 | } |
111 | |
112 | function mostRecent (a, b) { |
113 | return b[1] - a[1] |
114 | } |
115 | |
116 | function reduce (result, item) { |
117 | if (!result) result = {} |
118 | if (item) { |
119 | for (const channel in item) { |
120 | let value = result[channel] |
121 | if (!value) { |
122 | value = result[channel] = { count: 0, timestamp: 0 } |
123 | } |
124 | value.count += 1 |
125 | |
126 | if (item[channel].timestamp > value.timestamp) { |
127 | value.timestamp = item[channel].timestamp |
128 | } |
129 | } |
130 | } |
131 | return result |
132 | } |
133 | |
134 | function map (msg) { |
135 | if (msg.value.content) { |
136 | // filter out likes and subscriptions |
137 | const isLike = msg.value.content.type === 'vote' |
138 | const isSubscription = msg.value.content.type === 'channel' |
139 | |
140 | if (!isLike && !isSubscription) { |
141 | const channels = getChannels(msg) |
142 | if (channels.length) { |
143 | return channels.reduce((result, channel) => { |
144 | const timestamp = Math.min(msg.value.timestamp, msg.timestamp) |
145 | result[channel] = { timestamp } |
146 | return result |
147 | }, {}) |
148 | } |
149 | } |
150 | } |
151 | } |
152 | |
153 | function getChannels (msg) { |
154 | const result = [] |
155 | if (msg.value && msg.value.content) { |
156 | const channel = normalizeChannel(msg.value.content.channel) |
157 | if (channel) { |
158 | result.push(channel) |
159 | } |
160 | if (Array.isArray(msg.value.content.mentions) && msg.value.content.type === 'post') { |
161 | msg.value.content.mentions.forEach(mention => { |
162 | if (Object.keys(mention).length === 1 && typeof mention.link === 'string' && mention.link.startsWith('#')) { |
163 | const tag = normalizeChannel(mention.link.slice(1)) |
164 | if (tag) { |
165 | result.push(tag) |
166 | } |
167 | } |
168 | }) |
169 | } |
170 | } |
171 | return result |
172 | } |
173 | |
174 | function getMatches (channels, text) { |
175 | const result = [] |
176 | |
177 | for (const channel in channels) { |
178 | if (startsWith(channel, text)) { |
179 | result.push(channel) |
180 | } |
181 | } |
182 | |
183 | return result |
184 | } |
185 | |
186 | function startsWith (text, startsWith) { |
187 | return collator.compare(text.slice(0, startsWith.length), startsWith) === 0 |
188 | } |
189 | |
190 | function sort (items, subscribed) { |
191 | return items.sort((a, b) => { |
192 | return compareBool(subscribed[a] && subscribed[a].subscribed, subscribed[b] && subscribed[b].subscribed) || |
193 | a.length - b.length |
194 | }) |
195 | } |
196 | |
197 | function compareBool (a, b) { |
198 | if (a === b) { |
199 | return 0 |
200 | } else if (a) { |
201 | return -1 |
202 | } else { |
203 | return 1 |
204 | } |
205 | } |
206 |
Built with git-ssb-web