git ssb

1+

Daan Patchwork / patchwork



Tree: 654bc8db06bf328fa22b234b99ce9bb907694ec0

Files: 654bc8db06bf328fa22b234b99ce9bb907694ec0 / lib / depject / sbot.js

7987 bytesRaw
1const pull = require('pull-stream')
2const defer = require('pull-defer')
3const { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4const ref = require('ssb-ref')
5const Reconnect = require('pull-reconnect')
6const createClient = require('ssb-client')
7const nest = require('depnest')
8const ssbKeys = require('ssb-keys')
9const flat = require('flat')
10const extend = require('xtend')
11const pullResume = require('../pull-resume')
12
13exports.needs = nest({
14 'config.sync.load': 'first',
15 'keys.sync.load': 'first',
16 'sbot.obs.connectionStatus': 'first',
17 'sbot.hook.publish': 'map',
18 'progress.obs.indexes': 'first'
19})
20
21exports.gives = {
22 sbot: {
23 sync: {
24 cache: true
25 },
26 async: {
27 get: true,
28 getLatest: true,
29 publish: true,
30 addBlob: true,
31 connConnect: true,
32 connRememberConnect: true,
33 friendsGet: true,
34 acceptDHT: true
35 },
36 pull: {
37 log: true,
38 userFeed: true,
39 messagesByType: true,
40 feed: true,
41 links: true,
42 backlinks: true,
43 stream: true,
44 resumeStream: true
45 },
46 obs: {
47 connectionStatus: true,
48 connection: true,
49 connectedPeers: true,
50 localPeers: true,
51 stagedPeers: true
52 }
53 }
54}
55
56exports.create = function (api) {
57 const config = api.config.sync.load()
58 const keys = api.keys.sync.load()
59 const cache = {}
60
61 let sbot = null
62 const connection = Value()
63 const connectionStatus = Value()
64 const connectedPeers = MutantSet()
65 const localPeers = MutantSet()
66 const stagedPeers = MutantSet()
67
68 const rec = Reconnect(function (isConn) {
69 function notify (value) {
70 isConn(value); connectionStatus.set(value)
71 }
72
73 const opts = {
74 path: config.path,
75 remote: config.remote,
76 host: config.host,
77 port: config.port,
78 key: config.key,
79 appKey: config.caps.shs,
80 timers: config.timers,
81 caps: config.caps,
82 friends: config.friends
83 }
84
85 createClient(keys, opts, function (err, _sbot) {
86 if (err) {
87 return notify(err)
88 }
89
90 sbot = _sbot
91 sbot.on('closed', function () {
92 sbot = null
93 connection.set(null)
94 notify(new Error('closed'))
95 })
96
97 connection.set(sbot)
98 notify()
99 })
100 })
101
102 watch(connection, (sbot) => {
103 if (sbot) {
104 pull(
105 sbot.conn.peers(),
106 pull.drain(entries => {
107 const peers = entries.filter(([, x]) => !!x.key).map(([address, data]) => ({ address, data }))
108 localPeers.set(peers.filter(peer => peer.data.type === 'lan'))
109 connectedPeers.set(peers.filter(peer => peer.data.state === 'connected'))
110 })
111 )
112
113 pull(
114 sbot.conn.stagedPeers(),
115 pull.drain(entries => {
116 const peers = entries.filter(([, x]) => !!x.key).map(([address, data]) => ({ address, data }))
117 stagedPeers.set(peers)
118 })
119 )
120 }
121 })
122
123 return {
124 sbot: {
125 sync: {
126 cache: () => cache
127 },
128 async: {
129 get: rec.async(function (key, cb) {
130 if (typeof cb !== 'function') {
131 throw new Error('cb must be function')
132 }
133 if (cache[key]) cb(null, cache[key])
134 else {
135 const options = typeof key === 'string'
136 ? { private: true, id: key }
137 : key
138
139 sbot.get(options, function (err, value) {
140 if (err) return cb(err)
141 runHooks({ key, value })
142 cb(null, value)
143 })
144 }
145 }),
146 getLatest: rec.async(function (id, cb) {
147 if (typeof cb !== 'function') {
148 throw new Error('cb must be function')
149 }
150 sbot.getLatest(id, function (err, value) {
151 if (err) return cb(err)
152 cb(null, value)
153 })
154 }),
155 publish: rec.async((content, cb) => {
156 const indexes = api.progress.obs.indexes()
157 const progress = indexes()
158 const pending = progress.target - progress.current || 0
159
160 if (pending) {
161 const err = new Error('Cowardly refusing to publish your message while database is still indexing. Please try again once indexing is finished.')
162
163 if (typeof cb === 'function') {
164 return cb(err)
165 } else {
166 console.error(err.toString())
167 return
168 }
169 }
170
171 if (content.recps) {
172 content = ssbKeys.box(content, content.recps.map(e => {
173 return ref.isFeed(e) ? e : e.link
174 }))
175 } else {
176 const flatContent = flat(content)
177 Object.keys(flatContent).forEach(key => {
178 const val = flatContent[key]
179 if (ref.isBlob(val)) {
180 sbot.blobs.push(val, err => {
181 if (err) console.error(err)
182 })
183 }
184 })
185 }
186
187 if (sbot) {
188 // instant updating of interface (just incase sbot is busy)
189 runHooks({
190 publishing: true,
191 timestamp: Date.now(),
192 value: {
193 timestamp: Date.now(),
194 author: keys.id,
195 content
196 }
197 })
198 }
199
200 sbot.publish(content, (err, msg) => {
201 if (err) console.error(err)
202 else if (!cb) console.log(msg)
203 cb && cb(err, msg)
204 })
205 }),
206 addBlob: rec.async((stream, cb) => {
207 return pull(
208 stream,
209 sbot.blobs.add(cb)
210 )
211 }),
212 connConnect: rec.async(function (address, data, cb) {
213 sbot.conn.connect(address, data, cb)
214 }),
215 connRememberConnect: rec.async(function (address, data, cb) {
216 sbot.conn.remember(address, { autoconnect: true, ...data }, (err) => {
217 if (err) cb(err)
218 else sbot.conn.connect(address, data, cb)
219 })
220 }),
221 friendsGet: rec.async(function (opts, cb) {
222 sbot.friends.get(opts, cb)
223 }),
224 acceptDHT: rec.async(function (opts, cb) {
225 sbot.dhtInvite.accept(opts, cb)
226 })
227 },
228 pull: {
229 backlinks: rec.source(query => {
230 return sbot.backlinks.read(query)
231 }),
232 userFeed: rec.source(opts => {
233 return sbot.createUserStream(opts)
234 }),
235 messagesByType: rec.source(opts => {
236 return sbot.messagesByType(opts)
237 }),
238 feed: rec.source(function (opts) {
239 return pull(
240 sbot.createFeedStream(opts),
241 pull.through(runHooks)
242 )
243 }),
244 log: rec.source(opts => {
245 return pull(
246 sbot.createLogStream(opts),
247 pull.through(runHooks)
248 )
249 }),
250 links: rec.source(function (query) {
251 return sbot.links(query)
252 }),
253 stream: function (fn) {
254 const stream = defer.source()
255 onceTrue(connection, function (connection) {
256 stream.resolve(fn(connection))
257 })
258 return stream
259 },
260 resumeStream: function (fn, baseOpts) {
261 return function (opts) {
262 const stream = defer.source()
263 onceTrue(connection, function (connection) {
264 stream.resolve(pullResume.remote((opts) => {
265 return fn(connection, opts)
266 }, extend(baseOpts, opts)))
267 })
268 return stream
269 }
270 }
271 },
272 obs: {
273 connectionStatus: (listener) => connectionStatus(listener),
274 connection,
275 connectedPeers: () => connectedPeers,
276 localPeers: () => localPeers,
277 stagedPeers: () => stagedPeers
278 }
279 }
280 }
281
282 // scoped
283
284 function runHooks (msg) {
285 if (msg.publishing) {
286 api.sbot.hook.publish(msg)
287 } else if (!cache[msg.key]) {
288 // cache[msg.key] = msg.value
289 // api.sbot.hook.feed(msg)
290 }
291 }
292}
293

Built with git-ssb-web