git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: 49133faa00025b793d47d7d54e7dcfd6691073ad

Files: 49133faa00025b793d47d7d54e7dcfd6691073ad / sbot.js

6366 bytesRaw
1var pull = require('pull-stream')
2var ref = require('ssb-ref')
3var Reconnect = require('pull-reconnect')
4var createClient = require('ssb-client')
5var createFeed = require('ssb-feed')
6var nest = require('depnest')
7var Value = require('mutant/value')
8var ssbKeys = require('ssb-keys')
9
10exports.needs = nest({
11 'config.sync.load': 'first',
12 'keys.sync.load': 'first',
13 'sbot.obs.connectionStatus': 'first',
14 'sbot.hook.feed': 'map'
15})
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true,
26 gossipConnect: true
27 },
28 pull: {
29 log: true,
30 userFeed: true,
31 messagesByType: true,
32 query: true,
33 feed: true,
34 links: true,
35 search: true,
36 replicateProgress: true,
37 },
38 obs: {
39 connectionStatus: true,
40 connection: true,
41 connectedPeers: true,
42 localPeers: true
43 }
44 }
45}
46
47exports.create = function (api) {
48 const config = api.config.sync.load()
49 const keys = api.keys.sync.load()
50 var cache = {}
51
52 var sbot = null
53 var connection = Value()
54 var connectionStatus = Value()
55 var connectedPeers = Value([])
56 var localPeers = Value([])
57
58 setInterval(refreshPeers, 1e3)
59
60 var rec = Reconnect(function (isConn) {
61 function notify (value) {
62 isConn(value); connectionStatus.set(value)
63 }
64
65 createClient(keys, config, function (err, _sbot) {
66 if (err) {
67 return notify(err)
68 }
69
70 sbot = _sbot
71 sbot.on('closed', function () {
72 sbot = null
73 connection.set(null)
74 notify(new Error('closed'))
75 })
76
77 connection.set(sbot)
78 notify()
79 refreshPeers()
80 })
81 })
82
83 var internal = {
84 getLatest: rec.async(function (id, cb) {
85 sbot.getLatest(id, cb)
86 }),
87 add: rec.async(function (msg, cb) {
88 sbot.add(msg, cb)
89 })
90 }
91
92 var feed = createFeed(internal, keys, {remote: true})
93
94 return {
95 sbot: {
96 sync: {
97 cache: () => cache
98 },
99 async: {
100 get: rec.async(function (key, cb) {
101 if (typeof cb !== 'function') {
102 throw new Error('cb must be function')
103 }
104 if (cache[key]) cb(null, cache[key])
105 else {
106 sbot.get(key, function (err, value) {
107 if (err) return cb(err)
108 runHooks({key, value})
109 cb(null, value)
110 })
111 }
112 }),
113 publish: rec.async((content, cb) => {
114 if (content.recps) {
115 content = ssbKeys.box(content, content.recps.map(e => {
116 return ref.isFeed(e) ? e : e.link
117 }))
118 } else if (content.mentions) {
119 content.mentions.forEach(mention => {
120 if (ref.isBlob(mention.link)) {
121 sbot.blobs.push(mention.link, err => {
122 if (err) console.error(err)
123 })
124 }
125 })
126 }
127
128 if (sbot) {
129 // instant updating of interface (just incase sbot is busy)
130 runHooks({
131 publishing: true,
132 timestamp: Date.now(),
133 value: {
134 timestamp: Date.now(),
135 author: keys.id,
136 content
137 }
138 })
139 }
140
141 feed.add(content, (err, msg) => {
142 if (err) console.error(err)
143 else if (!cb) console.log(msg)
144 cb && cb(err, msg)
145 })
146 }),
147 addBlob: rec.async((stream, cb) => {
148 return pull(
149 stream,
150 Hash(function (err, id) {
151 if (err) return cb(err)
152 // completely UGLY hack to tell when the blob has been sucessfully written...
153 var start = Date.now()
154 var n = 5
155 next()
156
157 function next () {
158 setTimeout(function () {
159 sbot.blobs.has(id, function (_, has) {
160 if (has) return cb(null, id)
161 if (n--) next()
162 else cb(new Error('write failed'))
163 })
164 }, Date.now() - start)
165 }
166 }),
167 sbot.blobs.add()
168 )
169 }),
170 gossipConnect: rec.async(function (opts, cb) {
171 sbot.gossip.connect(opts, cb)
172 })
173 },
174 pull: {
175 query: rec.source(query => {
176 return sbot.query.read(query)
177 }),
178 userFeed: rec.source(opts => {
179 return sbot.createUserStream(opts)
180 }),
181 messagesByType: rec.source(opts => {
182 return sbot.messagesByType(opts)
183 }),
184 feed: rec.source(function (opts) {
185 return pull(
186 sbot.createFeedStream(opts),
187 pull.through(runHooks)
188 )
189 }),
190 log: rec.source(opts => {
191 return pull(
192 sbot.createLogStream(opts),
193 pull.through(runHooks)
194 )
195 }),
196 links: rec.source(function (query) {
197 return sbot.links(query)
198 }),
199 search: rec.source(function (opts) {
200 return sbot.fulltext.search(opts)
201 }),
202 replicateProgress: rec.source(function (opts) {
203 return sbot.replicate.changes()
204 })
205 },
206 obs: {
207 connectionStatus: (listener) => connectionStatus(listener),
208 connection,
209 connectedPeers: () => connectedPeers,
210 localPeers: () => localPeers
211 }
212 }
213 }
214
215 // scoped
216
217 function runHooks (msg) {
218 if (msg.publishing) {
219 api.sbot.hook.feed(msg)
220 } else if (!cache[msg.key]) {
221 cache[msg.key] = msg.value
222 api.sbot.hook.feed(msg)
223 }
224 }
225
226 function refreshPeers () {
227 if (sbot) {
228 sbot.gossip.peers((err, peers) => {
229 if (err) throw console.log(err)
230 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
231 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
232 })
233 }
234 }
235}
236
237function Hash (onHash) {
238 var buffers = []
239 return pull.through(function (data) {
240 buffers.push(typeof data === 'string'
241 ? new Buffer(data, 'utf8')
242 : data
243 )
244 }, function (err) {
245 if (err && !onHash) throw err
246 var b = buffers.length > 1 ? Buffer.concat(buffers) : buffers[0]
247 var h = '&' + ssbKeys.hash(b)
248 onHash && onHash(err, h)
249 })
250}
251

Built with git-ssb-web