var path = require('path') var url = require('url') var pull = require('pull-stream') var fs = require('fs') var memo = require('asyncmemo') var toPull = require('stream-to-pull-stream') function redirect(res, dest) { res.writeHead(302, { Location: dest }) res.end() } function asyncSource(fn) { var next return function (end, cb) { if (next) return next(end, cb) if (end) return cb(end) fn(function (err, _next) { if (err) return cb(err) next = _next if (typeof next !== 'function') return cb(new TypeError('bad function')) next(null, cb) }) } } function asyncDuplex(fn) { var next var waiting = 3 var sourceCb, sinkRead, fnErr, fnStream fn(function (err, stream) { fnErr = err fnStream = stream if (!--waiting) next() }) function onError(err) { sourceCb(err) sinkRead(err, function (err) { if (err && err !== true) console.trace(err) }) } function next() { if (fnErr) return onError(fnErr) sourceCb(null, pull(sinkRead, fnStream)) } return { source: asyncSource(function (cb) { sourceCb = cb if (!--waiting) next() }), sink: function (read) { sinkRead = read if (!--waiting) next() } } } function ifFunction(fn) { return typeof fn === 'function' ? fn : null } function getWantBlob(blobs, id, cb) { blobs.size(id, function (err, size) { if (size == null) blobs.want(id, gotBlobWant) else gotBlobWant(null, true) }) function gotBlobWant(err, has) { if (err) return cb(err) if (!has) return cb(new Error('missing blob')) pull(blobs.get(id), pull.collect(function (err, bufs) { if (err) return cb(err) cb(null, Buffer.concat(bufs)) })) } } exports.name = 'exec.js' exports.version = '1.0.0' exports.manifest = { callAsync: 'async', callDuplex: 'duplex', callSource: 'source', callSink: 'sink', clearCache: 'async' } exports.init = function (sbot, config) { console.log('[exec.js] init') var conf = config.exec || {} var prefix = '/' + (conf.prefix || 'exec') var dir = path.join(config.path, conf.dir || 'exec') var blobsDir = path.join(config.path, 'blobs', 'sha256') var mtimes = {} function getBlobModule(id, cb) { var hexId = new Buffer(id.substr(1, 44), 'base64').toString('hex') var filename = path.join(hexId.substr(0, 2), hexId.substr(2)) sbot.blobs.size(id, function (err, size) { if (size == null) sbot.blobs.want(id, gotBlobWant) else gotBlobWant(null, true) function gotBlobWant(err, has) { if (err) return cb(err) if (!has) return cb(new Error('missing blob')) var module try { module = require(filename) } catch(e) { return cb(e) } cb(null, module) } }) } function getFileModule(name, cb) { var filename = path.join(dir, name || 'index.js') var self = this fs.stat(filename, function (err, stat) { if (err) return cb(err) var prevMtime = mtimes[filename] var mtime = stat.mtime.getTime() if (mtime !== prevMtime) { if (prevMtime) delete require.cache[filename] mtimes[filename] = mtime } var module try { module = require(filename) } catch(e) { return cb(e) } cb(null, module) }) } function getModule(id, cb) { if (typeof id !== 'string') return cb(new TypeError('id should be string')) if (id[0] === '&') getBlobModule(id, cb) else getFileModule(id, cb) } if (!sbot.ws || !sbot.ws.use) { console.error('[exec.js] missing sbot.ws.use') } else sbot.ws.use(function (req, res, next) { req.uri = url.parse(req.url, true) if (req.uri.pathname === prefix) { return redirect(res, path.basename(prefix) + '/') } if (req.uri.pathname.substr(0, prefix.length + 1) !== prefix + '/') { return next && next() } var id = req.uri.pathname.substr(prefix.length + 1) if (id[0] === '&') { // canonicalize with URL-encoding so that relative URLs // work (since blob ids may contain "/" ) var dest = prefix + '/' + encodeURIComponent(id) + (req.uri.search || '') return redirect(res, dest) } try { id = decodeURIComponent(id) } catch (e) { res.writeHead(400, {'Content-Type': 'text/plain'}) return res.end('Bad id "' + id + '"') } getPlugin(id, function (err, plugin) { try { if (err) throw err if (!plugin) return next() var opts = { method: 'GET', headers: req.headers, uri: req.uri } if (req.method === 'GET' && typeof plugin.get === 'function') { var source = plugin.get(opts) if (!source) return res.writeHead(201), res.end() pull( source, toPull.sink(res) ) } else if (req.method === 'POST' && typeof plugin.post === 'function') { var through = plugin.post(opts) if (!through) return req.end(), res.writeHead(201), res.end() pull( toPull.source(req), through, toPull.sink(res) ) } else if (typeof plugin.request === 'function') { var stream = plugin.request(opts) if (!stream) return req.end(), res.writeHead(201), res.end() pull( toPull.source(req), stream, toPull.sink(res) ) } else { res.writeHead(405, {'Content-Type': 'text/plain'}) return res.end('Method not allowed') } } catch(err) { res.writeHead(500, {'Content-Type': 'text/plain'}) res.end(err.stack || err) } }) }) function callAt(api, method, args) { var self, fn = api var parts = method.split('.') for (var i = 0; i < parts.length; i++) { self = fn fn = fn && fn[parts[i]] } if (!fn) throw new Error('Missing method') return fn.apply(self, args) } return { callAsync: function (id, method/*, args..., cb*/) { var args = [].slice.call(arguments, 2) var cb = ifFunction(args[args.length-1]) || console.trace getPlugin(id, function (err, api) { if (err) return cb(err) try { callAt(api, method, args) } catch(e) { cb(e) } }) }, callDuplex: function (id, method/*, args...*/) { var args = [].slice.call(arguments, 2) return asyncDuplex(function (cb) { getPlugin(id, function (err, api) { if (err) return cb(err) cb(null, callAt(api, method, args)) }) }) }, callSource: function (id, method/*, args...*/) { var args = [].slice.call(arguments, 2) return asyncSource(function (cb) { getPlugin(id, function (err, api) { if (err) return cb(err) cb(null, callAt(api, method, args)) }) }) }, callSink: function (id, method/*, args...*/) { var args = [].slice.call(arguments, 2) function onEnd(err) { if (err) console.trace(id, method, err) } return function (read) { getPlugin(id, function (err, api) { if (err) return read(err, onEnd) callAt(api, method, args)(read) }) } }, clearCache: function (cb) { getScript.cache.clear() getBlobModule.cache.clear() getPluginCached.cache.clear() cb(null) } } }