git ssb

1+

Daan Patchwork / patchwork



Tree: d3826fa5570d6a70af4cd9401fe6f9284ec0ef5c

Files: d3826fa5570d6a70af4cd9401fe6f9284ec0ef5c / lib / depject / sbot.js

7664 bytesRaw
1const pull = require('pull-stream')
2const defer = require('pull-defer')
3const { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4const ref = require('ssb-ref')
5const Reconnect = require('pull-reconnect')
6const createClient = require('ssb-client')
7const nest = require('depnest')
8const ssbKeys = require('ssb-keys')
9const flat = require('flat')
10const extend = require('xtend')
11const pullResume = require('../pull-resume')
12
13exports.needs = nest({
14 'config.sync.load': 'first',
15 'keys.sync.load': 'first',
16 'sbot.obs.connectionStatus': 'first',
17 'sbot.hook.publish': 'map',
18 'progress.obs.indexes': 'first'
19})
20
21exports.gives = {
22 sbot: {
23 sync: {
24 cache: true
25 },
26 async: {
27 get: true,
28 publish: true,
29 addBlob: true,
30 connConnect: true,
31 connRememberConnect: true,
32 friendsGet: true,
33 acceptDHT: true
34 },
35 pull: {
36 log: true,
37 userFeed: true,
38 messagesByType: true,
39 feed: true,
40 links: true,
41 backlinks: true,
42 stream: true,
43 resumeStream: true
44 },
45 obs: {
46 connectionStatus: true,
47 connection: true,
48 connectedPeers: true,
49 localPeers: true,
50 stagedPeers: true
51 }
52 }
53}
54
55exports.create = function (api) {
56 const config = api.config.sync.load()
57 const keys = api.keys.sync.load()
58 const cache = {}
59
60 let sbot = null
61 const connection = Value()
62 const connectionStatus = Value()
63 const connectedPeers = MutantSet()
64 const localPeers = MutantSet()
65 const stagedPeers = MutantSet()
66
67 const rec = Reconnect(function (isConn) {
68 function notify (value) {
69 isConn(value); connectionStatus.set(value)
70 }
71
72 const opts = {
73 path: config.path,
74 remote: config.remote,
75 host: config.host,
76 port: config.port,
77 key: config.key,
78 appKey: config.caps.shs,
79 timers: config.timers,
80 caps: config.caps,
81 friends: config.friends
82 }
83
84 createClient(keys, opts, function (err, _sbot) {
85 if (err) {
86 return notify(err)
87 }
88
89 sbot = _sbot
90 sbot.on('closed', function () {
91 sbot = null
92 connection.set(null)
93 notify(new Error('closed'))
94 })
95
96 connection.set(sbot)
97 notify()
98 })
99 })
100
101 watch(connection, (sbot) => {
102 if (sbot) {
103 pull(
104 sbot.conn.peers(),
105 pull.drain(entries => {
106 var peers = entries.filter(([, x]) => !!x.key).map(([address, data]) => ({ address, data }))
107 localPeers.set(peers.filter(peer => peer.data.type === 'lan'))
108 connectedPeers.set(peers.filter(peer => peer.data.state === 'connected'))
109 })
110 )
111
112 pull(
113 sbot.conn.stagedPeers(),
114 pull.drain(entries => {
115 var peers = entries.filter(([, x]) => !!x.key).map(([address, data]) => ({ address, data }))
116 stagedPeers.set(peers)
117 })
118 )
119 }
120 })
121
122 return {
123 sbot: {
124 sync: {
125 cache: () => cache
126 },
127 async: {
128 get: rec.async(function (key, cb) {
129 if (typeof cb !== 'function') {
130 throw new Error('cb must be function')
131 }
132 if (cache[key]) cb(null, cache[key])
133 else {
134 const options = typeof key === 'string'
135 ? { private: true, id: key }
136 : key
137
138 sbot.get(options, function (err, value) {
139 if (err) return cb(err)
140 runHooks({ key, value })
141 cb(null, value)
142 })
143 }
144 }),
145 publish: rec.async((content, cb) => {
146 const indexes = api.progress.obs.indexes()
147 const progress = indexes()
148 const pending = progress.target - progress.current || 0
149
150 if (pending) {
151 const err = new Error('Cowardly refusing to publish your message while database is still indexing. Please try again once indexing is finished.')
152
153 if (typeof cb === 'function') {
154 return cb(err)
155 } else {
156 console.error(err.toString())
157 return
158 }
159 }
160
161 if (content.recps) {
162 content = ssbKeys.box(content, content.recps.map(e => {
163 return ref.isFeed(e) ? e : e.link
164 }))
165 } else {
166 const flatContent = flat(content)
167 Object.keys(flatContent).forEach(key => {
168 const val = flatContent[key]
169 if (ref.isBlob(val)) {
170 sbot.blobs.push(val, err => {
171 if (err) console.error(err)
172 })
173 }
174 })
175 }
176
177 if (sbot) {
178 // instant updating of interface (just incase sbot is busy)
179 runHooks({
180 publishing: true,
181 timestamp: Date.now(),
182 value: {
183 timestamp: Date.now(),
184 author: keys.id,
185 content
186 }
187 })
188 }
189
190 sbot.publish(content, (err, msg) => {
191 if (err) console.error(err)
192 else if (!cb) console.log(msg)
193 cb && cb(err, msg)
194 })
195 }),
196 addBlob: rec.async((stream, cb) => {
197 return pull(
198 stream,
199 sbot.blobs.add(cb)
200 )
201 }),
202 connConnect: rec.async(function (address, data, cb) {
203 sbot.conn.connect(address, data, cb)
204 }),
205 connRememberConnect: rec.async(function (address, data, cb) {
206 sbot.conn.remember(address, { autoconnect: true, ...data }, (err) => {
207 if (err) cb(err)
208 else sbot.conn.connect(address, data, cb)
209 })
210 }),
211 friendsGet: rec.async(function (opts, cb) {
212 sbot.friends.get(opts, cb)
213 }),
214 acceptDHT: rec.async(function (opts, cb) {
215 sbot.dhtInvite.accept(opts, cb)
216 })
217 },
218 pull: {
219 backlinks: rec.source(query => {
220 return sbot.backlinks.read(query)
221 }),
222 userFeed: rec.source(opts => {
223 return sbot.createUserStream(opts)
224 }),
225 messagesByType: rec.source(opts => {
226 return sbot.messagesByType(opts)
227 }),
228 feed: rec.source(function (opts) {
229 return pull(
230 sbot.createFeedStream(opts),
231 pull.through(runHooks)
232 )
233 }),
234 log: rec.source(opts => {
235 return pull(
236 sbot.createLogStream(opts),
237 pull.through(runHooks)
238 )
239 }),
240 links: rec.source(function (query) {
241 return sbot.links(query)
242 }),
243 stream: function (fn) {
244 const stream = defer.source()
245 onceTrue(connection, function (connection) {
246 stream.resolve(fn(connection))
247 })
248 return stream
249 },
250 resumeStream: function (fn, baseOpts) {
251 return function (opts) {
252 const stream = defer.source()
253 onceTrue(connection, function (connection) {
254 stream.resolve(pullResume.remote((opts) => {
255 return fn(connection, opts)
256 }, extend(baseOpts, opts)))
257 })
258 return stream
259 }
260 }
261 },
262 obs: {
263 connectionStatus: (listener) => connectionStatus(listener),
264 connection,
265 connectedPeers: () => connectedPeers,
266 localPeers: () => localPeers,
267 stagedPeers: () => stagedPeers
268 }
269 }
270 }
271
272 // scoped
273
274 function runHooks (msg) {
275 if (msg.publishing) {
276 api.sbot.hook.publish(msg)
277 } else if (!cache[msg.key]) {
278 // cache[msg.key] = msg.value
279 // api.sbot.hook.feed(msg)
280 }
281 }
282}
283

Built with git-ssb-web