git ssb

7+

dinoworm 🐛 / patchcore



Tree: c2865b1db3cc50c32ef351f6f877be6cafcf0381

Files: c2865b1db3cc50c32ef351f6f877be6cafcf0381 / sbot.js

6130 bytesRaw
1var pull = require('pull-stream')
2var ref = require('ssb-ref')
3var Reconnect = require('pull-reconnect')
4var createClient = require('ssb-client')
5var createFeed = require('ssb-feed')
6var nest = require('depnest')
7var Value = require('mutant/value')
8var ssbKeys = require('ssb-keys')
9
// Dotted paths of the APIs this module expects to receive on `api`
// (see their use in exports.create). The value on the right is passed to
// depnest unchanged; NOTE(review): presumably it selects how several
// providers of the same path are combined — confirm against depject docs.
const requiredApis = {
  'config.sync.load': 'first',
  'keys.sync.load': 'first',
  'sbot.obs.connectionStatus': 'first',
  'sbot.hook.publish': 'map'
}

exports.needs = nest(requiredApis)
16
17exports.gives = {
18 sbot: {
19 sync: {
20 cache: true
21 },
22 async: {
23 get: true,
24 publish: true,
25 addBlob: true,
26 gossipConnect: true
27 },
28 pull: {
29 log: true,
30 userFeed: true,
31 messagesByType: true,
32 feed: true,
33 links: true,
34 backlinks: true
35 },
36 obs: {
37 connectionStatus: true,
38 connection: true,
39 connectedPeers: true,
40 localPeers: true
41 }
42 }
43}
44
45exports.create = function (api) {
46 const config = api.config.sync.load()
47 const keys = api.keys.sync.load()
48 var cache = {}
49
50 var sbot = null
51 var connection = Value()
52 var connectionStatus = Value()
53 var connectedPeers = Value([])
54 var localPeers = Value([])
55
56 setInterval(refreshPeers, 1e3)
57
58 var rec = Reconnect(function (isConn) {
59 function notify (value) {
60 isConn(value); connectionStatus.set(value)
61 }
62
63 createClient(keys, config, function (err, _sbot) {
64 if (err) {
65 return notify(err)
66 }
67
68 sbot = _sbot
69 sbot.on('closed', function () {
70 sbot = null
71 connection.set(null)
72 notify(new Error('closed'))
73 })
74
75 connection.set(sbot)
76 notify()
77 refreshPeers()
78 })
79 })
80
81 var internal = {
82 getLatest: rec.async(function (id, cb) {
83 sbot.getLatest(id, cb)
84 }),
85 add: rec.async(function (msg, cb) {
86 sbot.add(msg, cb)
87 })
88 }
89
90 var feed = createFeed(internal, keys, {remote: true})
91
92 return {
93 sbot: {
94 sync: {
95 cache: () => cache
96 },
97 async: {
98 get: rec.async(function (key, cb) {
99 if (typeof cb !== 'function') {
100 throw new Error('cb must be function')
101 }
102 if (cache[key]) cb(null, cache[key])
103 else {
104 sbot.get(key, function (err, value) {
105 if (err) return cb(err)
106 runHooks({key, value})
107 cb(null, value)
108 })
109 }
110 }),
111 publish: rec.async((content, cb) => {
112 if (content.recps) {
113 content = ssbKeys.box(content, content.recps.map(e => {
114 return ref.isFeed(e) ? e : e.link
115 }))
116 } else if (content.mentions) {
117 content.mentions.forEach(mention => {
118 if (ref.isBlob(mention.link)) {
119 sbot.blobs.push(mention.link, err => {
120 if (err) console.error(err)
121 })
122 }
123 })
124 }
125
126 if (sbot) {
127 // instant updating of interface (just incase sbot is busy)
128 runHooks({
129 publishing: true,
130 timestamp: Date.now(),
131 value: {
132 timestamp: Date.now(),
133 author: keys.id,
134 content
135 }
136 })
137 }
138
139 feed.add(content, (err, msg) => {
140 if (err) console.error(err)
141 else if (!cb) console.log(msg)
142 cb && cb(err, msg)
143 })
144 }),
145 addBlob: rec.async((stream, cb) => {
146 return pull(
147 stream,
148 Hash(function (err, id) {
149 if (err) return cb(err)
150 // completely UGLY hack to tell when the blob has been sucessfully written...
151 var start = Date.now()
152 var n = 5
153 next()
154
155 function next () {
156 setTimeout(function () {
157 sbot.blobs.has(id, function (_, has) {
158 if (has) return cb(null, id)
159 if (n--) next()
160 else cb(new Error('write failed'))
161 })
162 }, Date.now() - start)
163 }
164 }),
165 sbot.blobs.add()
166 )
167 }),
168 gossipConnect: rec.async(function (opts, cb) {
169 sbot.gossip.connect(opts, cb)
170 })
171 },
172 pull: {
173 backlinks: rec.source(query => {
174 return sbot.backlinks.read(query)
175 }),
176 userFeed: rec.source(opts => {
177 return sbot.createUserStream(opts)
178 }),
179 messagesByType: rec.source(opts => {
180 return sbot.messagesByType(opts)
181 }),
182 feed: rec.source(function (opts) {
183 return pull(
184 sbot.createFeedStream(opts),
185 pull.through(runHooks)
186 )
187 }),
188 log: rec.source(opts => {
189 return pull(
190 sbot.createLogStream(opts),
191 pull.through(runHooks)
192 )
193 }),
194 links: rec.source(function (query) {
195 return sbot.links(query)
196 })
197 },
198 obs: {
199 connectionStatus: (listener) => connectionStatus(listener),
200 connection,
201 connectedPeers: () => connectedPeers,
202 localPeers: () => localPeers
203 }
204 }
205 }
206
207 // scoped
208
209 function runHooks (msg) {
210 if (msg.publishing) {
211 api.sbot.hook.publish(msg)
212 } else if (!cache[msg.key]) {
213 // cache[msg.key] = msg.value
214 // api.sbot.hook.feed(msg)
215 }
216 }
217
218 function refreshPeers () {
219 if (sbot) {
220 sbot.gossip.peers((err, peers) => {
221 if (err) return console.error(err)
222 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
223 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
224 })
225 }
226 }
227}
228
/**
 * Create a pull-stream through that collects every chunk passing through it
 * and, when the stream ends, reports the blob id of the collected bytes.
 *
 * String chunks are converted to UTF-8 buffers; other chunks are collected
 * as-is. The id is '&' followed by the result of `ssbKeys.hash` on the
 * concatenated bytes.
 *
 * @param {function} [onHash] called once at end of stream: with `(err)` when
 *   the stream ended with an error, otherwise with `(null, id)`. When it is
 *   omitted, a stream error is thrown and no hash is computed.
 * @returns the through stream produced by `pull.through`
 */
function Hash (onHash) {
  const chunks = []
  return pull.through(function (data) {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor
    chunks.push(typeof data === 'string' ? Buffer.from(data, 'utf8') : data)
  }, function (err) {
    if (!onHash) {
      if (err) throw err
      return
    }
    // A failed stream delivered at most part of the blob, so its hash would
    // be meaningless; report only the error.
    if (err) return onHash(err)
    // Buffer.concat also handles zero chunks (an empty blob)
    onHash(null, '&' + ssbKeys.hash(Buffer.concat(chunks)))
  })
}
243

Built with git-ssb-web