git ssb

7+

dinoworm 🐛 / patchcore



Tree: a35242aa5cf18d28921e8c879cab704c7680f8d0

Files: a35242aa5cf18d28921e8c879cab704c7680f8d0 / sbot.js

6500 bytesRaw
// Module dependencies. None of these bindings is reassigned anywhere in the
// file, so they are declared with const.
const pull = require('pull-stream')
const ref = require('ssb-ref')
const Reconnect = require('pull-reconnect')
const createClient = require('ssb-client')
const createFeed = require('ssb-feed')
const nest = require('depnest')
const Value = require('mutant/value')
const ssbKeys = require('ssb-keys')
9
10exports.needs = nest({
11 'config.sync.load': 'first',
12 'keys.sync.load': 'first',
13 'sbot.obs.connectionStatus': 'first',
14 'sbot.hook.publish': 'map'
15})
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true,
26 gossipConnect: true
27 },
28 pull: {
29 log: true,
30 userFeed: true,
31 messagesByType: true,
32 query: true,
33 feed: true,
34 links: true,
35 search: true,
36 replicateProgress: true,
37 backlinks: true
38 },
39 obs: {
40 connectionStatus: true,
41 connection: true,
42 connectedPeers: true,
43 localPeers: true
44 }
45 }
46}
47
48exports.create = function (api) {
49 const config = api.config.sync.load()
50 const keys = api.keys.sync.load()
51 var cache = {}
52
53 var sbot = null
54 var connection = Value()
55 var connectionStatus = Value()
56 var connectedPeers = Value([])
57 var localPeers = Value([])
58
59 setInterval(refreshPeers, 1e3)
60
61 var rec = Reconnect(function (isConn) {
62 function notify (value) {
63 isConn(value); connectionStatus.set(value)
64 }
65
66 createClient(keys, config, function (err, _sbot) {
67 if (err) {
68 return notify(err)
69 }
70
71 sbot = _sbot
72 sbot.on('closed', function () {
73 sbot = null
74 connection.set(null)
75 notify(new Error('closed'))
76 })
77
78 connection.set(sbot)
79 notify()
80 refreshPeers()
81 })
82 })
83
84 var internal = {
85 getLatest: rec.async(function (id, cb) {
86 sbot.getLatest(id, cb)
87 }),
88 add: rec.async(function (msg, cb) {
89 sbot.add(msg, cb)
90 })
91 }
92
93 var feed = createFeed(internal, keys, {remote: true})
94
95 return {
96 sbot: {
97 sync: {
98 cache: () => cache
99 },
100 async: {
101 get: rec.async(function (key, cb) {
102 if (typeof cb !== 'function') {
103 throw new Error('cb must be function')
104 }
105 if (cache[key]) cb(null, cache[key])
106 else {
107 sbot.get(key, function (err, value) {
108 if (err) return cb(err)
109 runHooks({key, value})
110 cb(null, value)
111 })
112 }
113 }),
114 publish: rec.async((content, cb) => {
115 if (content.recps) {
116 content = ssbKeys.box(content, content.recps.map(e => {
117 return ref.isFeed(e) ? e : e.link
118 }))
119 } else if (content.mentions) {
120 content.mentions.forEach(mention => {
121 if (ref.isBlob(mention.link)) {
122 sbot.blobs.push(mention.link, err => {
123 if (err) console.error(err)
124 })
125 }
126 })
127 }
128
129 if (sbot) {
130 // instant updating of interface (just incase sbot is busy)
131 runHooks({
132 publishing: true,
133 timestamp: Date.now(),
134 value: {
135 timestamp: Date.now(),
136 author: keys.id,
137 content
138 }
139 })
140 }
141
142 feed.add(content, (err, msg) => {
143 if (err) console.error(err)
144 else if (!cb) console.log(msg)
145 cb && cb(err, msg)
146 })
147 }),
148 addBlob: rec.async((stream, cb) => {
149 return pull(
150 stream,
151 Hash(function (err, id) {
152 if (err) return cb(err)
153 // completely UGLY hack to tell when the blob has been sucessfully written...
154 var start = Date.now()
155 var n = 5
156 next()
157
158 function next () {
159 setTimeout(function () {
160 sbot.blobs.has(id, function (_, has) {
161 if (has) return cb(null, id)
162 if (n--) next()
163 else cb(new Error('write failed'))
164 })
165 }, Date.now() - start)
166 }
167 }),
168 sbot.blobs.add()
169 )
170 }),
171 gossipConnect: rec.async(function (opts, cb) {
172 sbot.gossip.connect(opts, cb)
173 })
174 },
175 pull: {
176 query: rec.source(query => {
177 return sbot.query.read(query)
178 }),
179 backlinks: rec.source(query => {
180 return sbot.backlinks.read(query)
181 }),
182 userFeed: rec.source(opts => {
183 return sbot.createUserStream(opts)
184 }),
185 messagesByType: rec.source(opts => {
186 return sbot.messagesByType(opts)
187 }),
188 feed: rec.source(function (opts) {
189 return pull(
190 sbot.createFeedStream(opts),
191 pull.through(runHooks)
192 )
193 }),
194 log: rec.source(opts => {
195 return pull(
196 sbot.createLogStream(opts),
197 pull.through(runHooks)
198 )
199 }),
200 links: rec.source(function (query) {
201 return sbot.links(query)
202 }),
203 search: rec.source(function (opts) {
204 return sbot.fulltext.search(opts)
205 }),
206 replicateProgress: rec.source(function (opts) {
207 return sbot.replicate.changes()
208 })
209 },
210 obs: {
211 connectionStatus: (listener) => connectionStatus(listener),
212 connection,
213 connectedPeers: () => connectedPeers,
214 localPeers: () => localPeers
215 }
216 }
217 }
218
219 // scoped
220
221 function runHooks (msg) {
222 if (msg.publishing) {
223 api.sbot.hook.publish(msg)
224 } else if (!cache[msg.key]) {
225 // cache[msg.key] = msg.value
226 // api.sbot.hook.feed(msg)
227 }
228 }
229
230 function refreshPeers () {
231 if (sbot) {
232 sbot.gossip.peers((err, peers) => {
233 if (err) return console.error(err)
234 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
235 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
236 })
237 }
238 }
239}
240
/**
 * Creates a pull-stream through that buffers everything flowing through it
 * and, when the stream ends, reports the blob id of the collected data
 * ('&' followed by ssbKeys.hash of the bytes).
 *
 * @param {function(?Error, string=)} [onHash] - called once at end of stream
 *   with `(null, id)` on success or `(err)` if the stream ended with an error.
 * @returns {function} the through created by pull.through.
 * @throws the stream error, from the end handler, when no onHash was given.
 */
function Hash (onHash) {
  const buffers = []
  return pull.through(function (data) {
    // Buffer.from replaces the deprecated `new Buffer(string, encoding)`.
    buffers.push(typeof data === 'string'
      ? Buffer.from(data, 'utf8')
      : data
    )
  }, function (err) {
    if (err) {
      if (!onHash) throw err
      // Do not hash partial (or missing) data: just report the failure.
      return onHash(err)
    }
    if (!onHash) return
    // Buffer.concat([]) yields an empty buffer, so an empty stream hashes
    // cleanly instead of passing `undefined` to ssbKeys.hash.
    const bytes = buffers.length === 1 ? buffers[0] : Buffer.concat(buffers)
    onHash(null, '&' + ssbKeys.hash(bytes))
  })
}
255

Built with git-ssb-web