git ssb

0+

Dominic / ssb-ooo



Tree: 8ded95afd48d0f207eee727841728c1576ca584f

Files: 8ded95afd48d0f207eee727841728c1576ca584f / index.js

2468 bytes · Raw
1var pull = require('pull-stream')
2var GQ = require('gossip-query')
3var hash = require('ssb-keys/util').hash
4var ref = require('ssb-ref')
/**
 * Derive the id string for a message value.
 *
 * The value is serialized with JSON.stringify using two-space indentation,
 * passed to hash() (from ssb-keys/util), and the result is prefixed with '%'.
 *
 * @param {*} msg - the message value to identify.
 * @returns {string} '%' followed by the hash of the serialized value.
 */
function getId(msg) {
  var serialized = JSON.stringify(msg, null, 2)
  return '%' + hash(serialized)
}
8
9var Store = require('./store')
// Keep a reference to the original console.error so the wrapper below can
// still produce output.
var log = console.error

// Debugging aid: every console.error call first prints a stack trace (so the
// call site can be located) and then prints the message itself.
// NOTE(review): this replaces console.error for the whole process, not only
// for this module - confirm it is still wanted outside of debugging.
console.error = function (m) {
  log(new Error('---------------').stack)
  // Forward every argument, not only the first, so calls such as
  // console.error('failed:', err) keep their extra values.
  log.apply(console, arguments)
}
17
// Name and version this module exports.
exports.name = 'ooo'
exports.version = '1.0.0'

// The two methods returned by init, with their call types.
exports.manifest = { stream: 'duplex', get: 'async' }

// The `anonymous` entry allows only `stream`.
exports.permissions = { anonymous: { allow: ['stream'] } }
27
28var Flume = require('flumedb')
29var OffsetLog = require('flumelog-offset')
30var mkdirp = require('mkdirp')
31var ViewHashtable = require('flumeview-hashtable')
32
33exports.init = function (sbot, config) {
34 var id = sbot.id
35
36 store = Store(config)
37
38 var gq = GQ({
39 isQuery: ref.isMsg,
40 isRequest: function (n) {
41 return Number.isInteger(n) && n < 0
42 },
43 isResponse: function (o) {
44 return o && isObject(o)
45 },
46 check: function (key, cb) {
47 store.keys.get(key, function (err, data) {
48 if(data) cb(null, data.value)
49 else
50 sbot.get({id:key, raw: true}, function (err, msg) {
51 cb(null, msg)
52 })
53 })
54 },
55 process: function (id, msg, cb) {
56 if(id !== getId(msg))
57 cb()
58 else cb(null, msg)
59 }
60 })
61
62 function get (id, cb) {
63 gq.query(id, function (err, msg) {
64 if(err) return cb(err)
65 store.add(msg, function (err, data) {
66 data.ooo = true
67 cb(null, data)
68 })
69 })
70 }
71
72 sbot.get.hook(function (fn, args) {
73 var id = args[0]
74 var cb = args[1]
75 if(id.raw) fn(id.id, cb)
76 else
77 fn(id, function (err, value) {
78 if(!err) cb(null, value)
79 else get(id, function (err, data) {
80 if(err) cb(err)
81 else cb(null, data.value)
82 })
83 })
84 })
85
86 sbot.status.hook(function (fn, args) {
87 var status = fn()
88 status.ooo = {}
89 for(var id in gq.state)
90 status.ooo[id] = gq.state[id]
91 return status
92 })
93
94 sbot.progress.hook(function (fn, args) {
95 var prog = fn()
96 prog.ooo = gq.progress()
97 return prog
98 })
99
100
101 sbot.on('rpc:connect', function (rpc, isClient) {
102 if(isClient) {
103 var stream = gq.createStream(rpc.id)
104 pull(stream, rpc.ooo.stream(function () {}), stream)
105 }
106 })
107
108 return {
109 stream: function () {
110 //called by muxrpc, so remote id is set as this.id
111 return gq.createStream(this.id)
112 },
113 get: get
114 }
115}
116
117

Built with git-ssb-web