git ssb

0+

Dominic / ssb-ooo



Tree: 0ba44f19cf8a272c733041c8b3eaf1e9723db428

Files: 0ba44f19cf8a272c733041c8b3eaf1e9723db428 / index.js

2532 bytesRaw
1var pull = require('pull-stream')
2var GQ = require('gossip-query')
3var hash = require('ssb-keys/util').hash
4var ref = require('ssb-ref')
5
6function getId(msg) {
7 return '%'+hash(JSON.stringify(msg, null, 2))
8}
9
10function isObject (o) {
11 return o && 'object' === typeof o
12}
13
14var Store = require('./store')
15var log = console.error
16
17console.error = function (m) {
18 log(new Error('---------------').stack)
19 log(m)
20
21}
22
23exports.name = 'ooo'
24exports.version = '1.0.0'
25exports.manifest = {
26 stream: 'duplex',
27 get: 'async'
28}
29exports.permissions = {
30 anonymous: {allow: ['stream']}
31}
32
33var Flume = require('flumedb')
34var OffsetLog = require('flumelog-offset')
35var mkdirp = require('mkdirp')
36var ViewHashtable = require('flumeview-hashtable')
37
38exports.init = function (sbot, config) {
39 var id = sbot.id
40
41 store = Store(config)
42
43 var gq = GQ({
44 isQuery: ref.isMsg,
45 isRequest: function (n) {
46 return Number.isInteger(n) && n < 0
47 },
48 isResponse: function (o) {
49 return o && isObject(o)
50 },
51 check: function (key, cb) {
52 store.keys.get(key, function (err, data) {
53 if(data) cb(null, data.value)
54 else
55 sbot.get({id:key, raw: true}, function (err, msg) {
56 cb(null, msg)
57 })
58 })
59 },
60 process: function (id, msg, cb) {
61 if(id !== getId(msg))
62 cb()
63 else cb(null, msg)
64 }
65 })
66
67 function get (id, cb) {
68 gq.query(id, function (err, msg) {
69 if(err) return cb(err)
70 store.add(msg, function (err, data) {
71 data.ooo = true
72 cb(null, data)
73 })
74 })
75 }
76
77 sbot.get.hook(function (fn, args) {
78 var id = args[0]
79 var cb = args[1]
80 if(id.raw) fn(id.id, cb)
81 else
82 fn(id, function (err, value) {
83 if(!err) cb(null, value)
84 else get(id, function (err, data) {
85 if(err) cb(err)
86 else cb(null, data.value)
87 })
88 })
89 })
90
91 sbot.status.hook(function (fn, args) {
92 var status = fn()
93 status.ooo = {}
94 for(var id in gq.state)
95 status.ooo[id] = gq.state[id]
96 return status
97 })
98
99 sbot.progress.hook(function (fn, args) {
100 var prog = fn()
101 prog.ooo = gq.progress()
102 return prog
103 })
104
105
106 sbot.on('rpc:connect', function (rpc, isClient) {
107 if(isClient) {
108 var stream = gq.createStream(rpc.id)
109 pull(stream, rpc.ooo.stream(function () {}), stream)
110 }
111 })
112
113 return {
114 stream: function () {
115 //called by muxrpc, so remote id is set as this.id
116 return gq.createStream(this.id)
117 },
118 get: get
119 }
120}
121
122

Built with git-ssb-web