git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Commit 348b30605149d3cc90a81ff2b0a7163f874da006

experiment with custom backlinks subscription in patchwork sbot plugin

to fix memory leak in #589
Matt McKegg committed on 7/10/2017, 6:19:17 AM
Parent: 198fa7fcacacdb6a915c1f998c047c40742efbf2

Files changed

overrides/patchcore/backlinks/obs.jsadded
sbot/index.jschanged
sbot/live-backlinks.jsadded
overrides/patchcore/backlinks/obs.jsView
@@ -1,0 +1,120 @@
1+var nest = require('depnest')
2+var Value = require('mutant/value')
3+var onceTrue = require('mutant/once-true')
4+var computed = require('mutant/computed')
5+var resolve = require('mutant/resolve')
6+var pull = require('pull-stream')
7+var onceIdle = require('mutant/once-idle')
8+
9+exports.needs = nest({
10+ 'sbot.pull.backlinks': 'first',
11+ 'sbot.obs.connection': 'first',
12+ 'sbot.pull.stream': 'first'
13+})
14+
15+exports.gives = nest('backlinks.obs.for', true)
16+
17+exports.create = function (api) {
18+ var cache = {}
19+ var collections = {}
20+
21+ var loaded = false
22+
23+ // cycle remove sets for fast cleanup
24+ var newRemove = new Set()
25+ var oldRemove = new Set()
26+
27+ // run cache cleanup every 5 seconds
28+ // an item will be removed from cache between 5 - 10 seconds after release
29+ // this ensures that the data is still available for a page reload
30+ var timer = setInterval(() => {
31+ oldRemove.forEach(id => {
32+ if (cache[id]) {
33+ unsubscribe(id)
34+ delete collections[id]
35+ delete cache[id]
36+ }
37+ })
38+ oldRemove.clear()
39+
40+ // cycle
41+ var hold = oldRemove
42+ oldRemove = newRemove
43+ newRemove = hold
44+ }, 5e3)
45+
46+ if (timer.unref) timer.unref()
47+
48+ return nest({
49+ 'backlinks.obs.for': (id) => backlinks(id)
50+ })
51+
52+ function backlinks (id) {
53+ load()
54+ if (!cache[id]) {
55+ var sync = Value(false)
56+ var collection = Value([])
57+ subscribe(id)
58+
59+ // try not to saturate the thread
60+ onceIdle(() => {
61+ pull(
62+ api.sbot.pull.backlinks({
63+ query: [ {$filter: { dest: id }} ],
64+ index: 'DTA' // use asserted timestamps
65+ }),
66+ pull.drain((msg) => {
67+ var value = resolve(collection)
68+ value.push(msg)
69+ collection.set(value)
70+ }, () => {
71+ sync.set(true)
72+ })
73+ )
74+ })
75+
76+ collections[id] = collection
77+ cache[id] = computed([collection], x => x, {
78+ onListen: () => use(id),
79+ onUnlisten: () => release(id)
80+ })
81+
82+ cache[id].sync = sync
83+ }
84+ return cache[id]
85+ }
86+
87+ function load () {
88+ if (!loaded) {
89+ pull(
90+ api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()),
91+ pull.drain(msg => {
92+ var collection = collections[msg.dest]
93+ if (collection) {
94+ var value = resolve(collection)
95+ value.push(msg)
96+ collection.set(value)
97+ }
98+ })
99+ )
100+ loaded = true
101+ }
102+ }
103+
104+ function use (id) {
105+ newRemove.delete(id)
106+ oldRemove.delete(id)
107+ }
108+
109+ function release (id) {
110+ newRemove.add(id)
111+ }
112+
113+ function subscribe (id) {
114+ onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id))
115+ }
116+
117+ function unsubscribe (id) {
118+ onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id))
119+ }
120+}
sbot/index.jsView
@@ -3,8 +3,9 @@
33 var Roots = require('./roots')
44 var Progress = require('./progress')
55 var Search = require('./search')
66 var RecentFeeds = require('./recent-feeds')
7+var LiveBacklinks = require('./live-backlinks')
78
89 exports.name = 'patchwork'
910 exports.version = require('../package.json').version
1011 exports.manifest = {
@@ -15,9 +16,14 @@
1516 linearSearch: 'source',
1617 progress: 'source',
1718 recentFeeds: 'source',
1819 getSubscriptions: 'async',
19- getChannels: 'async'
20+ getChannels: 'async',
21+ liveBacklinks: {
22+ subscribe: 'sync',
23+ unsubscribe: 'sync',
24+ stream: 'source'
25+ }
2026 }
2127
2228 exports.init = function (ssb, config) {
2329 var progress = Progress(ssb, config)
@@ -35,7 +41,8 @@
3541 progress: progress.stream,
3642 recentFeeds: recentFeeds.stream,
3743 linearSearch: search.linear,
3844 getSubscriptions: subscriptions.get,
39- getChannels: channels.get
45+ getChannels: channels.get,
46+ liveBacklinks: LiveBacklinks(ssb, config)
4047 }
4148 }
sbot/live-backlinks.jsView
@@ -1,0 +1,20 @@
1+var pull = require('pull-stream')
2+module.exports = function (ssb, config) {
3+ var subscriptions = new Set()
4+ return {
5+ subscribe: function (id) {
6+ subscriptions.add(id)
7+ console.log('subscribe', id)
8+ },
9+ unsubscribe: function (id) {
10+ subscriptions.delete(id)
11+ console.log('unsubscribe', id)
12+ },
13+ stream: function (id) {
14+ return pull(
15+ ssb.backlinks.read({old: false, index: 'DTS'}),
16+ pull.filter(x => subscriptions.has(x.dest))
17+ )
18+ }
19+ }
20+}

Built with git-ssb-web