git ssb

7+

dinoworm 🐛 / patchcore



Tree: f52bd8f6f70ddb6dea78a0474573f08de92b6e85

Files: f52bd8f6f70ddb6dea78a0474573f08de92b6e85 / sbot.js

5634 bytes · Raw
1var pull = require('pull-stream')
2var ref = require('ssb-ref')
3var Reconnect = require('pull-reconnect')
4var createClient = require('ssb-client')
5var createFeed = require('ssb-feed')
6var nest = require('depnest')
7var Value = require('mutant/value')
8var ssbKeys = require('ssb-keys')
9
10exports.needs = nest({
11 'config.sync.load': 'first',
12 'keys.sync.load': 'first',
13 'sbot.obs.connectionStatus': 'first',
14 'sbot.hook.feed': 'map'
15})
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true,
26 gossipConnect: true
27 },
28 pull: {
29 log: true,
30 userFeed: true,
31 query: true,
32 feed: true,
33 links: true,
34 search: true
35 },
36 obs: {
37 connectionStatus: true,
38 connectedPeers: true,
39 localPeers: true
40 }
41 }
42}
43
44exports.create = function (api) {
45 const config = api.config.sync.load()
46 const keys = api.keys.sync.load()
47 var cache = {}
48
49 var sbot = null
50 var connectionStatus = Value()
51 var connectedPeers = Value([])
52 var localPeers = Value([])
53
54 setInterval(refreshPeers, 5e3)
55
56 var rec = Reconnect(function (isConn) {
57 function notify (value) {
58 isConn(value); connectionStatus.set(value)
59 }
60
61 createClient(keys, {
62 manifest: require('./manifest.json'),
63 remote: config.remote,
64 caps: config.caps
65 }, function (err, _sbot) {
66 if (err) {
67 return notify(err)
68 }
69
70 sbot = _sbot
71 sbot.on('closed', function () {
72 sbot = null
73 notify(new Error('closed'))
74 })
75
76 notify()
77 })
78 })
79
80 var internal = {
81 getLatest: rec.async(function (id, cb) {
82 sbot.getLatest(id, cb)
83 }),
84 add: rec.async(function (msg, cb) {
85 sbot.add(msg, cb)
86 })
87 }
88
89 var feed = createFeed(internal, keys, {remote: true})
90
91 return {
92 sbot: {
93 sync: {
94 cache: () => cache
95 },
96 async: {
97 get: rec.async(function (key, cb) {
98 if (typeof cb !== 'function') {
99 throw new Error('cb must be function')
100 }
101 if (cache[key]) cb(null, cache[key])
102 else {
103 sbot.get(key, function (err, value) {
104 if (err) return cb(err)
105 runHooks({key, value})
106 cb(null, value)
107 })
108 }
109 }),
110 publish: rec.async((content, cb) => {
111 if (content.recps) {
112 content = ssbKeys.box(content, content.recps.map(e => {
113 return ref.isFeed(e) ? e : e.link
114 }))
115 } else if (content.mentions) {
116 content.mentions.forEach(mention => {
117 if (ref.isBlob(mention.link)) {
118 sbot.blobs.push(mention.link, err => {
119 if (err) console.error(err)
120 })
121 }
122 })
123 }
124
125 feed.add(content, (err, msg) => {
126 if (err) console.error(err)
127 else if (!cb) console.log(msg)
128 cb && cb(err, msg)
129 })
130 }),
131 addBlob: rec.async((stream, cb) => {
132 return pull(
133 stream,
134 Hash(function (err, id) {
135 if (err) return cb(err)
136 // completely UGLY hack to tell when the blob has been sucessfully written...
137 var start = Date.now()
138 var n = 5
139 next()
140
141 function next () {
142 setTimeout(function () {
143 sbot.blobs.has(id, function (_, has) {
144 if (has) return cb(null, id)
145 if (n--) next()
146 else cb(new Error('write failed'))
147 })
148 }, Date.now() - start)
149 }
150 }),
151 sbot.blobs.add()
152 )
153 }),
154 gossipConnect: rec.async(function (opts, cb) {
155 sbot.gossip.connect(opts, cb)
156 })
157 },
158 pull: {
159 query: rec.source(query => {
160 return sbot.query.read(query)
161 }),
162 userFeed: rec.source(opts => {
163 return sbot.createUserStream(opts)
164 }),
165 feed: rec.source(function (opts) {
166 return pull(
167 sbot.createFeedStream(opts),
168 pull.through(runHooks)
169 )
170 }),
171 log: rec.source(opts => {
172 return pull(
173 sbot.createLogStream(opts),
174 pull.through(runHooks)
175 )
176 }),
177 links: rec.source(function (query) {
178 return sbot.links(query)
179 }),
180 search: rec.source(function (opts) {
181 return sbot.fulltext.search(opts)
182 })
183 },
184 obs: {
185 connectionStatus: (listener) => connectionStatus(listener),
186 connectedPeers: () => connectedPeers,
187 localPeers: () => localPeers
188 }
189 }
190 }
191
192 // scoped
193
194 function runHooks (msg) {
195 if (!cache[msg.key]) {
196 cache[msg.key] = msg.value
197 api.sbot.hook.feed(msg)
198 }
199 }
200
201 function refreshPeers () {
202 if (sbot) {
203 sbot.gossip.peers((err, peers) => {
204 if (err) throw console.log(err)
205 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
206 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
207 })
208 }
209 }
210}
211
/**
 * Creates a pull-stream through that collects everything passing through it
 * and, when the stream ends, reports '&' + ssbKeys.hash(<all data>) to
 * `onHash`.
 *
 * String chunks are converted to utf8 Buffers; any other chunk is stored
 * as-is (assumed to be a Buffer/Uint8Array — TODO confirm against callers).
 *
 * @param {function(?Error, string)} [onHash] - called once at end of stream
 *   with the end error (if any) and the computed id. If omitted, an end
 *   error is thrown instead.
 * @returns {function} the pull-stream through.
 */
function Hash (onHash) {
  const buffers = []
  return pull.through(function (data) {
    buffers.push(typeof data === 'string'
      // Buffer.from replaces the deprecated `new Buffer(string, enc)`.
      ? Buffer.from(data, 'utf8')
      : data
    )
  }, function (err) {
    if (err && !onHash) throw err
    // A single chunk is used directly to avoid a copy. Buffer.concat also
    // covers the empty stream (yielding an empty Buffer), where the old
    // `buffers[0]` fallback handed `undefined` to ssbKeys.hash.
    const all = buffers.length === 1 ? buffers[0] : Buffer.concat(buffers)
    const id = '&' + ssbKeys.hash(all)
    onHash && onHash(err, id)
  })
}
226

Built with git-ssb-web