git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: aeae4fa406f651c0ba5716dd48e856acc3cad353

Files: aeae4fa406f651c0ba5716dd48e856acc3cad353 / sbot.js

5379 bytesRaw
1var pull = require('pull-stream')
2var ref = require('ssb-ref')
3var Reconnect = require('pull-reconnect')
4var createClient = require('ssb-client')
5var createFeed = require('ssb-feed')
6var nest = require('depnest')
7var Value = require('mutant/value')
8var ssbKeys = require('ssb-keys')
9
10exports.needs = nest({
11 'config.sync.load': 'first',
12 'keys.sync.load': 'first',
13 'sbot.obs.connectionStatus': 'first',
14 'sbot.hook.feed': 'map'
15})
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true
26 },
27 pull: {
28 log: true,
29 userFeed: true,
30 query: true,
31 feed: true,
32 links: true
33 },
34 obs: {
35 connectionStatus: true,
36 connectedPeers: true,
37 localPeers: true
38 }
39 }
40}
41
42exports.create = function (api) {
43 const config = api.config.sync.load()
44 const keys = api.keys.sync.load()
45 var cache = {}
46
47 var sbot = null
48 var connectionStatus = Value()
49 var connectedPeers = Value([])
50 var localPeers = Value([])
51
52 setInterval(refreshPeers, 5e3)
53
54 var rec = Reconnect(function (isConn) {
55 function notify (value) {
56 isConn(value); connectionStatus.set(value)
57 }
58
59 createClient(keys, {
60 manifest: require('./manifest.json'),
61 remote: config.remote,
62 caps: config.caps
63 }, function (err, _sbot) {
64 if (err) {
65 return notify(err)
66 }
67
68 sbot = _sbot
69 sbot.on('closed', function () {
70 sbot = null
71 notify(new Error('closed'))
72 })
73
74 notify()
75 })
76 })
77
78 var internal = {
79 getLatest: rec.async(function (id, cb) {
80 sbot.getLatest(id, cb)
81 }),
82 add: rec.async(function (msg, cb) {
83 sbot.add(msg, cb)
84 })
85 }
86
87 var feed = createFeed(internal, keys, {remote: true})
88
89 return {
90 sbot: {
91 sync: {
92 cache: () => cache
93 },
94 async: {
95 get: rec.async(function (key, cb) {
96 if (typeof cb !== 'function') {
97 throw new Error('cb must be function')
98 }
99 if (cache[key]) cb(null, cache[key])
100 else {
101 sbot.get(key, function (err, value) {
102 if (err) return cb(err)
103 runHooks({key, value})
104 cb(null, value)
105 })
106 }
107 }),
108 publish: rec.async((content, cb) => {
109 if (content.recps) {
110 content = ssbKeys.box(content, content.recps.map(e => {
111 return ref.isFeed(e) ? e : e.link
112 }))
113 } else if (content.mentions) {
114 content.mentions.forEach(mention => {
115 if (ref.isBlob(mention.link)) {
116 sbot.blobs.push(mention.link, err => {
117 if (err) console.error(err)
118 })
119 }
120 })
121 }
122
123 feed.add(content, (err, msg) => {
124 if (err) console.error(err)
125 else if (!cb) console.log(msg)
126 cb && cb(err, msg)
127 })
128 }),
129 addBlob: rec.async((stream, cb) => {
130 return pull(
131 stream,
132 Hash(function (err, id) {
133 if (err) return cb(err)
134 // completely UGLY hack to tell when the blob has been sucessfully written...
135 var start = Date.now()
136 var n = 5
137 next()
138
139 function next () {
140 setTimeout(function () {
141 sbot.blobs.has(id, function (_, has) {
142 if (has) return cb(null, id)
143 if (n--) next()
144 else cb(new Error('write failed'))
145 })
146 }, Date.now() - start)
147 }
148 }),
149 sbot.blobs.add()
150 )
151 })
152 },
153 pull: {
154 query: rec.source(query => {
155 return sbot.query.read(query)
156 }),
157 userFeed: rec.source(opts => {
158 return sbot.createUserStream(opts)
159 }),
160 feed: rec.source(function (opts) {
161 return pull(
162 sbot.createFeedStream(opts),
163 pull.through(runHooks)
164 )
165 }),
166 log: rec.source(opts => {
167 return pull(
168 sbot.createLogStream(opts),
169 pull.through(runHooks)
170 )
171 }),
172 links: rec.source(function (query) {
173 return sbot.links(query)
174 })
175 },
176 obs: {
177 connectionStatus: (listener) => connectionStatus(listener),
178 connectedPeers: () => connectedPeers,
179 localPeers: () => localPeers
180 }
181 }
182 }
183
184 // scoped
185
186 function runHooks (msg) {
187 if (!cache[msg.key]) {
188 cache[msg.key] = msg.value
189 api.sbot.hook.feed(msg)
190 }
191 }
192
193 function refreshPeers () {
194 if (sbot) {
195 sbot.gossip.peers((err, peers) => {
196 if (err) throw console.log(err)
197 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
198 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
199 })
200 }
201 }
202}
203
204function Hash (onHash) {
205 var buffers = []
206 return pull.through(function (data) {
207 buffers.push(typeof data === 'string'
208 ? new Buffer(data, 'utf8')
209 : data
210 )
211 }, function (err) {
212 if (err && !onHash) throw err
213 var b = buffers.length > 1 ? Buffer.concat(buffers) : buffers[0]
214 var h = '&' + ssbKeys.hash(b)
215 onHash && onHash(err, h)
216 })
217}
218

Built with git-ssb-web