git ssb

1+

cheerbitch / dexbot



Tree: b626415ac9a778373cd5c4b1dcd55f753b35fac8

Files: b626415ac9a778373cd5c4b1dcd55f753b35fac8 / core / index.js

8819 bytesRaw
1// var mdns = require('bonjour')()
2// var dnsdisco = require('dns-discovery')
3// both error if no network interface connection
4var util = require('../util')
5var keer = require('ssb-keys')
6var hyperlog = require('hyperlog')
7var swarmlog = require('swarmlog')
8var pull = require('pull-stream')
9var str2ps = require('stream-to-pull-stream')
10var toPull = str2ps
11var toStream = require('pull-stream-to-stream')
12var emStream = require('emit-stream')
13var muxrpc = require('muxrpc')
14var emStream = require('emit-stream')
15var emitter = require('events').EventEmitter
16
17var hlkv = require('hyperkv')
18var hldex = require('hyperlog-index')
19
20
21var $ = module.exports = {}
22
23$.name = 'dexbot'
24
25$.manifest = {
26 rc: 'duplex',
27 callback: 'duplex',
28 emit: 'async',
29 assimilate: 'duplex',
30 greet: 'async',
31 createLog: 'duplex',
32 getLog: 'source',
33 requestPublicKey: 'async',
34 netcast: 'duplex',
35 bonjour: 'source',
36 connect: 'async',
37 swarmLog: 'duplex',
38 stderr: 'source',
39 greeting: 'async',
40 sign: 'async',
41 replicate: {
42 'push': 'sink',
43 'pull': 'source',
44 'sync': 'duplex'
45 },
46 log: {
47 'updates' : 'source',
48 'heads' : 'async',
49 'headStream' : 'source',
50 'add' : 'async',
51 'get' : 'async',
52 'append': 'async',
53 'batch': 'async'
54 }
55}
56
57$.permissions = {
58 uxer : ['emit'],
59 anonymous: ['rc', 'assimilate', 'callback', 'greet', 'bonjour', 'connect', 'createLog', 'getLog', 'netcast', 'greeting'],
60 replicate: ['push', 'pull', 'sync'],
61 log : ['add', 'append', 'batch', 'get', 'heads', 'headStream', 'updates'],
62 sign: ['sign', 'onConnect'],
63 require: ['require']
64}
65
66$.init = function(dex, bot){
67 var self = dex
68 var node = dex
69 var rpc = {replicate: {}, log: {}}
70 var logs = bot.logs
71 $.permissions.replicate.forEach(function(e){
72 rpc.replicate[e] = function(opts){
73 opts = opts || {}
74 var type = $.manifest.replicate[e]
75 var log = bot.log.replicate({mode: e, live: opts.live || false})
76 var stream = str2ps[type](log, function(err){
77 console.log(err)
78 })
79 return stream
80 }
81 })
82 $.permissions.log.forEach(function(e){
83 var type = $.manifest[e]
84 if(type === 'async') rpc.log[e] = bot.log[e]
85 else{ // source stream
86 switch(e){
87 case 'headStream':
88 rpc.log[e] = function(opts){
89 return str2ps(bot.log.heads(opts))
90 }
91 break;
92 case 'updates':
93 rpc.log[e] = function(opts){
94 return str2ps(bot.log.createReadStream(opts))
95 }
96 break;
97 }
98 }
99 })
100 var core = {
101 'call' : function(msg, cb){
102 if(msg.to){
103 dex.emit('to:'+msg.to, msg.data)
104 if(cb) cb(null, true)
105 }
106 else cb(null, false)
107 },
108 'emit' : function(channel, data, cb){
109 dex.emit(channel, data)
110 if(cb) cb(null, true)
111 },
112 'callback' : function(id){
113 var em = new emitter
114 var st = emStream(em)
115 var dupe = toPull.duplex(st)
116 var rst = emStream(st)
117 dex.on('to:' + id, function(data){
118 console.log(data)
119 em.emit('to:' + id, {from: bot.keys.id, msg: data})
120 })
121
122 rst.on('to:'+bot.keys.id,function(data){
123 dex.emit('to:'+bot.keys.id, data)
124 console.log(data)
125 })
126 return dupe
127 },
128 'sign': function(msg, cb){
129 var signed = keer.signObj(bot.keys, {msg: msg})
130 cb(null, signed)
131 },
132 'bonjour': function(){
133 //console.log(bot)
134 var record = {
135 type: 'dexbot',
136 port: 12111, // fake cuz why port? idk... also, don't want to parse node.adress() for port #buh
137 // see one secret-stack to read address
138 name: bot.name,
139 host: node.getAddress()
140 }
141 mdns.publish(record)
142
143 },
144 'greet': function(cb){
145 ;(function(_cb){mdns.find({type: 'dexbot'}, function(service){
146 if(service.host === node.getAddress()) return
147 else{
148 //console.log(service, _cb)
149 if(_cb) _cb(service)// add to list of known bots, loookup, etc
150 }
151 })})(cb)
152 },
153 'rc': function(id){
154 var client = muxrpc(dex.getManifest(), {})()
155 peers[id] = client
156 var stream = client.createStream()
157 var dupe = client.dexbot.callback(bot.keys.id)
158 dex.on('to:'+bot.keys.id, function(data){
159 dupe.emit('to:'+bot.keys.id, data)
160 })
161
162 return stream
163 },
164 'connect': function(peer, cb){
165 node.connect(peer.host, function(err, rpc){
166 if(err) console.log(err) // publish errloggify this callback if the method sticks
167 if(cb) cb(null, rpc)
168 // give peer yr rpc
169/*
170 var server = muxrpc({}, dex.getManifest())(dex)
171 var rc = rpc.dexbot.rc(bot.keys.id)
172 var local = server.createStream()
173 pull(rc, local, rc)
174*/
175 // set up two way messaging
176 var pst = rpc.dexbot.callback(bot.keys.id)
177 var dupe = emStream(toStream(pst))
178
179 var rdupe = emStream(dupe)
180 var tp = toPull.duplex(rdupe)
181
182 pull(pst, tp, pst)
183
184 dex.on('to:'+rpc.id, function(data){
185 dupe.emit('to:'+rpc.id, {from: bot.keys.id, msg: data})
186 })
187
188 dupe.on('to:'+bot.keys.id, function(data){
189 dex.emit('to:'+bot.keys.id, data)
190 })
191
192
193 /*
194 rpc.manifest(function(err, data){
195 // console.log(err, data)
196 })
197 //rpc.dexbot.greet(self.name, function(err, greets){
198 // console.log(greets)
199 //})
200 var log = hyperlog(bot.db.sublevel())
201 var local = str2ps.duplex(log.replicate({live : true}), function(err){
202 //console.log('local err or ending?', err)
203 })
204 var random = rpc.dexbot.createLog()
205 var remote = rpc.dexbot.netcast({
206 distance: 3,
207 head: null,
208 author: '1234567890'
209 })
210 var x
211 //pull(local, remote, local)
212 pull(local, random, local)
213 log.on('add', function(data){
214 //console.log(data.value.toString() + service.name)
215 })
216 */
217 })
218
219 },
220 'netcast': function(mesg){
221
222 var distance = mesg.distance || 0
223
224 var log = hyperlog(bot.db.sublevel('netcast'))
225
226 var local = str2ps.duplex(log.replicate({live:true}), function(err){
227 //console.log('remote error or completion?', err)
228 })
229
230 log.add(mesg.head || undefined, JSON.stringify(Object.keys(node.peers)), function(err, doc){
231
232 mesg.distance--
233 mesg.auth = node.address()
234 mesg.head = doc.key
235
236 if(distance < 0) //???
237
238 Object.keys(node.peers).forEach(function(peer){
239 if(distance > 0 && !(mesg.publicKey === peer)){
240 remote = peer.dexbot.netcast(mesg)
241 pull(local, remote, local)
242 log.on('add', function(data){
243 console.log(data.toString())
244 })
245 }
246 })
247
248 })
249
250 return local
251 },
252 'stderr' : function( replicate){
253 var type = replicate ? 'replicate' : 'changes'
254 var stream = str2ps.duplex(errLogDB[type]({live:true}), errLog())
255 return stream
256 },
257 'swarmLog' : function(publicKey){
258 var log = swarmlog({
259 keys: publicKey,
260 sodium: require('chloride/browser'),
261 db: bot.db.sublevel('swarm:' + publicKey),
262 valueEncoding: 'json',
263 hubs: [ 'https://signalhub.mafintosh.com' ]
264 })
265 var stream = str2ps(log)
266 return stream
267 },
268 'getLog' : function(name){
269 var log = logs[name] || hyperlog(bot.db.sublevel(name))
270 var stream = str2ps.source(log.createReadStream({live: true}))
271 //log.on('add', function(d){console.log(d)})
272 return stream
273 },
274 'assimilate': function(id, peer){
275 var log = dex.dexbot.createLog(id)
276 console.log(dex.peers)
277 dex.peers[peer]
278 return log
279 },
280 'createLog': function(name){ // name will usually be a public key
281 var log = logs[name] || hyperlog(bot.db.sublevel(name))
282 logs[name] = log // put these in a hyperkv store, with updates to status: live (connected), last_known_whatabouts (previous replication)
283 // etc
284
285 //log.on('preadd', function(node){ console.log(node)})
286 log.on('end', function(){
287 logs[name] = null
288 })
289 var stream = str2ps.duplex(log.replicate({live:true, mode:'sync'}), function(err){
290 //console.log('remote error or completion?', err)
291 })
292 return stream
293 },
294 'greeting': function(name, cb){
295 cb(null, bot.name + ': GREETINGS TO ' + name)
296 }
297 }
298 Object.assign(rpc, core)
299 return rpc
300}
301

Built with git-ssb-web