Files: 12e5d3c2c7b5bcd8cdbf500ac017d10708c07907 / index.js
2468 bytesRaw
1 | var pull = require('pull-stream') |
2 | var GQ = require('gossip-query') |
3 | var hash = require('ssb-keys/util').hash |
4 | var ref = require('ssb-ref') |
/**
 * Compute the id for a message: '%' followed by the hash of the message
 * serialized with JSON.stringify using a 2-space indent.
 *
 * @param {*} msg - value to serialize and hash
 * @returns {string} the '%'-prefixed id
 */
function getId(msg) {
  var serialized = JSON.stringify(msg, null, 2)
  var digest = hash(serialized)
  return '%' + digest
}
8 | |
9 | var Store = require('./store') |
// Keep a handle on the real console.error before it is replaced below.
var log = console.error

// Debug aid: every console.error call first prints a stack trace (headed by
// a dashed marker) so the caller can be located, then prints the message.
// All arguments are forwarded to the real console.error; forwarding only the
// first one would silently drop the rest for every caller in the process.
console.error = function (m) {
  log(new Error('---------------').stack)
  log.apply(console, arguments)
}
17 | |
// Plugin metadata: the plugin is named 'ooo', exposes a duplex `stream`
// method and an async `get` method, and allows anonymous callers to use
// `stream` only.
Object.assign(exports, {
  name: 'ooo',
  version: '1.0.0',
  manifest: {
    stream: 'duplex',
    get: 'async'
  },
  permissions: {
    anonymous: { allow: ['stream'] }
  }
})
27 | |
28 | var Flume = require('flumedb') |
29 | var OffsetLog = require('flumelog-offset') |
30 | var mkdirp = require('mkdirp') |
31 | var ViewHashtable = require('flumeview-hashtable') |
32 | |
33 | exports.init = function (sbot, config) { |
34 | var id = sbot.id |
35 | |
36 | store = Store(config) |
37 | |
38 | var gq = GQ({ |
39 | isQuery: ref.isMsg, |
40 | isRequest: function (n) { |
41 | return Number.isInteger(n) && n < 0 |
42 | }, |
43 | isResponse: function (o) { |
44 | return o && isObject(o) |
45 | }, |
46 | check: function (key, cb) { |
47 | store.keys.get(key, function (err, data) { |
48 | if(data) cb(null, data.value) |
49 | else |
50 | sbot.get({id:key, raw: true}, function (err, msg) { |
51 | cb(null, msg) |
52 | }) |
53 | }) |
54 | }, |
55 | process: function (id, msg, cb) { |
56 | if(id !== getId(msg)) |
57 | cb() |
58 | else cb(null, msg) |
59 | } |
60 | }) |
61 | |
62 | function get (id, cb) { |
63 | gq.query(id, function (err, msg) { |
64 | if(err) return cb(err) |
65 | store.add(msg, function (err, data) { |
66 | data.ooo = true |
67 | cb(null, data) |
68 | }) |
69 | }) |
70 | } |
71 | |
72 | sbot.get.hook(function (fn, args) { |
73 | var id = args[0] |
74 | var cb = args[1] |
75 | if(id.raw) fn(id.id, cb) |
76 | else |
77 | fn(id, function (err, value) { |
78 | if(!err) cb(null, value) |
79 | else get(id, function (err, data) { |
80 | if(err) cb(err) |
81 | else cb(null, data.value) |
82 | }) |
83 | }) |
84 | }) |
85 | |
86 | sbot.status.hook(function (fn, args) { |
87 | var status = fn() |
88 | status.ooo = {} |
89 | for(var id in gq.state) |
90 | status.ooo[id] = gq.state[id] |
91 | return status |
92 | }) |
93 | |
94 | sbot.progress.hook(function (fn, args) { |
95 | var prog = fn() |
96 | prog.ooo = gq.progress() |
97 | return prog |
98 | }) |
99 | |
100 | |
101 | sbot.on('rpc:connect', function (rpc, isClient) { |
102 | if(isClient) { |
103 | var stream = gq.createStream(rpc.id) |
104 | pull(stream, rpc.ooo.stream(function () {}), stream) |
105 | } |
106 | }) |
107 | |
108 | return { |
109 | stream: function () { |
110 | //called by muxrpc, so remote id is set as this.id |
111 | return gq.createStream(this.id) |
112 | }, |
113 | get: get |
114 | } |
115 | } |
116 | |
117 |
Built with git-ssb-web