git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: dcf3c5bf7f8b9d2639c0311e45bd8b037256762c

Files: dcf3c5bf7f8b9d2639c0311e45bd8b037256762c / sbot.js

6238 bytesRaw
1var pull = require('pull-stream')
2var ref = require('ssb-ref')
3var Reconnect = require('pull-reconnect')
4var createClient = require('ssb-client')
5var createFeed = require('ssb-feed')
6var nest = require('depnest')
7var Value = require('mutant/value')
8var ssbKeys = require('ssb-keys')
9
10exports.needs = nest({
11 'config.sync.load': 'first',
12 'keys.sync.load': 'first',
13 'sbot.obs.connectionStatus': 'first',
14 'sbot.hook.feed': 'map'
15})
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true,
26 gossipConnect: true
27 },
28 pull: {
29 log: true,
30 userFeed: true,
31 query: true,
32 feed: true,
33 links: true,
34 search: true,
35 replicateProgress: true,
36 },
37 obs: {
38 connectionStatus: true,
39 connection: true,
40 connectedPeers: true,
41 localPeers: true
42 }
43 }
44}
45
46exports.create = function (api) {
47 const config = api.config.sync.load()
48 const keys = api.keys.sync.load()
49 var cache = {}
50
51 var sbot = null
52 var connection = Value()
53 var connectionStatus = Value()
54 var connectedPeers = Value([])
55 var localPeers = Value([])
56
57 setInterval(refreshPeers, 1e3)
58
59 var rec = Reconnect(function (isConn) {
60 function notify (value) {
61 isConn(value); connectionStatus.set(value)
62 }
63
64 createClient(keys, config, function (err, _sbot) {
65 if (err) {
66 return notify(err)
67 }
68
69 sbot = _sbot
70 sbot.on('closed', function () {
71 sbot = null
72 connection.set(null)
73 notify(new Error('closed'))
74 })
75
76 connection.set(sbot)
77 notify()
78 refreshPeers()
79 })
80 })
81
82 var internal = {
83 getLatest: rec.async(function (id, cb) {
84 sbot.getLatest(id, cb)
85 }),
86 add: rec.async(function (msg, cb) {
87 sbot.add(msg, cb)
88 })
89 }
90
91 var feed = createFeed(internal, keys, {remote: true})
92
93 return {
94 sbot: {
95 sync: {
96 cache: () => cache
97 },
98 async: {
99 get: rec.async(function (key, cb) {
100 if (typeof cb !== 'function') {
101 throw new Error('cb must be function')
102 }
103 if (cache[key]) cb(null, cache[key])
104 else {
105 sbot.get(key, function (err, value) {
106 if (err) return cb(err)
107 runHooks({key, value})
108 cb(null, value)
109 })
110 }
111 }),
112 publish: rec.async((content, cb) => {
113 if (content.recps) {
114 content = ssbKeys.box(content, content.recps.map(e => {
115 return ref.isFeed(e) ? e : e.link
116 }))
117 } else if (content.mentions) {
118 content.mentions.forEach(mention => {
119 if (ref.isBlob(mention.link)) {
120 sbot.blobs.push(mention.link, err => {
121 if (err) console.error(err)
122 })
123 }
124 })
125 }
126
127 if (sbot) {
128 // instant updating of interface (just incase sbot is busy)
129 runHooks({
130 publishing: true,
131 timestamp: Date.now(),
132 value: {
133 timestamp: Date.now(),
134 author: keys.id,
135 content
136 }
137 })
138 }
139
140 feed.add(content, (err, msg) => {
141 if (err) console.error(err)
142 else if (!cb) console.log(msg)
143 cb && cb(err, msg)
144 })
145 }),
146 addBlob: rec.async((stream, cb) => {
147 return pull(
148 stream,
149 Hash(function (err, id) {
150 if (err) return cb(err)
151 // completely UGLY hack to tell when the blob has been sucessfully written...
152 var start = Date.now()
153 var n = 5
154 next()
155
156 function next () {
157 setTimeout(function () {
158 sbot.blobs.has(id, function (_, has) {
159 if (has) return cb(null, id)
160 if (n--) next()
161 else cb(new Error('write failed'))
162 })
163 }, Date.now() - start)
164 }
165 }),
166 sbot.blobs.add()
167 )
168 }),
169 gossipConnect: rec.async(function (opts, cb) {
170 sbot.gossip.connect(opts, cb)
171 })
172 },
173 pull: {
174 query: rec.source(query => {
175 return sbot.query.read(query)
176 }),
177 userFeed: rec.source(opts => {
178 return sbot.createUserStream(opts)
179 }),
180 feed: rec.source(function (opts) {
181 return pull(
182 sbot.createFeedStream(opts),
183 pull.through(runHooks)
184 )
185 }),
186 log: rec.source(opts => {
187 return pull(
188 sbot.createLogStream(opts),
189 pull.through(runHooks)
190 )
191 }),
192 links: rec.source(function (query) {
193 return sbot.links(query)
194 }),
195 search: rec.source(function (opts) {
196 return sbot.fulltext.search(opts)
197 }),
198 replicateProgress: rec.source(function (opts) {
199 return sbot.replicate.changes()
200 })
201 },
202 obs: {
203 connectionStatus: (listener) => connectionStatus(listener),
204 connection,
205 connectedPeers: () => connectedPeers,
206 localPeers: () => localPeers
207 }
208 }
209 }
210
211 // scoped
212
213 function runHooks (msg) {
214 if (msg.publishing) {
215 api.sbot.hook.feed(msg)
216 } else if (!cache[msg.key]) {
217 cache[msg.key] = msg.value
218 api.sbot.hook.feed(msg)
219 }
220 }
221
222 function refreshPeers () {
223 if (sbot) {
224 sbot.gossip.peers((err, peers) => {
225 if (err) throw console.log(err)
226 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
227 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
228 })
229 }
230 }
231}
232
233function Hash (onHash) {
234 var buffers = []
235 return pull.through(function (data) {
236 buffers.push(typeof data === 'string'
237 ? new Buffer(data, 'utf8')
238 : data
239 )
240 }, function (err) {
241 if (err && !onHash) throw err
242 var b = buffers.length > 1 ? Buffer.concat(buffers) : buffers[0]
243 var h = '&' + ssbKeys.hash(b)
244 onHash && onHash(err, h)
245 })
246}
247

Built with git-ssb-web