Files: 4fe728780fc41a67f0e103a0221e05b4b9c93423 / core / index.js
7246 bytes · Raw
1 | // var mdns = require('bonjour')() |
2 | // var dnsdisco = require('dns-discovery') |
3 | // both error if no network interface connection |
4 | var util = require('../util') |
5 | var keer = require('ssb-keys') |
6 | var hyperlog = require('hyperlog') |
7 | var swarmlog = require('swarmlog') |
8 | var pull = require('pull-stream') |
9 | var str2ps = require('stream-to-pull-stream') |
10 | var toPull = str2ps |
11 | var toStream = require('pull-stream-to-stream') |
12 | var emStream = require('emit-stream') |
13 | var muxrpc = require('muxrpc') |
14 | var emStream = require('emit-stream') |
15 | var emitter = require('events').EventEmitter |
16 | |
17 | var hkv = require('hyperkv') |
18 | var hldex = require('hyperlog-index') |
19 | |
20 | |
21 | var $ = module.exports = {} |
22 | |
23 | $.name = 'dexbot' // change to 'rpc' |
24 | |
25 | // dexbot is a *SUPA!!!* |
26 | |
// RPC manifest: every value is one of the strings 'async', 'source', 'sink'
// or 'duplex'; nested objects describe the grouped `replicate.*` and `log.*`
// methods. $.init reads `$.manifest.replicate` to pick the stream wrapper for
// each replicate mode.
// NOTE(review): `greet`, `requestPublicKey`, `bonjour` and `greeting` are
// declared here, but $.init in this file builds no handler with those names.
$.manifest = {
  rc:               'duplex',
  callback:         'duplex',
  emit:             'async',
  assimilate:       'duplex',
  greet:            'async',
  createLog:        'duplex',
  getLog:           'source',
  requestPublicKey: 'async',
  netcast:          'duplex',
  bonjour:          'source',
  connect:          'async',
  swarmLog:         'duplex',
  stderr:           'source',
  greeting:         'async',
  sign:             'async',

  // log replication, one entry per replication mode
  replicate: {
    push: 'sink',
    pull: 'source',
    sync: 'duplex'
  },

  // direct access to the bot's own log
  log: {
    updates:    'source',
    heads:      'async',
    headStream: 'source',
    add:        'async',
    get:        'async',
    append:     'async',
    batch:      'async'
  }
}
58 | |
// Named lists of method names. $.init iterates the `replicate` and `log`
// lists to build the handlers under rpc.replicate.* and rpc.log.*.
// NOTE(review): 'onConnect' and 'require' have no entry in $.manifest, and
// `uxer` may be a misspelling of `user` — confirm before relying on them.
$.permissions = {
  uxer: [
    'emit'
  ],
  anonymous: [
    'rc',
    'assimilate',
    'callback',
    'connect',
    'createLog',
    'getLog',
    'netcast'
  ],
  replicate: [
    'push',
    'pull',
    'sync'
  ],
  log: [
    'add',
    'append',
    'batch',
    'get',
    'heads',
    'headStream',
    'updates'
  ],
  sign: [
    'sign',
    'onConnect'
  ],
  require: [
    'require'
  ]
}
67 | |
/**
 * Build the handler object for the dexbot RPC plugin.
 *
 * @param {Object} dex - host node. This function uses `dex.id`, `dex.on` /
 *   `dex.emit`, `dex.connect`, `dex.getManifest`, `dex.address`, `dex.peers`
 *   and `dex.dexbot`.
 * @param {Object} bot - local state. This function uses `bot.db` (must offer
 *   `sublevel`), `bot.log`, `bot.logs` (name -> log cache) and `bot.keys`.
 * @returns {Object} handlers named after the entries of `$.manifest`, with the
 *   nested `replicate` and `log` groups built from `$.permissions`.
 */
$.init = function(dex, bot){
  var rpc = {replicate: {}, log: {}}
  var logs = bot.logs
  var peers = {} // records created by `connect` and `rc`, keyed by peer id

  var kv = hkv({
    db: bot.db,
    log: hyperlog(bot.db.sublevel('kvi:' + dex.id))
  })
  // Placeholder read: nothing consumes the stored value yet.
  kv.get(dex.id + ':logs', function(err, data){
    if(err) console.log(err)
  })

  // rpc.replicate.push / pull / sync
  $.permissions.replicate.forEach(function(mode){
    var type = $.manifest.replicate[mode] // 'sink' | 'source' | 'duplex'
    rpc.replicate[mode] = function(opts){
      opts = opts || {}
      // `logs` holds log instances (see createLog), not streams, so the
      // replication stream has to be requested from whichever log is chosen.
      var log = logs[opts.id] || bot.log
      var replication = log.replicate({mode: mode, live: opts.live || false})
      return str2ps[type](replication, function(err){
        if(err) console.log(err)
      })
    }
  })

  // Source-type log methods and the bot.log method that produces their stream.
  var logStreams = {headStream: 'heads', updates: 'createReadStream'}

  // rpc.log.*
  $.permissions.log.forEach(function(method){
    // The types of these methods live under the `log` group of the manifest.
    var type = $.manifest.log[method]
    if(type === 'async'){
      // Call through bot.log so the method keeps its receiver.
      rpc.log[method] = function(){
        return bot.log[method].apply(bot.log, arguments)
      }
      return
    }
    var producer = logStreams[method]
    if(!producer) return
    rpc.log[method] = function(opts){
      return str2ps(bot.log[producer](opts))
    }
  })

  var core = {
    // Re-emit an event on the local node.
    'emit' : function(channel, data, cb){
      dex.emit(channel, data)
      if(cb) cb(null, true)
    },

    // Duplex event channel for the peer `id`: local 'to:<id>' events are
    // pushed into the stream, and 'to:<own id>' events read back from it are
    // re-emitted on the local node.
    'callback' : function(id){
      var em = new emitter()
      var st = emStream(em)
      var dupe = toPull.duplex(st)
      var rst = emStream(st)
      dex.on('to:' + id, function(data){
        console.log(data)
        em.emit('to:' + id, {from: bot.keys.id, msg: data})
      })

      rst.on('to:'+bot.keys.id, function(data){
        dex.emit('to:'+bot.keys.id, data)
      })
      return dupe
    },

    // Sign `msg` with the bot's keys and hand the signed object to `cb`.
    'sign': function(msg, cb){
      var signed
      try {
        signed = keer.signObj(bot.keys, {msg: msg})
      } catch(err){
        return cb(err) // report signing failures instead of throwing into the RPC layer
      }
      cb(null, signed)
    },

    // Create a muxrpc client for the peer `id` and return its stream.
    'rc': function(id){
      var client = muxrpc(dex.getManifest(), {})()
      // `connect` may not have registered this id yet; create the record
      // rather than throwing on an undefined entry.
      var known = peers[id] || (peers[id] = {id: id})
      known.client = client
      var stream = client.createStream()
      var dupe = client.dexbot.callback(bot.keys.id)
      dex.on('to:'+bot.keys.id, function(data){
        // NOTE(review): assumes the value returned by callback() has emit() — confirm
        dupe.emit('to:'+bot.keys.id, data)
      })

      return stream
    },

    // Connect to `peer.host`, remember the peer and set up two-way messaging.
    'connect': function(peer, cb){
      dex.connect(peer.host, function(err, remote){
        if(err){
          // Without a connection there is no `remote` to register or talk to.
          console.log(err) // publish errloggify this callback if the method sticks
          if(cb) cb(err)
          return
        }
        // NOTE(review): dials `peer.host` but records `peer.address` — confirm which is intended
        peers[remote.id] = {rpc: remote, id: remote.id, known_hosts: [peer.address]}
        if(cb) cb(null, remote)

        // TODO: give the peer our own rpc (serve dex over remote.dexbot.rc)

        // set up two way messaging
        var pst = remote.dexbot.callback(bot.keys.id)
        var dupe = emStream(toStream(pst))

        var rdupe = emStream(dupe)
        var tp = toPull.duplex(rdupe)

        pull(pst, tp, pst)

        dex.on('to:'+remote.id, function(data){
          dupe.emit('to:'+remote.id, {from: bot.keys.id, msg: data})
        })

        dupe.on('to:'+bot.keys.id, function(data){
          dex.emit('to:'+bot.keys.id, data)
        })
      })
    },

    // Record the current peer list in the 'netcast' log and, while the hop
    // budget `mesg.distance` is positive, pass the message on to every known
    // peer except `mesg.publicKey`. Returns a replication stream for the caller.
    'netcast': function(mesg){
      mesg = mesg || {}
      var distance = mesg.distance || 0

      var log = hyperlog(bot.db.sublevel('netcast'))

      // Each connection needs its own replication stream; this one is the caller's.
      var local = str2ps.duplex(log.replicate({live:true}), function(err){
        //console.log('remote error or completion?', err)
      })

      log.add(mesg.head || undefined, JSON.stringify(Object.keys(dex.peers)), function(err, doc){
        if(err) return console.log(err)

        mesg.distance = distance - 1 // stays numeric even when the caller sent no distance
        mesg.auth = dex.address()
        mesg.head = doc.key

        if(distance <= 0) return // hop budget used up: recorded locally, not forwarded

        log.on('add', function(data){
          console.log(data.toString())
        })

        Object.keys(dex.peers).forEach(function(peerId){
          if(mesg.publicKey === peerId) return
          // dex.peers is keyed by id; tolerate a single handle or a list per id.
          var handles = [].concat(dex.peers[peerId])
          handles.forEach(function(handle){
            if(!handle || !handle.dexbot) return
            var remote = handle.dexbot.netcast(mesg)
            var link = str2ps.duplex(log.replicate({live:true}), function(err){
              //console.log('remote error or completion?', err)
            })
            pull(link, remote, link)
          })
        })
      })

      return local
    },

    // NOTE(review): `errLogDB` and `errLog` are not defined anywhere in this
    // file; unless they exist as globals this throws when called — confirm.
    'stderr' : function( replicate){
      var type = replicate ? 'replicate' : 'changes'
      var stream = str2ps.duplex(errLogDB[type]({live:true}), errLog())
      return stream
    },

    // Open a swarmlog stored under 'swarm:<publicKey>' and wrap it with str2ps.
    // NOTE(review): the manifest declares this duplex, and every other handler
    // wraps a stream obtained from the log rather than the log itself — confirm.
    'swarmLog' : function(publicKey){
      var log = swarmlog({
        keys: publicKey,
        sodium: require('chloride/browser'),
        db: bot.db.sublevel('swarm:' + publicKey),
        valueEncoding: 'json',
        hubs: [ 'https://signalhub.mafintosh.com' ]
      })
      var stream = str2ps(log)
      return stream
    },

    // Live read stream of the named log (cached instance or a fresh one).
    'getLog' : function(name){
      var log = logs[name] || hyperlog(bot.db.sublevel(name))
      var stream = str2ps.source(log.createReadStream({live: true}))
      //log.on('add', function(d){console.log(d)})
      return stream
    },

    // Open the log `id` through this node's own createLog. `peer` is unused so far.
    'assimilate': function(id, peer){
      var log = dex.dexbot.createLog(id)
      console.log(dex.peers)
      return log
    },

    // Open (or reuse) the named log and return a live sync replication stream.
    'createLog': function(name){ // name will usually be a public key
      var log = logs[name]
      if(!log){
        log = hyperlog(bot.db.sublevel(name))
        // put these in a hyperkv store, with updates to status: live (connected), last_known_whatabouts (previous replication)
        logs[name] = log
        // Registered once per instance so repeated calls do not pile up listeners.
        log.on('end', function(){
          logs[name] = null
        })
      }
      var stream = str2ps.duplex(log.replicate({live:true, mode:'sync'}), function(err){
        //console.log('remote error or completion?', err)
      })
      return stream
    }
  }

  Object.assign(rpc, core)
  return rpc
}
254 |
Built with git-ssb-web