git ssb

7+

dinoworm 🐛 / patchcore



Tree: 5968fb325860b8675c105c04324b66c5858d36e5

Files: 5968fb325860b8675c105c04324b66c5858d36e5 / sbot.js

6996 bytes · Raw
1var pull = require('pull-stream')
2var defer = require('pull-defer')
3var { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4var ref = require('ssb-ref')
5var Reconnect = require('pull-reconnect')
6var createClient = require('ssb-client')
7var createFeed = require('ssb-feed')
8var nest = require('depnest')
9var ssbKeys = require('ssb-keys')
10
11exports.needs = nest({
12 'config.sync.load': 'first',
13 'keys.sync.load': 'first',
14 'sbot.obs.connectionStatus': 'first',
15 'sbot.hook.publish': 'map'
16})
17
18exports.gives = {
19 sbot: {
20 sync: {
21 cache: true
22 },
23 async: {
24 get: true,
25 publish: true,
26 addBlob: true,
27 gossipConnect: true
28 },
29 pull: {
30 log: true,
31 userFeed: true,
32 messagesByType: true,
33 feed: true,
34 links: true,
35 backlinks: true,
36 stream: true
37 },
38 obs: {
39 connectionStatus: true,
40 connection: true,
41 connectedPeers: true,
42 localPeers: true
43 }
44 }
45}
46
47exports.create = function (api) {
48 const config = api.config.sync.load()
49 const keys = api.keys.sync.load()
50 var cache = {}
51
52 var sbot = null
53 var connection = Value()
54 var connectionStatus = Value()
55 var connectedPeers = MutantSet()
56 var localPeers = MutantSet()
57
58 var rec = Reconnect(function (isConn) {
59 function notify (value) {
60 isConn(value); connectionStatus.set(value)
61 }
62
63 createClient(keys, config, function (err, _sbot) {
64 if (err) {
65 return notify(err)
66 }
67
68 sbot = _sbot
69 sbot.on('closed', function () {
70 sbot = null
71 connection.set(null)
72 notify(new Error('closed'))
73 })
74
75 connection.set(sbot)
76 notify()
77 })
78 })
79
80 var internal = {
81 getLatest: rec.async(function (id, cb) {
82 sbot.getLatest(id, cb)
83 }),
84 add: rec.async(function (msg, cb) {
85 sbot.add(msg, cb)
86 })
87 }
88
89 watch(connection, (sbot) => {
90 if (sbot) {
91 sbot.gossip.peers((err, peers) => {
92 if (err) return console.error(err)
93 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
94 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
95 })
96 pull(
97 sbot.gossip.changes(),
98 pull.drain(data => {
99 if (data.peer) {
100 if (data.type === 'remove') {
101 connectedPeers.delete(data.peer.key)
102 localPeers.delete(data.peer.key)
103 } else {
104 if (data.peer.source === 'local') {
105 localPeers.add(data.peer.key)
106 }
107 if (data.peer.state === 'connected') {
108 connectedPeers.add(data.peer.key)
109 } else {
110 connectedPeers.delete(data.peer.key)
111 }
112 }
113 }
114 })
115 )
116 }
117 })
118
119 var feed = createFeed(internal, keys, {remote: true})
120
121 return {
122 sbot: {
123 sync: {
124 cache: () => cache
125 },
126 async: {
127 get: rec.async(function (key, cb) {
128 if (typeof cb !== 'function') {
129 throw new Error('cb must be function')
130 }
131 if (cache[key]) cb(null, cache[key])
132 else {
133 sbot.get(key, function (err, value) {
134 if (err) return cb(err)
135 runHooks({key, value})
136 cb(null, value)
137 })
138 }
139 }),
140 publish: rec.async((content, cb) => {
141 if (content.recps) {
142 content = ssbKeys.box(content, content.recps.map(e => {
143 return ref.isFeed(e) ? e : e.link
144 }))
145 } else if (content.mentions) {
146 content.mentions.forEach(mention => {
147 if (ref.isBlob(mention.link)) {
148 sbot.blobs.push(mention.link, err => {
149 if (err) console.error(err)
150 })
151 }
152 })
153 }
154
155 if (sbot) {
156 // instant updating of interface (just incase sbot is busy)
157 runHooks({
158 publishing: true,
159 timestamp: Date.now(),
160 value: {
161 timestamp: Date.now(),
162 author: keys.id,
163 content
164 }
165 })
166 }
167
168 feed.add(content, (err, msg) => {
169 if (err) console.error(err)
170 else if (!cb) console.log(msg)
171 cb && cb(err, msg)
172 })
173 }),
174 addBlob: rec.async((stream, cb) => {
175 return pull(
176 stream,
177 Hash(function (err, id) {
178 if (err) return cb(err)
179 // completely UGLY hack to tell when the blob has been sucessfully written...
180 var start = Date.now()
181 var n = 5
182 next()
183
184 function next () {
185 setTimeout(function () {
186 sbot.blobs.has(id, function (_, has) {
187 if (has) return cb(null, id)
188 if (n--) next()
189 else cb(new Error('write failed'))
190 })
191 }, Date.now() - start)
192 }
193 }),
194 sbot.blobs.add()
195 )
196 }),
197 gossipConnect: rec.async(function (opts, cb) {
198 sbot.gossip.connect(opts, cb)
199 })
200 },
201 pull: {
202 backlinks: rec.source(query => {
203 return sbot.backlinks.read(query)
204 }),
205 userFeed: rec.source(opts => {
206 return sbot.createUserStream(opts)
207 }),
208 messagesByType: rec.source(opts => {
209 return sbot.messagesByType(opts)
210 }),
211 feed: rec.source(function (opts) {
212 return pull(
213 sbot.createFeedStream(opts),
214 pull.through(runHooks)
215 )
216 }),
217 log: rec.source(opts => {
218 return pull(
219 sbot.createLogStream(opts),
220 pull.through(runHooks)
221 )
222 }),
223 links: rec.source(function (query) {
224 return sbot.links(query)
225 }),
226 stream: function (fn) {
227 var stream = defer.source()
228 onceTrue(connection, function (connection) {
229 stream.resolve(fn(connection))
230 })
231 return stream
232 }
233 },
234 obs: {
235 connectionStatus: (listener) => connectionStatus(listener),
236 connection,
237 connectedPeers: () => connectedPeers,
238 localPeers: () => localPeers
239 }
240 }
241 }
242
243 // scoped
244
245 function runHooks (msg) {
246 if (msg.publishing) {
247 api.sbot.hook.publish(msg)
248 } else if (!cache[msg.key]) {
249 // cache[msg.key] = msg.value
250 // api.sbot.hook.feed(msg)
251 }
252 }
253}
254
/**
 * Creates a pull-stream through that records every chunk passing by and,
 * when the stream ends, reports the blob id of the collected data.
 *
 * The id is '&' followed by `ssbKeys.hash` of the concatenated chunks.
 * String chunks are encoded as UTF-8; other chunks are assumed to be
 * Buffers (TODO confirm against callers). An empty stream hashes an empty
 * Buffer.
 *
 * @param {function(Error|null, string=)} [onHash] - called once at stream
 *   end with `(null, id)` on success, or `(err)` if the stream ended with an
 *   error (nothing is hashed in that case).
 * @returns the through stream created by `pull.through`.
 * @throws rethrows the stream error when no `onHash` callback was supplied.
 */
function Hash (onHash) {
  const buffers = []
  return pull.through(function (data) {
    buffers.push(typeof data === 'string'
      ? Buffer.from(data, 'utf8')
      : data
    )
  }, function (err) {
    if (err) {
      if (!onHash) throw err
      // Do not hash partial data; just report the failure.
      return onHash(err)
    }
    // Buffer.concat also covers the zero- and one-chunk cases.
    const id = '&' + ssbKeys.hash(Buffer.concat(buffers))
    onHash && onHash(null, id)
  })
}
269

Built with git-ssb-web