git ssb

1+

cheerbitch / dexbot



Tree: 4fe728780fc41a67f0e103a0221e05b4b9c93423

Files: 4fe728780fc41a67f0e103a0221e05b4b9c93423 / core / index.js

7246 bytesRaw
1// var mdns = require('bonjour')()
2// var dnsdisco = require('dns-discovery')
3// both error if no network interface connection
4var util = require('../util')
5var keer = require('ssb-keys')
6var hyperlog = require('hyperlog')
7var swarmlog = require('swarmlog')
8var pull = require('pull-stream')
9var str2ps = require('stream-to-pull-stream')
10var toPull = str2ps
11var toStream = require('pull-stream-to-stream')
12var emStream = require('emit-stream')
13var muxrpc = require('muxrpc')
14var emStream = require('emit-stream')
15var emitter = require('events').EventEmitter
16
17var hkv = require('hyperkv')
18var hldex = require('hyperlog-index')
19
20
21var $ = module.exports = {}
22
23$.name = 'dexbot' // change to 'rpc'
24
25// dexbot is a *SUPA!!!*
26
27$.manifest = {
28 rc: 'duplex',
29 callback: 'duplex',
30 emit: 'async',
31 assimilate: 'duplex',
32 greet: 'async',
33 createLog: 'duplex',
34 getLog: 'source',
35 requestPublicKey: 'async',
36 netcast: 'duplex',
37 bonjour: 'source',
38 connect: 'async',
39 swarmLog: 'duplex',
40 stderr: 'source',
41 greeting: 'async',
42 sign: 'async',
43 replicate: {
44 'push': 'sink',
45 'pull': 'source',
46 'sync': 'duplex'
47 },
48 log: {
49 'updates' : 'source',
50 'heads' : 'async',
51 'headStream' : 'source',
52 'add' : 'async',
53 'get' : 'async',
54 'append': 'async',
55 'batch': 'async'
56 }
57}
58
59$.permissions = {
60 uxer : ['emit'],
61 anonymous: ['rc', 'assimilate', 'callback', 'connect', 'createLog', 'getLog', 'netcast'],
62 replicate: ['push', 'pull', 'sync'],
63 log : ['add', 'append', 'batch', 'get', 'heads', 'headStream', 'updates'],
64 sign: ['sign', 'onConnect'],
65 require: ['require']
66}
67
68$.init = function(dex, bot){
69 var self = dex
70 var node = dex
71 var rpc = {replicate: {}, log: {}}
72 var logs = bot.logs
73 var peers = {}
74 var kv = hkv({
75 db: bot.db,
76 log: hyperlog(bot.db.sublevel('kvi:' + dex.id))
77 })
78 kv.get(dex.id + ':logs', function(err, data){
79
80 })
81 $.permissions.replicate.forEach(function(e){
82 rpc.replicate[e] = function(opts){
83 opts = opts || {}
84 var id = opts.id
85 var type = $.manifest.replicate[e]
86 var log = logs[id] || bot.log.replicate({mode: e, live: opts.live || false})
87 var stream = str2ps[type](log, function(err){
88 console.log(err)
89 })
90 return stream
91 }
92 })
93 $.permissions.log.forEach(function(e){
94 var type = $.manifest[e]
95 if(type === 'async') rpc.log[e] = bot.log[e]
96 else{ // source stream
97 switch(e){
98 case 'headStream':
99 rpc.log[e] = function(opts){
100 return str2ps(bot.log.heads(opts))
101 }
102 break;
103 case 'updates':
104 rpc.log[e] = function(opts){
105 return str2ps(bot.log.createReadStream(opts))
106 }
107 break;
108 }
109 }
110 })
111 var core = {
112 'emit' : function(channel, data, cb){
113 dex.emit(channel, data)
114 if(cb) cb(null, true)
115 },
116 'callback' : function(id){
117 var em = new emitter
118 var st = emStream(em)
119 var dupe = toPull.duplex(st)
120 var rst = emStream(st)
121 dex.on('to:' + id, function(data){
122 console.log(data)
123 em.emit('to:' + id, {from: bot.keys.id, msg: data})
124 })
125
126 rst.on('to:'+bot.keys.id,function(data){
127 dex.emit('to:'+bot.keys.id, data)
128 // console.log(data)
129 })
130 return dupe
131 },
132 'sign': function(msg, cb){
133 var signed = keer.signObj(bot.keys, {msg: msg})
134 cb(null, signed)
135 },
136 'rc': function(id){
137 var client = muxrpc(dex.getManifest(), {})()
138 peers[id].client = client
139 var stream = client.createStream()
140 var dupe = client.dexbot.callback(bot.keys.id)
141 dex.on('to:'+bot.keys.id, function(data){
142 dupe.emit('to:'+bot.keys.id, data)
143 })
144
145 return stream
146 },
147 'connect': function(peer, cb){
148 node.connect(peer.host, function(err, rpc){
149 if(err) console.log(err) // publish errloggify this callback if the method sticks
150 peers[rpc.id] = {rpc: rpc, id: rpc.id, known_hosts: [peer.address]}
151 if(cb) cb(null, rpc)
152
153 // give peer yr rpc
154/*
155 var server = muxrpc({}, dex.getManifest())(dex)
156 var rc = rpc.dexbot.rc(bot.keys.id)
157 var local = server.createStream()
158 pull(rc, local, rc)
159*/
160 // set up two way messaging
161 var pst = rpc.dexbot.callback(bot.keys.id)
162 var dupe = emStream(toStream(pst))
163
164 var rdupe = emStream(dupe)
165 var tp = toPull.duplex(rdupe)
166
167 pull(pst, tp, pst)
168
169 dex.on('to:'+rpc.id, function(data){
170 dupe.emit('to:'+rpc.id, {from: bot.keys.id, msg: data})
171 })
172
173 dupe.on('to:'+bot.keys.id, function(data){
174 dex.emit('to:'+bot.keys.id, data)
175 })
176 })
177 },
178 'netcast': function(mesg){
179
180 var distance = mesg.distance || 0
181
182 var log = hyperlog(bot.db.sublevel('netcast'))
183
184 var local = str2ps.duplex(log.replicate({live:true}), function(err){
185 //console.log('remote error or completion?', err)
186 })
187
188 log.add(mesg.head || undefined, JSON.stringify(Object.keys(node.peers)), function(err, doc){
189
190 mesg.distance--
191 mesg.auth = node.address()
192 mesg.head = doc.key
193
194 if(distance < 0) //???
195
196 Object.keys(node.peers).forEach(function(peer){
197 if(distance > 0 && !(mesg.publicKey === peer)){
198 remote = peer.dexbot.netcast(mesg)
199 pull(local, remote, local)
200 log.on('add', function(data){
201 console.log(data.toString())
202 })
203 }
204 })
205
206 })
207
208 return local
209 },
210 'stderr' : function( replicate){
211 var type = replicate ? 'replicate' : 'changes'
212 var stream = str2ps.duplex(errLogDB[type]({live:true}), errLog())
213 return stream
214 },
215 'swarmLog' : function(publicKey){
216 var log = swarmlog({
217 keys: publicKey,
218 sodium: require('chloride/browser'),
219 db: bot.db.sublevel('swarm:' + publicKey),
220 valueEncoding: 'json',
221 hubs: [ 'https://signalhub.mafintosh.com' ]
222 })
223 var stream = str2ps(log)
224 return stream
225 },
226 'getLog' : function(name){
227 var log = logs[name] || hyperlog(bot.db.sublevel(name))
228 var stream = str2ps.source(log.createReadStream({live: true}))
229 //log.on('add', function(d){console.log(d)})
230 return stream
231 },
232 'assimilate': function(id, peer){
233 var log = dex.dexbot.createLog(id)
234 console.log(dex.peers)
235 dex.peers[peer]
236 return log
237 },
238 'createLog': function(name){ // name will usually be a public key
239 var log = logs[name] || hyperlog(bot.db.sublevel(name))
240 logs[name] = log // put these in a hyperkv store, with updates to status: live (connected), last_known_whatabouts (previous replication)
241
242 log.on('end', function(){
243 logs[name] = null
244 })
245 var stream = str2ps.duplex(log.replicate({live:true, mode:'sync'}), function(err){
246 //console.log('remote error or completion?', err)
247 })
248 return stream
249 }
250 }
251 Object.assign(rpc, core)
252 return rpc
253}
254

Built with git-ssb-web