git ssb

10+

Matt McKegg / patchwork



Commit 0a2a19abbd326534732b6aee3ec2351e94dd7a32

use sbot plugin and indexed roots for public feed

Matt McKegg committed on 6/22/2017, 6:08:17 AM
Parent: 325f8a3ff6e77b698f12d57000eba1859fe1364d

Files changed

lib/flumeview-channels.jsdeleted
lib/progress-stream.jsdeleted
modules/feed/html/rollup.jschanged
modules/page/html/render/gatherings.jschanged
modules/page/html/render/public.jschanged
modules/progress/obs.jschanged
package.jsonchanged
plugs/channel/obs/recent.jschanged
server-process.jschanged
sbot/channels.jsadded
sbot/index.jsadded
sbot/progress.jsadded
sbot/roots.jsadded
sbot/subscriptions.jsadded
lib/flumeview-channels.jsView
@@ -1,39 +1,0 @@
1-var FlumeReduce = require('flumeview-reduce')
2-
3-exports.name = 'channels'
4-exports.version = require('../package.json').version
5-exports.manifest = {
6- stream: 'source',
7- get: 'async'
8-}
9-
10-exports.init = function (ssb, config) {
11- return ssb._flumeUse('channels', FlumeReduce(1, reduce, map))
12-}
13-
14-function reduce (result, item) {
15- if (!result) result = {}
16- if (item) {
17- var value = result[item.channel]
18- if (!value) {
19- value = result[item.channel] = {count: 0, timestamp: 0}
20- }
21- value.count += 1
22- if (item.timestamp > value.timestamp) {
23- value.timestamp = item.timestamp
24- }
25- }
26- return result
27-}
28-
29-function map (msg) {
30- if (msg.value.content && typeof msg.value.content.channel === 'string') {
31- var channel = msg.value.content.channel
32- if (channel.length > 0 && channel.length < 30) {
33- return {
34- channel: channel.replace(/\s/g, ''),
35- timestamp: msg.timestamp
36- }
37- }
38- }
39-}
lib/progress-stream.jsView
@@ -1,35 +1,0 @@
1-var Pushable = require('pull-pushable')
2-var deepEqual = require('deep-equal')
3-
4-exports.name = 'progressStream'
5-exports.version = require('../package.json').version
6-exports.manifest = {
7- read: 'source'
8-}
9-
10-exports.init = function (ssb, config) {
11- return {
12- read: function (opts) {
13- var lastValue = deepClone(ssb.progress())
14-
15- var timer = setInterval(() => {
16- var newValue = ssb.progress()
17- if (!deepEqual(newValue, lastValue)) {
18- lastValue = deepClone(newValue)
19- pushable.push(lastValue)
20- }
21- }, 200)
22-
23- var pushable = Pushable(() => {
24- clearInterval(timer)
25- })
26-
27- pushable.push(lastValue)
28- return pushable
29- }
30- }
31-}
32-
33-function deepClone (obj) {
34- return JSON.parse(JSON.stringify(obj))
35-}
modules/feed/html/rollup.jsView
@@ -39,9 +39,9 @@
3939 prepend,
4040 rootFilter = returnTrue,
4141 bumpFilter = returnTrue,
4242 displayFilter = returnTrue,
43- updateStream = getStream, // override the stream used for realtime updates
43+ updateStream, // override the stream used for realtime updates
4444 waitFor = true
4545 }) {
4646 var updates = Value(0)
4747 var yourId = api.keys.sync.id()
@@ -70,14 +70,16 @@
7070 refresh()
7171
7272 // display pending updates
7373 pull(
74- updateStream({old: false}),
75- LookupRoot(),
74+ updateStream || pull(
75+ getStream({old: false}),
76+ LookupRoot()
77+ ),
7678 pull.filter((msg) => {
7779 // only render posts that have a root message
7880 var root = msg.root || msg
79- return root && root.value && root.value.content && rootFilter(root) && bumpFilter(msg)
81+ return root && root.value && root.value.content && rootFilter(root) && bumpFilter(msg) && displayFilter(msg)
8082 }),
8183 pull.drain((msg) => {
8284 if (msg.value.content.type === 'vote') return
8385 if (api.app.sync.externalHandler(msg)) return
modules/page/html/render/gatherings.jsView
@@ -4,17 +4,23 @@
44 exports.needs = nest({
55 'feed.pull.type': 'first',
66 'feed.html.rollup': 'first',
77 'feed.pull.public': 'first',
8- 'gathering.sheet.edit': 'first'
8+ 'gathering.sheet.edit': 'first',
9+ 'keys.sync.id': 'first',
10+ 'contact.obs.following': 'first',
11+ 'sbot.pull.stream': 'first'
912 })
1013
1114 exports.gives = nest('page.html.render')
1215
1316 exports.create = function (api) {
1417 return nest('page.html.render', function channel (path) {
1518 if (path !== '/gatherings') return
1619
20+ var id = api.keys.sync.id()
21+ var following = api.contact.obs.following(id)
22+
1723 var prepend = [
1824 h('PageHeading', [
1925 h('h1', [h('strong', 'Gatherings'), ' from your extended network']),
2026 h('div.meta', [
@@ -26,10 +32,16 @@
2632 ]
2733
2834 return api.feed.html.rollup(api.feed.pull.type('gathering'), {
2935 prepend,
36+ bumpFilter: function (msg) {
37+ if (msg.value && msg.value.content && typeof msg.value.content === 'object') {
38+ var author = msg.value.author
39+ return id === author || following().has(author)
40+ }
41+ },
3042 rootFilter: (msg) => msg.value.content.type === 'gathering',
31- updateStream: api.feed.pull.public
43+ updateStream: api.sbot.pull.stream(sbot => sbot.patchwork.latest({ids: [id]}))
3244 })
3345 })
3446
3547 function createGathering () {
modules/page/html/render/public.jsView
@@ -1,5 +1,7 @@
11 var nest = require('depnest')
2+var extend = require('xtend')
3+var pull = require('pull-stream')
24 var { h, send, when, computed, map } = require('mutant')
35
46 exports.needs = nest({
57 sbot: {
@@ -7,8 +9,9 @@
79 connectedPeers: 'first',
810 localPeers: 'first'
911 }
1012 },
13+ 'sbot.pull.stream': 'first',
1114 'feed.pull.public': 'first',
1215 'about.html.image': 'first',
1316 'about.obs.name': 'first',
1417 'invite.sheet': 'first',
@@ -50,35 +53,30 @@
5053 var prepend = [
5154 api.message.html.compose({ meta: { type: 'post' }, placeholder: 'Write a public message' })
5255 ]
5356
54- var feedView = api.feed.html.rollup(api.feed.pull.public, {
57+ var getStream = (opts) => {
58+ if (opts.lt != null && !opts.lt.marker) {
59+ // if an lt has been specified that is not a marker, assume stream is finished
60+ return pull.empty()
61+ } else {
62+ return api.sbot.pull.stream(sbot => sbot.patchwork.roots(extend(opts, { ids: [id] })))
63+ }
64+ }
65+
66+ var feedView = api.feed.html.rollup(getStream, {
5567 prepend,
56- waitFor: computed([
57- following.sync,
58- subscribedChannels.sync
59- ], (...x) => x.every(Boolean)),
60-
61- rootFilter: function (msg) {
62- if (msg.value && msg.value.content && typeof msg.value.content === 'object') {
63- var author = msg.value.author
64- var type = msg.value.content.type
65- var channel = msg.value.content.channel
66-
67- return (
68- id === author ||
69- following().has(author) ||
70- (type === 'message' && subscribedChannels().has(channel))
71- )
72- }
73- },
74-
68+ updateStream: api.sbot.pull.stream(sbot => sbot.patchwork.latest({ids: [id]})),
7569 bumpFilter: function (msg) {
7670 if (msg.value && msg.value.content && typeof msg.value.content === 'object') {
7771 var author = msg.value.author
7872 return id === author || following().has(author)
7973 }
80- }
74+ },
75+ waitFor: computed([
76+ following.sync,
77+ subscribedChannels.sync
78+ ], (...x) => x.every(Boolean))
8179 })
8280
8381 var result = h('div.SplitView', [
8482 h('div.side', [
modules/progress/obs.jsView
@@ -63,9 +63,9 @@
6363 rate: 0
6464 })
6565 }
6666 if (!progress) {
67- progress = ProgressStatus(x => x.progressStream.read(), {
67+ progress = ProgressStatus(x => x.patchwork.progress(), {
6868 indexes: Status(),
6969 migration: Status()
7070 })
7171 }
package.jsonView
@@ -23,10 +23,13 @@
2323 "electron-default-menu": "~1.0.0",
2424 "electron-spellchecker": "^1.0.4",
2525 "electron-window-state": "^4.1.0",
2626 "flatpickr": "^3.0.5-1",
27+ "flumeview-level": "^2.0.3",
28+ "hashlru": "^2.2.0",
2729 "insert-css": "~2.0.0",
2830 "level": "~1.7.0",
31+ "lrucache": "^1.0.2",
2932 "micro-css": "^2.0.0",
3033 "mime-types": "^2.1.15",
3134 "moment": "^2.18.1",
3235 "mutant": "^3.21.0",
@@ -43,9 +46,9 @@
4346 "pull-ping": "^2.0.2",
4447 "pull-pushable": "^2.0.1",
4548 "pull-scroll": "^1.0.4",
4649 "pull-stream": "~3.6.0",
47- "scuttlebot": "github:ssbc/scuttlebot#5a6639559c0aa64e0a5d8858ac3877edfc6096ec",
50+ "scuttlebot": "^10.0.7",
4851 "secure-scuttlebutt": "^16.3.4",
4952 "sorted-array-functions": "~1.0.0",
5053 "spacetime": "^1.0.7",
5154 "ssb-about": "0.1.0",
plugs/channel/obs/recent.jsView
@@ -29,9 +29,9 @@
2929 var sync = Value(false)
3030 channelsLookup = Dict()
3131
3232 pull(
33- api.sbot.pull.stream(sbot => sbot.channels.stream({live: true})),
33+ api.sbot.pull.stream(sbot => sbot.patchwork.channels({live: true})),
3434 pull.drain(msg => {
3535 if (!sync()) {
3636 channelsLookup.transaction(() => {
3737 for (var channel in msg) {
server-process.jsView
@@ -17,10 +17,9 @@
1717 .use(require('scuttlebot/plugins/logging'))
1818 .use(require('ssb-query'))
1919 .use(require('ssb-about'))
2020 .use(require('ssb-contacts'))
21- .use(require('./lib/flumeview-channels'))
22- .use(require('./lib/progress-stream'))
21+ .use(require('./sbot'))
2322
2423 module.exports = function (ssbConfig) {
2524 var context = {
2625 sbot: createSbot(ssbConfig),
sbot/channels.jsView
@@ -1,0 +1,32 @@
1+var FlumeReduce = require('flumeview-reduce')
2+
3+module.exports = function (ssb, config) {
4+ return ssb._flumeUse('patchwork-channels', FlumeReduce(1, reduce, map))
5+}
6+
7+function reduce (result, item) {
8+ if (!result) result = {}
9+ if (item) {
10+ var value = result[item.channel]
11+ if (!value) {
12+ value = result[item.channel] = {count: 0, timestamp: 0}
13+ }
14+ value.count += 1
15+ if (item.timestamp > value.timestamp) {
16+ value.timestamp = item.timestamp
17+ }
18+ }
19+ return result
20+}
21+
22+function map (msg) {
23+ if (msg.value.content && typeof msg.value.content.channel === 'string') {
24+ var channel = msg.value.content.channel
25+ if (channel.length > 0 && channel.length < 30) {
26+ return {
27+ channel: channel.replace(/\s/g, ''),
28+ timestamp: msg.timestamp
29+ }
30+ }
31+ }
32+}
sbot/index.jsView
@@ -1,0 +1,34 @@
1+var Channels = require('./channels')
2+var Subscriptions = require('./subscriptions')
3+var Roots = require('./roots')
4+var Progress = require('./progress')
5+
6+exports.name = 'patchwork'
7+exports.version = require('../package.json').version
8+exports.manifest = {
9+ channels: 'source',
10+ subscriptions: 'source',
11+ roots: 'source',
12+ latest: 'source',
13+ progress: 'source',
14+ getSubscriptions: 'async',
15+ getChannels: 'async'
16+}
17+
18+exports.init = function (ssb, config) {
19+ var progress = Progress(ssb, config)
20+ var channels = Channels(ssb, config)
21+ var subscriptions = Subscriptions(ssb, config)
22+ var roots = Roots(ssb, config)
23+
24+ return {
25+ channels: channels.stream,
26+ subscriptions: subscriptions.stream,
27+ roots: roots.read,
28+ latest: roots.latest,
29+ progress: progress.stream,
30+
31+ getSubscriptions: subscriptions.get,
32+ getChannels: channels.get
33+ }
34+}
sbot/progress.jsView
@@ -1,0 +1,29 @@
1+var Pushable = require('pull-pushable')
2+var deepEqual = require('deep-equal')
3+
4+module.exports = function (ssb, config) {
5+ return {
6+ stream: function (opts) {
7+ var lastValue = deepClone(ssb.progress())
8+
9+ var timer = setInterval(() => {
10+ var newValue = ssb.progress()
11+ if (!deepEqual(newValue, lastValue)) {
12+ lastValue = deepClone(newValue)
13+ pushable.push(lastValue)
14+ }
15+ }, 200)
16+
17+ var pushable = Pushable(() => {
18+ clearInterval(timer)
19+ })
20+
21+ pushable.push(lastValue)
22+ return pushable
23+ }
24+ }
25+}
26+
27+function deepClone (obj) {
28+ return JSON.parse(JSON.stringify(obj))
29+}
sbot/roots.jsView
@@ -1,0 +1,232 @@
1+'use strict'
2+var pull = require('pull-stream')
3+var FlumeViewLevel = require('flumeview-level')
4+var pullCat = require('pull-cat')
5+var HLRU = require('hashlru')
6+var extend = require('xtend')
7+
8+// HACK: pull it out of patchcore
9+var getRoot = require('patchcore/message/sync/root').create().message.sync.root
10+
11+module.exports = function (ssb, config) {
12+ var create = FlumeViewLevel(1, function (msg, seq) {
13+ var result = [
14+ [msg.value.timestamp, getRoot(msg) || msg.key]
15+ ]
16+ return result
17+ })
18+
19+ var index = ssb._flumeUse('patchwork-roots', create)
20+
21+ // cache mostly just to avoid reading the same roots over and over again
22+ // not really big enough for multiple refresh cycles
23+ var cache = HLRU(100)
24+
25+ return {
26+ latest: function ({ids = [ssb.id]}) {
27+ var filter = null
28+ return pull(
29+ // READ INDEX
30+ index.read({old: false}),
31+
32+ // LOAD FILTERS
33+ pull.asyncMap((item, cb) => {
34+ if (!filter) {
35+ // pause stream until filters have loaded
36+ getFilter((err, result) => {
37+ if (err) return cb(err)
38+ filter = result
39+ cb(null, item)
40+ })
41+ } else {
42+ cb(null, item)
43+ }
44+ }),
45+
46+ // BUMP FILTER
47+ pull.filter(item => {
48+ if (filter && item.value && item.value) {
49+ return filter(ids, item.value)
50+ }
51+ }),
52+
53+ // LOOKUP ROOTS
54+ pull.asyncMap((item, cb) => {
55+ var key = item.key[1]
56+ if (key === item.value.key) {
57+ // already a root
58+ cb(null, extend(item.value, {
59+ root: item.value
60+ }))
61+ }
62+ getThruCache(key, (_, value) => {
63+ cb(null, extend(item.value, {
64+ root: value
65+ }))
66+ })
67+ }),
68+
69+ // FILTER
70+ pull.filter(item => {
71+ if (filter && item.root && item.root.value && !getRoot(item.root)) {
72+ return filter(ids, item.root)
73+ }
74+ }),
75+
76+ // MAP
77+ pull.map(item => {
78+ if (item.root && !item.root.key) {
79+ console.log('WRONG:', item)
80+ }
81+ if (item.root && item.root.key !== item.value.key) {
82+ return extend(item.value, { root: item.root })
83+ } else {
84+ return item.value
85+ }
86+ })
87+ )
88+ },
89+ read: function ({ids = [ssb.id], reverse, live, old, limit, lt, gt}) {
90+ var opts = {reverse, live, old}
91+
92+ // handle markers passed in to lt / gt
93+ if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
94+ if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
95+ if (typeof lt === 'number') opts.lt = [lt]
96+ if (typeof gt === 'number') opts.gt = [gt]
97+
98+ var seen = new Set()
99+ var marker = {marker: true, timestamp: null}
100+ var filter = null
101+
102+ var stream = pull(
103+
104+ // READ ROOTS
105+ index.read(opts),
106+
107+ // LOAD FILTERS
108+ pull.asyncMap((item, cb) => {
109+ if (!filter) {
110+ // pause stream until filters have loaded
111+ getFilter((err, result) => {
112+ if (err) return cb(err)
113+ filter = result
114+ cb(null, item)
115+ })
116+ } else {
117+ cb(null, item)
118+ }
119+ }),
120+
121+ // BUMP FILTER
122+ pull.filter(item => {
123+ if (filter && item.value && item.value.value) {
124+ return filter(ids, item.value)
125+ }
126+ }),
127+
128+ // MAP ROOTS
129+ pull.map(item => {
130+ if (item.sync) return item
131+ marker.timestamp = item.key[0]
132+ return item.key[1]
133+ }),
134+
135+ // UNIQUE
136+ pull.filter(item => {
137+ if (old === false) return true // don't filter live stream
138+ if (item && item.sync) {
139+ return true
140+ } else if (typeof item === 'string') {
141+ if (!seen.has(item)) {
142+ seen.add(item)
143+ return true
144+ }
145+ }
146+ }),
147+
148+ // LOOKUP (with cache)
149+ pull.asyncMap((item, cb) => {
150+ if (item.sync) return cb(null, item)
151+ var key = item
152+ getThruCache(key, cb)
153+ }),
154+
155+ // ROOT FILTER
156+ pull.filter(msg => {
157+ if (filter && msg.value && !getRoot(msg)) {
158+ return filter(ids, msg)
159+ }
160+ })
161+ )
162+
163+ // TRUNCATE
164+ if (typeof limit === 'number') {
165+ return pullCat([
166+ pull(
167+ stream,
168+ pull.take(limit)
169+ ),
170+
171+ // send truncated marker for resuming search
172+ pull.values([marker])
173+ ])
174+ }
175+ }
176+ }
177+
178+ function getThruCache (key, cb) {
179+ if (cache.has(key)) {
180+ cb(null, cache.get(key))
181+ } else {
182+ ssb.get(key, (_, value) => {
183+ var msg = {key, value}
184+ if (msg.value) {
185+ cache.set(key, msg)
186+ }
187+ cb(null, msg)
188+ })
189+ }
190+ }
191+
192+ function getFilter (cb) {
193+ // TODO: rewrite contacts stream
194+ ssb.contacts.get((err, contacts) => {
195+ if (err) return cb(err)
196+ ssb.patchwork.getSubscriptions((err, subscriptions) => {
197+ if (err) return cb(err)
198+ cb(null, function (ids, msg) {
199+ return (
200+ ids.includes(msg.value.author) ||
201+ checkFollowing(contacts, ids, msg.value.author) ||
202+ checkChannel(subscriptions, ids, msg.value.content.channel)
203+ )
204+ })
205+ })
206+ })
207+ }
208+}
209+
210+function checkFollowing (lookup, ids, target) {
211+ // TODO: rewrite contacts index (for some reason the order is different)
212+ var value = mostRecentValue(ids.map(id => lookup[id].following && lookup[id].following[target]), 1)
213+ return value && value[0]
214+}
215+
216+function checkChannel (lookup, ids, channel) {
217+ channel = typeof channel === 'string' ? channel.replace(/\s/g, '') : null
218+ if (channel) {
219+ var value = mostRecentValue(ids.map(id => lookup[`${id}:channel`]))
220+ return value && value[1]
221+ }
222+}
223+
224+function mostRecentValue (values, timestampIndex = 0) {
225+ var mostRecent = null
226+ values.forEach(value => {
227+ if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) {
228+ mostRecent = value
229+ }
230+ })
231+ return mostRecent
232+}
sbot/subscriptions.jsView
@@ -1,0 +1,29 @@
1+var FlumeReduce = require('flumeview-reduce')
2+
3+module.exports = function (ssb, config) {
4+ return ssb._flumeUse('patchwork-subscriptions', FlumeReduce(1, reduce, map))
5+}
6+
7+function reduce (result, item) {
8+ if (!result) result = []
9+ if (Array.isArray(item)) {
10+ for (var key in item) {
11+ if (!result[key] || result[key][0] < item[key][0]) {
12+ result[key] = item
13+ }
14+ }
15+ }
16+ return result
17+}
18+
19+function map (msg) {
20+ if (msg.value.content && msg.value.content.type === 'channel') {
21+ if (typeof msg.value.content.channel === 'string' && typeof msg.value.content.subscribed === 'boolean') {
22+ var channel = msg.value.content.channel.replace(/\s/g, '')
23+ var key = `${msg.value.author}:${channel}`
24+ return [{
25+ [key]: [msg.timestamp, msg.value.content.subscribed]
26+ }]
27+ }
28+ }
29+}

Built with git-ssb-web