Files: b626415ac9a778373cd5c4b1dcd55f753b35fac8 / core / index.js
8819 bytesRaw
1 | // var mdns = require('bonjour')() |
2 | // var dnsdisco = require('dns-discovery') |
3 | // both error if no network interface connection |
4 | var util = require('../util') |
5 | var keer = require('ssb-keys') |
6 | var hyperlog = require('hyperlog') |
7 | var swarmlog = require('swarmlog') |
8 | var pull = require('pull-stream') |
9 | var str2ps = require('stream-to-pull-stream') |
10 | var toPull = str2ps |
11 | var toStream = require('pull-stream-to-stream') |
12 | var emStream = require('emit-stream') |
13 | var muxrpc = require('muxrpc') |
14 | var emStream = require('emit-stream') |
15 | var emitter = require('events').EventEmitter |
16 | |
17 | var hlkv = require('hyperkv') |
18 | var hldex = require('hyperlog-index') |
19 | |
20 | |
// Plugin object for this module; the properties attached below
// (name, manifest, permissions, init) make up everything it exports.
var $ = {}
module.exports = $

// Name under which this plugin's methods are grouped (the code below
// reaches them as `dex.dexbot.*` / `rpc.dexbot.*`).
$.name = 'dexbot'
24 | |
// Method name -> call type ('async', 'source', 'sink' or 'duplex') for every
// method this plugin advertises. Nested objects describe the namespaced
// method groups `replicate.*` and `log.*`.
// NOTE(review): `$.init` also defines a `call` method that has no entry here,
// and `requestPublicKey` is listed here without an implementation in `$.init`.
$.manifest = {
  // peer-to-peer messaging and connection set-up
  rc: 'duplex',
  callback: 'duplex',
  emit: 'async',
  assimilate: 'duplex',
  greet: 'async',

  // named logs
  createLog: 'duplex',
  getLog: 'source',

  requestPublicKey: 'async',
  netcast: 'duplex',
  bonjour: 'source',
  connect: 'async',
  swarmLog: 'duplex',
  stderr: 'source',
  greeting: 'async',
  sign: 'async',

  // replication streams over bot.log, one per replication mode
  replicate: {
    push: 'sink',
    pull: 'source',
    sync: 'duplex'
  },

  // direct access to bot.log
  log: {
    updates: 'source',
    heads: 'async',
    headStream: 'source',
    add: 'async',
    get: 'async',
    append: 'async',
    batch: 'async'
  }
}
56 | |
// Lists of method names, keyed by group. `$.init` iterates the `replicate`
// and `log` lists to decide which wrappers to build around bot.log.
// NOTE(review): `onConnect` (under `sign`) and `require` have no matching
// entry in `$.manifest`; confirm whether they are still needed.
$.permissions = {
  uxer: ['emit'],
  anonymous: [
    'rc',
    'assimilate',
    'callback',
    'greet',
    'bonjour',
    'connect',
    'createLog',
    'getLog',
    'netcast',
    'greeting'
  ],
  replicate: ['push', 'pull', 'sync'],
  log: ['add', 'append', 'batch', 'get', 'heads', 'headStream', 'updates'],
  sign: ['sign', 'onConnect'],
  require: ['require']
}
65 | |
/**
 * Build the RPC object dexbot exposes: `replicate.*` and `log.*` wrappers
 * around `bot.log`, plus the core messaging / discovery / log methods.
 *
 * @param {Object} dex - the running node; used here for emit/on, connect,
 *   getAddress/address, getManifest, peers and dexbot.
 * @param {Object} bot - local bot state; this function reads bot.log,
 *   bot.logs, bot.db, bot.keys and bot.name.
 * @returns {Object} object with `replicate`, `log` and the core methods.
 */
$.init = function(dex, bot){
  var node = dex
  var rpc = {replicate: {}, log: {}}
  var logs = bot.logs
  // muxrpc clients created by `rc`, keyed by the id passed to it.
  // (The original assigned to an undeclared `peers`, which threw a ReferenceError.)
  var peers = {}

  // One stream factory per replication mode; the manifest type
  // ('sink' | 'source' | 'duplex') selects the matching str2ps converter.
  $.permissions.replicate.forEach(function(mode){
    rpc.replicate[mode] = function(opts){
      opts = opts || {}
      var type = $.manifest.replicate[mode]
      var replication = bot.log.replicate({mode: mode, live: opts.live || false})
      return str2ps[type](replication, function(err){
        if(err) console.log(err)
      })
    }
  })

  // Stream-returning log methods, wrapped as pull-streams.
  var logSources = {
    headStream: function(opts){
      return str2ps(bot.log.heads(opts))
    },
    updates: function(opts){
      return str2ps(bot.log.createReadStream(opts))
    }
  }

  $.permissions.log.forEach(function(method){
    // The types live under manifest.log; looking them up on the top-level
    // manifest (as before) never matched 'async', so add/get/heads/append/batch
    // were silently left out of rpc.log.
    var type = $.manifest.log[method]
    if(type === 'async'){
      // Resolve bot.log at call time and keep `this` bound to it.
      rpc.log[method] = function(){
        return bot.log[method].apply(bot.log, arguments)
      }
    }
    else if(logSources[method]) rpc.log[method] = logSources[method]
  })

  var core = {
    // Emit msg.data on the local 'to:<msg.to>' channel.
    // cb (optional) receives (null, true) if msg.to was set, else (null, false).
    'call' : function(msg, cb){
      var addressed = Boolean(msg && msg.to)
      if(addressed) dex.emit('to:'+msg.to, msg.data)
      if(cb) cb(null, addressed)
    },

    // Emit data on an arbitrary local channel.
    'emit' : function(channel, data, cb){
      dex.emit(channel, data)
      if(cb) cb(null, true)
    },

    // Duplex stream that relays local 'to:<id>' events to the caller and
    // re-emits incoming 'to:<own id>' events on dex.
    // NOTE(review): the dex listener is never removed when the stream ends.
    'callback' : function(id){
      var em = new emitter
      var st = emStream(em)
      var dupe = toPull.duplex(st)
      var rst = emStream(st)
      dex.on('to:' + id, function(data){
        console.log(data)
        em.emit('to:' + id, {from: bot.keys.id, msg: data})
      })

      rst.on('to:'+bot.keys.id, function(data){
        dex.emit('to:'+bot.keys.id, data)
        console.log(data)
      })
      return dupe
    },

    // Sign {msg: msg} with the bot's keys.
    'sign': function(msg, cb){
      var signed = keer.signObj(bot.keys, {msg: msg})
      cb(null, signed)
    },

    // Publish this bot over mdns.
    // The mdns require at the top of the file is commented out, so fail with
    // a readable error instead of a ReferenceError until it is re-enabled.
    'bonjour': function(){
      if(typeof mdns === 'undefined') throw new Error('dexbot.bonjour: mdns is not loaded')
      var record = {
        type: 'dexbot',
        port: 12111, // placeholder; the real address is carried in `host`
        name: bot.name,
        host: node.getAddress()
      }
      mdns.publish(record)
    },

    // Look for other dexbots over mdns, skipping our own address.
    // NOTE(review): a found service is passed as the FIRST callback argument
    // (the error position); kept as-is, confirm what callers expect.
    'greet': function(cb){
      if(typeof mdns === 'undefined'){
        if(cb) cb(new Error('dexbot.greet: mdns is not loaded'))
        return
      }
      mdns.find({type: 'dexbot'}, function(service){
        if(service.host === node.getAddress()) return
        if(cb) cb(service) // add to list of known bots, lookup, etc
      })
    },

    // Create a muxrpc client for `id` and return its stream.
    // NOTE(review): `dupe` is whatever client.dexbot.callback returns; if that
    // is a pull-stream duplex it has no emit() -- compare with `connect`,
    // which wraps the stream with emStream first.
    'rc': function(id){
      var client = muxrpc(dex.getManifest(), {})()
      peers[id] = client
      var stream = client.createStream()
      var dupe = client.dexbot.callback(bot.keys.id)
      dex.on('to:'+bot.keys.id, function(data){
        dupe.emit('to:'+bot.keys.id, data)
      })

      return stream
    },

    // Connect to peer.host and set up two-way 'to:<id>' messaging.
    // cb (optional) receives (err) on failure or (null, remote) on success.
    'connect': function(peer, cb){
      node.connect(peer.host, function(err, remote){
        if(err){
          // Previously the error was only logged, cb was told the call
          // succeeded, and the undefined connection was dereferenced below.
          console.log(err)
          if(cb) cb(err)
          return
        }
        if(cb) cb(null, remote)

        // set up two way messaging
        var pst = remote.dexbot.callback(bot.keys.id)
        var dupe = emStream(toStream(pst))

        var rdupe = emStream(dupe)
        var tp = toPull.duplex(rdupe)

        pull(pst, tp, pst)

        dex.on('to:'+remote.id, function(data){
          dupe.emit('to:'+remote.id, {from: bot.keys.id, msg: data})
        })

        dupe.on('to:'+bot.keys.id, function(data){
          dex.emit('to:'+bot.keys.id, data)
        })
      })
    },

    // Record our peer list in the 'netcast' log and, while mesg.distance is
    // positive, forward the message to every connected peer except
    // mesg.publicKey. Returns the local replication stream.
    'netcast': function(mesg){
      var distance = mesg.distance || 0

      var log = hyperlog(bot.db.sublevel('netcast'))

      var local = str2ps.duplex(log.replicate({live:true}), function(err){
        //console.log('remote error or completion?', err)
      })

      log.add(mesg.head || undefined, JSON.stringify(Object.keys(node.peers)), function(err, doc){
        if(err){
          console.log(err)
          return
        }

        mesg.distance = distance - 1
        mesg.auth = node.address()
        mesg.head = doc.key

        // Hop budget used up: do not forward. (The original had a dangling
        // `if(distance < 0)` that swallowed the loop below, so forwarding
        // could never run.)
        if(distance <= 0) return

        // Registered once, not once per peer.
        log.on('add', function(data){
          console.log(data.toString())
        })

        Object.keys(node.peers).forEach(function(peerId){
          if(mesg.publicKey === peerId) return
          // Object.keys yields ids, so the connection has to be looked up.
          // NOTE(review): node.peers[id] may be one connection or an array of
          // them depending on the host framework; both shapes are handled.
          var connections = [].concat(node.peers[peerId] || [])
          connections.forEach(function(connection){
            if(!connection || !connection.dexbot) return
            var remote = connection.dexbot.netcast(mesg)
            // NOTE(review): `local` is piped once per connection, as in the
            // original; confirm this is safe with more than one peer.
            pull(local, remote, local)
          })
        })
      })

      return local
    },

    // Stream the error log, either as a replication stream or as changes.
    // NOTE(review): errLogDB / errLog are not defined in this file; guarded so
    // the failure is explicit unless they are provided as globals.
    'stderr' : function(replicate){
      if(typeof errLogDB === 'undefined' || typeof errLog === 'undefined'){
        throw new Error('dexbot.stderr: error log is not configured')
      }
      var type = replicate ? 'replicate' : 'changes'
      return str2ps.duplex(errLogDB[type]({live:true}), errLog())
    },

    // Open a swarmlog for publicKey, stored in its own sublevel.
    'swarmLog' : function(publicKey){
      var log = swarmlog({
        keys: publicKey,
        sodium: require('chloride/browser'),
        db: bot.db.sublevel('swarm:' + publicKey),
        valueEncoding: 'json',
        hubs: [ 'https://signalhub.mafintosh.com' ]
      })
      return str2ps(log)
    },

    // Live read stream of the named log (not cached if newly opened).
    'getLog' : function(name){
      var log = logs[name] || hyperlog(bot.db.sublevel(name))
      return str2ps.source(log.createReadStream({live: true}))
    },

    // Open (or create) the log for `id` and return its replication stream.
    // NOTE(review): `peer` is currently unused; the original only evaluated
    // dex.peers[peer] and discarded the result.
    'assimilate': function(id, peer){
      var log = dex.dexbot.createLog(id)
      console.log(dex.peers)
      return log
    },

    // Open (or reuse) the named log and return a live sync replication stream.
    'createLog': function(name){ // name will usually be a public key
      var log = logs[name]
      if(!log){
        log = hyperlog(bot.db.sublevel(name))
        // Attach the cleanup listener only when the log is first opened;
        // attaching it on every call leaked one listener per call.
        log.on('end', function(){
          logs[name] = null
        })
      }
      logs[name] = log

      return str2ps.duplex(log.replicate({live:true, mode:'sync'}), function(err){
        //console.log('remote error or completion?', err)
      })
    },

    'greeting': function(name, cb){
      cb(null, bot.name + ': GREETINGS TO ' + name)
    }
  }

  Object.assign(rpc, core)
  return rpc
}
301 |
Built with git-ssb-web