git ssb

1+

Daan Patchwork / patchwork



Tree: 58ab0241031aa549a35cce1e678c27065ae66221

Files: 58ab0241031aa549a35cce1e678c27065ae66221 / lib / depject / sbot.js

7780 bytesRaw
1const pull = require('pull-stream')
2const defer = require('pull-defer')
3const { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4const ref = require('ssb-ref')
5const Reconnect = require('pull-reconnect')
6const createClient = require('ssb-client')
7const nest = require('depnest')
8const ssbKeys = require('ssb-keys')
9const flat = require('flat')
10const extend = require('xtend')
11const pullResume = require('../pull-resume')
12
13exports.needs = nest({
14 'config.sync.load': 'first',
15 'keys.sync.load': 'first',
16 'sbot.obs.connectionStatus': 'first',
17 'sbot.hook.publish': 'map',
18 'progress.obs.indexes': 'first'
19})
20
21exports.gives = {
22 sbot: {
23 sync: {
24 cache: true
25 },
26 async: {
27 get: true,
28 publish: true,
29 addBlob: true,
30 connConnect: true,
31 connRememberConnect: true,
32 friendsGet: true,
33 acceptDHT: true,
34 createDHT: true
35 },
36 pull: {
37 log: true,
38 userFeed: true,
39 messagesByType: true,
40 feed: true,
41 links: true,
42 backlinks: true,
43 stream: true,
44 resumeStream: true
45 },
46 obs: {
47 connectionStatus: true,
48 connection: true,
49 connectedPeers: true,
50 localPeers: true,
51 stagedPeers: true
52 }
53 }
54}
55
56exports.create = function (api) {
57 const config = api.config.sync.load()
58 const keys = api.keys.sync.load()
59 const cache = {}
60
61 let sbot = null
62 const connection = Value()
63 const connectionStatus = Value()
64 const connectedPeers = MutantSet()
65 const localPeers = MutantSet()
66 const stagedPeers = MutantSet()
67
68 const rec = Reconnect(function (isConn) {
69 function notify (value) {
70 isConn(value); connectionStatus.set(value)
71 }
72
73 const opts = {
74 path: config.path,
75 remote: config.remote,
76 host: config.host,
77 port: config.port,
78 key: config.key,
79 appKey: config.caps.shs,
80 timers: config.timers,
81 caps: config.caps,
82 friends: config.friends
83 }
84
85 createClient(keys, opts, function (err, _sbot) {
86 if (err) {
87 return notify(err)
88 }
89
90 sbot = _sbot
91 sbot.on('closed', function () {
92 sbot = null
93 connection.set(null)
94 notify(new Error('closed'))
95 })
96
97 connection.set(sbot)
98 notify()
99 })
100 })
101
102 watch(connection, (sbot) => {
103 if (sbot) {
104 pull(
105 sbot.conn.peers(),
106 pull.drain(entries => {
107 var peers = entries.filter(([, x]) => !!x.key).map(([address, data]) => ({ address, data }))
108 localPeers.set(peers.filter(peer => peer.data.type === 'lan'))
109 connectedPeers.set(peers.filter(peer => peer.data.state === 'connected'))
110 })
111 )
112
113 pull(
114 sbot.conn.stagedPeers(),
115 pull.drain(entries => {
116 var peers = entries.filter(([, x]) => !!x.key).map(([address, data]) => ({ address, data }))
117 stagedPeers.set(peers)
118 })
119 )
120 }
121 })
122
123 return {
124 sbot: {
125 sync: {
126 cache: () => cache
127 },
128 async: {
129 get: rec.async(function (key, cb) {
130 if (typeof cb !== 'function') {
131 throw new Error('cb must be function')
132 }
133 if (cache[key]) cb(null, cache[key])
134 else {
135 const options = typeof key === 'string'
136 ? { private: true, id: key }
137 : key
138
139 sbot.get(options, function (err, value) {
140 if (err) return cb(err)
141 runHooks({ key, value })
142 cb(null, value)
143 })
144 }
145 }),
146 publish: rec.async((content, cb) => {
147 const indexes = api.progress.obs.indexes()
148 const progress = indexes()
149 const pending = progress.target - progress.current || 0
150
151 if (pending) {
152 const err = new Error('Cowardly refusing to publish your message while database is still indexing. Please try again once indexing is finished.')
153
154 if (typeof cb === 'function') {
155 return cb(err)
156 } else {
157 console.error(err.toString())
158 return
159 }
160 }
161
162 if (content.recps) {
163 content = ssbKeys.box(content, content.recps.map(e => {
164 return ref.isFeed(e) ? e : e.link
165 }))
166 } else {
167 const flatContent = flat(content)
168 Object.keys(flatContent).forEach(key => {
169 const val = flatContent[key]
170 if (ref.isBlob(val)) {
171 sbot.blobs.push(val, err => {
172 if (err) console.error(err)
173 })
174 }
175 })
176 }
177
178 if (sbot) {
179 // instant updating of interface (just incase sbot is busy)
180 runHooks({
181 publishing: true,
182 timestamp: Date.now(),
183 value: {
184 timestamp: Date.now(),
185 author: keys.id,
186 content
187 }
188 })
189 }
190
191 sbot.publish(content, (err, msg) => {
192 if (err) console.error(err)
193 else if (!cb) console.log(msg)
194 cb && cb(err, msg)
195 })
196 }),
197 addBlob: rec.async((stream, cb) => {
198 return pull(
199 stream,
200 sbot.blobs.add(cb)
201 )
202 }),
203 connConnect: rec.async(function (address, data, cb) {
204 sbot.conn.connect(address, data, cb)
205 }),
206 connRememberConnect: rec.async(function (address, data, cb) {
207 sbot.conn.remember(address, { autoconnect: true, ...data }, (err) => {
208 if (err) cb(err)
209 else sbot.conn.connect(address, data, cb)
210 })
211 }),
212 friendsGet: rec.async(function (opts, cb) {
213 sbot.friends.get(opts, cb)
214 }),
215 acceptDHT: rec.async(function (opts, cb) {
216 sbot.dhtInvite.accept(opts, cb)
217 }),
218 createDHT: rec.async(function (cb) {
219 sbot.dhtInvite.create(cb)
220 })
221 },
222 pull: {
223 backlinks: rec.source(query => {
224 return sbot.backlinks.read(query)
225 }),
226 userFeed: rec.source(opts => {
227 return sbot.createUserStream(opts)
228 }),
229 messagesByType: rec.source(opts => {
230 return sbot.messagesByType(opts)
231 }),
232 feed: rec.source(function (opts) {
233 return pull(
234 sbot.createFeedStream(opts),
235 pull.through(runHooks)
236 )
237 }),
238 log: rec.source(opts => {
239 return pull(
240 sbot.createLogStream(opts),
241 pull.through(runHooks)
242 )
243 }),
244 links: rec.source(function (query) {
245 return sbot.links(query)
246 }),
247 stream: function (fn) {
248 const stream = defer.source()
249 onceTrue(connection, function (connection) {
250 stream.resolve(fn(connection))
251 })
252 return stream
253 },
254 resumeStream: function (fn, baseOpts) {
255 return function (opts) {
256 const stream = defer.source()
257 onceTrue(connection, function (connection) {
258 stream.resolve(pullResume.remote((opts) => {
259 return fn(connection, opts)
260 }, extend(baseOpts, opts)))
261 })
262 return stream
263 }
264 }
265 },
266 obs: {
267 connectionStatus: (listener) => connectionStatus(listener),
268 connection,
269 connectedPeers: () => connectedPeers,
270 localPeers: () => localPeers,
271 stagedPeers: () => stagedPeers
272 }
273 }
274 }
275
276 // scoped
277
278 function runHooks (msg) {
279 if (msg.publishing) {
280 api.sbot.hook.publish(msg)
281 } else if (!cache[msg.key]) {
282 // cache[msg.key] = msg.value
283 // api.sbot.hook.feed(msg)
284 }
285 }
286}
287

Built with git-ssb-web