git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Commit 1e42ee41e11726c1e33f705b77d8ead24869af20

remove sbot fork, extract out to PR

closes #447
Matt McKegg committed on 3/27/2017, 5:08:28 AM
Parent: 454dc73d98065ea261663450ba6c7433954706bd

Files changed

lib/friends-with-sync.jsdeleted
lib/replicate-with-progress.jsdeleted
modules/app/html/progress-notifier.jschanged
modules/progress/obs.jschanged
package.jsonchanged
server-process.jschanged
lib/friends-with-sync.jsView
@@ -1,184 +1,0 @@
1-var Graphmitter = require('graphmitter')
2-var pull = require('pull-stream')
3-var mlib = require('ssb-msgs')
4-var memview = require('level-memview')
5-var pushable = require('pull-pushable')
6-var mdm = require('mdmanifest')
7-var valid = require('scuttlebot/lib/validators')
8-var apidoc = require('scuttlebot/lib/apidocs').friends
9-
10-// friends plugin
11-// methods to analyze the social graph
12-// maintains a 'follow' and 'flag' graph
13-
14-function isFunction (f) {
15- return 'function' === typeof f
16-}
17-
18-function isString (s) {
19- return 'string' === typeof s
20-}
21-
22-function isFriend (friends, a, b) {
23- return friends[a] && friends[b] && friends[a][b] && friends[b][a]
24-}
25-
26-exports.name = 'friends'
27-exports.version = '1.0.0'
28-exports.manifest = mdm.manifest(apidoc)
29-
30-exports.init = function (sbot, config) {
31-
32- var graphs = {
33- follow: new Graphmitter(),
34- flag: new Graphmitter()
35- }
36-
37- // view processor
38- var syncCbs = []
39- function awaitSync (cb) {
40- if (syncCbs) syncCbs.push(cb)
41- else cb()
42- }
43-
44- // read/watch the log for changes to the social graph
45- pull(sbot.createLogStream({ live: true }), pull.drain(function (msg) {
46-
47- if (msg.sync) {
48- syncCbs.forEach(function (cb) { cb() })
49- syncCbs = null
50-
51- if (sbot.gossip) {
52- // prioritize friends
53- var friends = graphs['follow'].toJSON()
54- sbot.gossip.peers().forEach(function(peer) {
55- if (isFriend(friends, sbot.id, peer.key)) {
56- sbot.gossip.add(peer, 'friends')
57- }
58- })
59- }
60-
61- return
62- }
63-
64- var c = msg.value.content
65- if (c.type == 'contact') {
66- mlib.asLinks(c.contact, 'feed').forEach(function (link) {
67- if ('following' in c) {
68- if (c.following)
69- graphs.follow.edge(msg.value.author, link.link, true)
70- else
71- graphs.follow.del(msg.value.author, link.link)
72-
73- }
74- if ('flagged' in c) {
75- if (c.flagged)
76- graphs.flag.edge(msg.value.author, link.link, c.flagged)
77- else
78- graphs.flag.del(msg.value.author, link.link)
79- }
80- })
81- }
82- }))
83-
84- return {
85-
86- get: valid.sync(function (opts) {
87- var g = graphs[opts.graph || 'follow']
88- if(!g) throw new Error('opts.graph must be provided')
89- return g.get(opts.source, opts.dest)
90- }, 'object?'),
91-
92- all: valid.async(function (graph, cb) {
93- if (typeof graph == 'function') {
94- cb = graph
95- graph = null
96- }
97- if (!graph)
98- graph = 'follow'
99- awaitSync(function () {
100- cb(null, graphs[graph] ? graphs[graph].toJSON() : null)
101- })
102- }, 'string?'),
103-
104- path: valid.sync(function (opts) {
105- if(isString(opts))
106- opts = {source: sbot.id, dest: opts}
107- return graphs.follow.path(opts)
108-
109- }, 'string|object'),
110-
111- createFriendStream: valid.source(function (opts) {
112- opts = opts || {}
113- var live = opts.live === true
114- var meta = opts.meta === true
115- var start = opts.start || sbot.id
116- var graph = graphs[opts.graph || 'follow']
117- if(!graph)
118- return pull.error(new Error('unknown graph:' + opts.graph))
119- var cancel, ps = pushable(function () {
120- cancel && cancel()
121- })
122-
123- function push (to, hops) {
124- return ps.push(meta ? {id: to, hops: hops} : to)
125- }
126-
127- if (live) {
128- awaitSync(function () {
129- ps.push({sync: true})
130- })
131- }
132-
133- //by default, also emit your own key.
134- if(opts.self !== false)
135- push(start, 0)
136-
137- var conf = config.friends || {}
138- cancel = graph.traverse({
139- start: start,
140- hops: opts.hops || conf.hops || 3,
141- max: opts.dunbar || conf.dunbar || 150,
142- each: function (_, to, hops) {
143- if(to !== start) push(to, hops)
144- }
145- })
146-
147- if(!live) { cancel(); ps.end() }
148-
149- return ps
150- }, 'createFriendStreamOpts?'),
151-
152- hops: valid.async(function (start, graph, opts, cb) {
153- if (typeof opts == 'function') { // (start|opts, graph, cb)
154- cb = opts
155- opts = null
156- } else if (typeof graph == 'function') { // (start|opts, cb)
157- cb = graph
158- opts = graph = null
159- }
160- opts = opts || {}
161- if(isString(start)) { // (start, ...)
162- // first arg is id string
163- opts.start = start
164- } else if (start && typeof start == 'object') { // (opts, ...)
165- // first arg is opts
166- for (var k in start)
167- opts[k] = start[k]
168- }
169-
170- var conf = config.friends || {}
171- opts.start = opts.start || sbot.id
172- opts.dunbar = opts.dunbar || conf.dunbar || 150
173- opts.hops = opts.hops || conf.hops || 3
174-
175- var g = graphs[graph || 'follow']
176- if (!g)
177- return cb(new Error('Invalid graph type: '+graph))
178-
179- awaitSync(function () {
180- cb(null, g.traverse(opts))
181- })
182- }, ['feedId', 'string?', 'object?'], ['createFriendStreamOpts'])
183- }
184-}
lib/replicate-with-progress.jsView
@@ -1,280 +1,0 @@
1-'use strict'
2-var pull = require('pull-stream')
3-var para = require('pull-paramap')
4-var Notify = require('pull-notify')
5-var Cat = require('pull-cat')
6-var Debounce = require('observ-debounce')
7-var mdm = require('mdmanifest')
8-var apidoc = require('scuttlebot/lib/apidocs').replicate
9-var MutantToPull = require('./mutant-to-pull')
10-var {Struct, Dict} = require('mutant')
11-
12-var Pushable = require('pull-pushable')
13-
14-// compatibility function for old implementations of `latestSequence`
15-function toSeq (s) {
16- return typeof s === 'number' ? s : s.sequence
17-}
18-
19-module.exports = {
20- name: 'replicate',
21- version: '2.0.0',
22- manifest: mdm.manifest(apidoc),
23- init: function (sbot, config) {
24- var debounce = Debounce(200)
25- var listeners = {}
26- var newPeer = Notify()
27-
28- // keep track of sync progress and provide to client
29-
30- var start = null
31- var count = 0
32- var rate = 0
33- var toSend = {}
34- var peerHas = {}
35- var pendingPeer = {}
36-
37- window.pendingPeer = pendingPeer
38-
39- var syncStatus = Struct({
40- type: 'global',
41- incomplete: 0,
42- pending: 0,
43- pendingPeers: Dict({}, {fixedIndexing: true}),
44- feeds: null,
45- rate: 0
46- })
47-
48- window.syncStatus = syncStatus
49-
50- debounce(function () {
51- var totalPending = 0
52- var feeds = Object.keys(toSend).length
53- var peers = {}
54- var pendingFeeds = new Set()
55-
56- Object.keys(pendingPeer).forEach(function (peerId) {
57- if (pendingPeer[peerId]) {
58- totalPending += 1
59-
60- Object.keys(toSend).forEach(function (feedId) {
61- if (peerHas[peerId] && peerHas[peerId][feedId]) {
62- if (peerHas[peerId][feedId] > toSend[feedId]) {
63- pendingFeeds.add(feedId)
64- }
65- }
66- })
67-
68- peers[peerId] = pendingPeer[peerId]
69- }
70- })
71-
72- syncStatus.set({
73- incomplete: pendingFeeds.size,
74- feeds: syncStatus.loadedFriends ? feeds : null,
75- pendingPeers: peers,
76- pending: totalPending,
77- rate: rate
78- }, {merge: true})
79- })
80-
81- pull(
82- sbot.createLogStream({old: false, live: true, sync: false, keys: false}),
83- pull.drain(function (e) {
84- // track writes per second, mainly used for developing initial sync.
85- if (!start) start = Date.now()
86- var time = (Date.now() - start) / 1000
87- if (time >= 1) {
88- rate = count / time
89- start = Date.now()
90- count = 0
91- }
92- var pushable = listeners[e.author]
93-
94- if (pushable && pushable.sequence === e.sequence) {
95- pushable.sequence++
96- pushable.forEach(function (p) {
97- p.push(e)
98- })
99- }
100- count++
101- addPeer({id: e.author, sequence: e.sequence})
102- })
103- )
104-
105- sbot.createHistoryStream.hook(function (fn, args) {
106- var upto = args[0] || {}
107- var seq = upto.sequence || upto.seq
108-
109- if (this._emit) this._emit('call:createHistoryStream', args[0])
110-
111- // if we are calling this locally, skip cleverness
112- if (this === sbot) return fn.call(this, upto)
113-
114- // keep track of each requested value, per feed / per peer.
115- peerHas[this.id] = peerHas[this.id] || {}
116- peerHas[this.id][upto.id] = seq - 1
117-
118- debounce.set()
119-
120- // handle creating lots of history streams efficiently.
121- // maybe this could be optimized in map-filter-reduce queries instead?
122- if (toSend[upto.id] == null || (seq > toSend[upto.id])) {
123- upto.old = false
124- if (!upto.live) return pull.empty()
125- var pushable = listeners[upto.id] = listeners[upto.id] || []
126- var p = Pushable(function () {
127- var i = pushable.indexOf(p)
128- pushable.splice(i, 1)
129- })
130- pushable.push(p)
131- pushable.sequence = seq
132- return p
133- }
134- return fn.call(this, upto)
135- })
136-
137- // collect the IDs of feeds we want to request
138- var opts = config.replication || {}
139- opts.hops = opts.hops || 3
140- opts.dunbar = opts.dunbar || 150
141- opts.live = true
142- opts.meta = true
143-
144- function localPeers () {
145- if (!sbot.gossip) return
146- sbot.gossip.peers().forEach(function (e) {
147- if (e.source === 'local' && toSend[e.key] == null) {
148- sbot.latestSequence(e.key, function (err, seq) {
149- addPeer({id: e.key, sequence: err ? 0 : toSeq(seq)})
150- })
151- }
152- })
153- }
154-
155- // also request local peers.
156- if (sbot.gossip) {
157- // if we have the gossip plugin active, then include new local peers
158- // so that you can put a name to someone on your local network.
159- var int = setInterval(localPeers, 1000)
160- if (int.unref) int.unref()
161- localPeers()
162- }
163-
164- function loadedFriends () {
165- syncStatus.loadedFriends = true
166- debounce.set()
167- }
168-
169- function addPeer (upto) {
170- if (upto.sync) return loadedFriends()
171- if (!upto.id) return console.log('invalid', upto)
172-
173- if (toSend[upto.id] == null) {
174- toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
175- newPeer({ id: upto.id, sequence: toSend[upto.id], type: 'new' })
176- debounce.set()
177- } else {
178- toSend[upto.id] = Math.max(toSend[upto.id] || 0, upto.sequence || upto.seq || 0)
179- }
180-
181- debounce.set()
182- }
183-
184- // create read-streams for the desired feeds
185- pull(
186- sbot.friends.createFriendStream(opts),
187- // filter out duplicates, and also keep track of what we expect to receive
188- // lookup the latest sequence from each user
189- para(function (data, cb) {
190- if (data.sync) return cb(null, data)
191- var id = data.id || data
192- sbot.latestSequence(id, function (err, seq) {
193- cb(null, {
194- id: id, sequence: err ? 0 : toSeq(seq)
195- })
196- })
197- }, 32),
198- pull.drain(addPeer, loadedFriends)
199- )
200-
201- function upto (opts) {
202- opts = opts || {}
203- var ary = Object.keys(toSend).map(function (k) {
204- return { id: k, sequence: toSend[k] }
205- })
206- if (opts.live) {
207- return Cat([pull.values(ary), pull.once({sync: true}), newPeer.listen()])
208- }
209-
210- return pull.values(ary)
211- }
212-
213- sbot.on('rpc:connect', function (rpc) {
214- // this is the cli client, just ignore.
215- if (rpc.id === sbot.id) return
216- // check for local peers, or manual connections.
217- localPeers()
218- sbot.emit('replicate:start', rpc)
219- rpc.on('closed', function () {
220- sbot.emit('replicate:finish', toSend)
221- })
222- pull(
223- upto({live: opts.live}),
224- pull.drain(function (upto) {
225- if (upto.sync) return
226- var last = (upto.sequence || upto.seq || 0)
227- pendingPeer[rpc.id] = (pendingPeer[rpc.id] || 0) + 1
228- debounce.set()
229-
230- pull(
231- rpc.createHistoryStream({
232- id: upto.id,
233- seq: last + 1,
234- live: false,
235- keys: false
236- }),
237- pull.through((msg) => {
238- start = Math.max(start, msg.sequence)
239- }),
240- sbot.createWriteStream(function () {
241- // TODO: do something with the error
242- // this seems to be thrown fairly regularly whenever something weird happens to the stream
243-
244- pendingPeer[rpc.id] -= 1
245- debounce.set()
246-
247- // all synched, now lets keep watching for live changes
248- // need to handle this separately because there is no {sync: true} event with HistoryStream
249- // and we want to notify the client that sync has completed
250-
251- pull(
252- rpc.createHistoryStream({
253- id: upto.id,
254- seq: last + 1,
255- sequence: last + 1, // HACK: some clients won't stream if we don't specify this as sequence
256- live: true,
257- keys: false
258- }),
259- sbot.createWriteStream(function () {
260- // TODO: handle error
261- })
262- )
263- })
264- )
265- }, function (err) {
266- if (err) {
267- sbot.emit('log:error', ['replication', rpc.id, 'error', err])
268- }
269- })
270- )
271- })
272-
273- return {
274- changes: function () {
275- return MutantToPull(syncStatus)
276- },
277- upto: upto
278- }
279- }
280-}
modules/app/html/progress-notifier.jsView
@@ -33,24 +33,24 @@
3333 return (maxQueryPending - pending) / maxQueryPending
3434 }
3535 })
3636
37- var downloadProgress = computed([progress.feeds, progress.incomplete], (feeds, incomplete) => {
37+ var downloadProgress = computed([progress.feeds, progress.incompleteFeeds], (feeds, incomplete) => {
3838 if (feeds) {
3939 return clamp((feeds - incomplete) / feeds)
4040 } else {
4141 return 1
4242 }
4343 })
4444
45- var hidden = computed([progress.incomplete, indexing], (incomplete, indexing) => {
45+ var hidden = computed([progress.incompleteFeeds, indexing], (incomplete, indexing) => {
4646 return incomplete < 5 && !indexing
4747 })
4848
4949 return h('div.info', { hidden: sustained(hidden, 2000) }, [
5050 h('div.status', [
5151 h('Loading -small', [
52- when(computed(progress.incomplete, (v) => v > 5),
52+ when(computed(progress.incompleteFeeds, (v) => v > 5),
5353 ['Downloading new messages', h('progress', { style: {'margin-left': '10px'}, min: 0, max: 1, value: downloadProgress })],
5454 when(indexing, [
5555 ['Indexing database', h('progress', { style: {'margin-left': '10px'}, min: 0, max: 1, value: indexProgress })]
5656 ], 'Scuttling...')
modules/progress/obs.jsView
@@ -49,11 +49,10 @@
4949
5050 function load () {
5151 if (!syncStatus) {
5252 syncStatus = ProgressStatus(x => x.replicate.changes, {
53- incomplete: 0,
53+ incompleteFeeds: 0,
5454 pendingPeers: Dict({}, {fixedIndexing: true}),
55- pending: 0,
5655 feeds: null,
5756 rate: 0
5857 })
5958 }
package.jsonView
@@ -49,9 +49,9 @@
4949 "pull-ping": "^2.0.2",
5050 "pull-pushable": "^2.0.1",
5151 "pull-scroll": "github:mmckegg/pull-scroll#0fddc085fc82cfce9f3ccfd4096fce99843a1e51",
5252 "pull-stream": "~3.4.5",
53- "scuttlebot": "^9.4.4",
53+ "scuttlebot": "github:ssbc/scuttlebot#6c16948e4f8487e5bf2c4df17b0564391d4b0ddf",
5454 "sorted-array-functions": "~1.0.0",
5555 "ssb-avatar": "^0.2.0",
5656 "ssb-blobs": "~0.1.7",
5757 "ssb-keys": "~7.0.0",
server-process.jsView
@@ -5,10 +5,10 @@
55
66 var createSbot = require('scuttlebot')
77 .use(require('scuttlebot/plugins/master'))
88 .use(require('scuttlebot/plugins/gossip'))
9- .use(require('./lib/friends-with-sync'))
10- .use(require('./lib/replicate-with-progress'))
9+ .use(require('scuttlebot/plugins/friends'))
10+ .use(require('scuttlebot/plugins/replicate'))
1111 .use(require('ssb-blobs'))
1212 .use(require('scuttlebot/plugins/invite'))
1313 .use(require('scuttlebot/plugins/block'))
1414 .use(require('scuttlebot/plugins/local'))

Built with git-ssb-web