Files: 0ba44f19cf8a272c733041c8b3eaf1e9723db428 / index.js
2532 bytesRaw
1 | var pull = require('pull-stream') |
2 | var GQ = require('gossip-query') |
3 | var hash = require('ssb-keys/util').hash |
4 | var ref = require('ssb-ref') |
5 | |
/**
 * Compute the key of a message value.
 *
 * The value is serialised as JSON with two-space indentation, that text is
 * passed to the imported `hash` helper, and the result is prefixed with '%'.
 *
 * @param {*} msg - message value to derive the key from
 * @returns {string} '%' followed by whatever `hash` returns for the JSON text
 */
function getId(msg) {
  var serialized = JSON.stringify(msg, null, 2)
  var digest = hash(serialized)
  return '%' + digest
}
9 | |
/**
 * Test whether a value is a non-null object.
 *
 * Falsy inputs are returned unchanged (so the result is falsy but not
 * necessarily `false`); truthy inputs yield a boolean.
 *
 * @param {*} o - value to test
 * @returns {*} `o` itself when it is falsy, otherwise `typeof o === 'object'`
 */
function isObject (o) {
  if (!o) return o
  return typeof o === 'object'
}
13 | |
14 | var Store = require('./store') |
// Keep a handle on the console.error implementation that is in place
// before it is replaced below.
var log = console.error

// NOTE(review): this replaces console.error for the whole process, which
// looks like debugging instrumentation -- confirm it is still wanted.
// Every call first prints a stack trace (headed by a dashed marker) showing
// where console.error was invoked, then prints the caller's output.
console.error = function (m) {
  log(new Error('---------------').stack)
  // Forward every argument, not just the first, so calls such as
  // console.error('failed:', err) do not lose their trailing values.
  log.apply(console, arguments)
}
22 | |
// Static plugin description: its name, its version, the methods it exposes
// (with their call types) and which of them anonymous peers may call.
// Presumably read by the plugin host when the module is loaded -- verify
// against the loader.
Object.assign(exports, {
  name: 'ooo',
  version: '1.0.0',
  manifest: {
    stream: 'duplex',
    get: 'async'
  },
  permissions: {
    // Only `stream` is listed for anonymous callers; `get` is not.
    anonymous: {allow: ['stream']}
  }
})
32 | |
33 | var Flume = require('flumedb') |
34 | var OffsetLog = require('flumelog-offset') |
35 | var mkdirp = require('mkdirp') |
36 | var ViewHashtable = require('flumeview-hashtable') |
37 | |
38 | exports.init = function (sbot, config) { |
39 | var id = sbot.id |
40 | |
41 | store = Store(config) |
42 | |
43 | var gq = GQ({ |
44 | isQuery: ref.isMsg, |
45 | isRequest: function (n) { |
46 | return Number.isInteger(n) && n < 0 |
47 | }, |
48 | isResponse: function (o) { |
49 | return o && isObject(o) |
50 | }, |
51 | check: function (key, cb) { |
52 | store.keys.get(key, function (err, data) { |
53 | if(data) cb(null, data.value) |
54 | else |
55 | sbot.get({id:key, raw: true}, function (err, msg) { |
56 | cb(null, msg) |
57 | }) |
58 | }) |
59 | }, |
60 | process: function (id, msg, cb) { |
61 | if(id !== getId(msg)) |
62 | cb() |
63 | else cb(null, msg) |
64 | } |
65 | }) |
66 | |
67 | function get (id, cb) { |
68 | gq.query(id, function (err, msg) { |
69 | if(err) return cb(err) |
70 | store.add(msg, function (err, data) { |
71 | data.ooo = true |
72 | cb(null, data) |
73 | }) |
74 | }) |
75 | } |
76 | |
77 | sbot.get.hook(function (fn, args) { |
78 | var id = args[0] |
79 | var cb = args[1] |
80 | if(id.raw) fn(id.id, cb) |
81 | else |
82 | fn(id, function (err, value) { |
83 | if(!err) cb(null, value) |
84 | else get(id, function (err, data) { |
85 | if(err) cb(err) |
86 | else cb(null, data.value) |
87 | }) |
88 | }) |
89 | }) |
90 | |
91 | sbot.status.hook(function (fn, args) { |
92 | var status = fn() |
93 | status.ooo = {} |
94 | for(var id in gq.state) |
95 | status.ooo[id] = gq.state[id] |
96 | return status |
97 | }) |
98 | |
99 | sbot.progress.hook(function (fn, args) { |
100 | var prog = fn() |
101 | prog.ooo = gq.progress() |
102 | return prog |
103 | }) |
104 | |
105 | |
106 | sbot.on('rpc:connect', function (rpc, isClient) { |
107 | if(isClient) { |
108 | var stream = gq.createStream(rpc.id) |
109 | pull(stream, rpc.ooo.stream(function () {}), stream) |
110 | } |
111 | }) |
112 | |
113 | return { |
114 | stream: function () { |
115 | //called by muxrpc, so remote id is set as this.id |
116 | return gq.createStream(this.id) |
117 | }, |
118 | get: get |
119 | } |
120 | } |
121 | |
122 |
Built with git-ssb-web