git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: d47c6e10a9fe8fb04aee3e2736ee04a7f2896003

Files: d47c6e10a9fe8fb04aee3e2736ee04a7f2896003 / sbot.js

6905 bytesRaw
1var pull = require('pull-stream')
2var defer = require('pull-defer')
3var { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4var ref = require('ssb-ref')
5var Reconnect = require('pull-reconnect')
6var createClient = require('ssb-client')
7var createFeed = require('ssb-feed')
8var nest = require('depnest')
9var ssbKeys = require('ssb-keys')
10var flat = require('flat')
11
12exports.needs = nest({
13 'config.sync.load': 'first',
14 'keys.sync.load': 'first',
15 'sbot.obs.connectionStatus': 'first',
16 'sbot.hook.publish': 'map'
17})
18
19exports.gives = {
20 sbot: {
21 sync: {
22 cache: true
23 },
24 async: {
25 get: true,
26 publish: true,
27 addBlob: true,
28 gossipConnect: true,
29 friendsGet: true
30 },
31 pull: {
32 log: true,
33 userFeed: true,
34 messagesByType: true,
35 feed: true,
36 links: true,
37 backlinks: true,
38 stream: true,
39 getBlob: true
40 },
41 obs: {
42 connectionStatus: true,
43 connection: true,
44 connectedPeers: true,
45 localPeers: true
46 }
47 }
48}
49
50exports.create = function (api) {
51 const config = api.config.sync.load()
52 const keys = api.keys.sync.load()
53 var cache = {}
54
55 var sbot = null
56 var connection = Value()
57 var connectionStatus = Value()
58 var connectedPeers = MutantSet()
59 var localPeers = MutantSet()
60
61 var rec = Reconnect(function (isConn) {
62 function notify (value) {
63 isConn(value); connectionStatus.set(value)
64 }
65
66 var opts = {
67 path: config.path,
68 remote: config.remote,
69 host: config.host,
70 port: config.port,
71 key: config.key,
72 appKey: config.caps.shs,
73 timers: config.timers,
74 caps: config.caps,
75 friends: config.friends
76 }
77
78 createClient(keys, opts, function (err, _sbot) {
79 if (err) {
80 return notify(err)
81 }
82
83 sbot = _sbot
84 sbot.on('closed', function () {
85 sbot = null
86 connection.set(null)
87 notify(new Error('closed'))
88 })
89
90 connection.set(sbot)
91 notify()
92 })
93 })
94
95 var internal = {
96 getLatest: rec.async(function (id, cb) {
97 sbot.getLatest(id, cb)
98 }),
99 add: rec.async(function (msg, cb) {
100 sbot.add(msg, cb)
101 })
102 }
103
104 setInterval(function () {
105 if (sbot) {
106 sbot.gossip.peers((err, peers) => {
107 if (err) return console.error(err)
108 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
109 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
110 })
111 }
112 }, 1000 * 60)
113
114 watch(connection, (sbot) => {
115 if (sbot) {
116 sbot.gossip.peers((err, peers) => {
117 if (err) return console.error(err)
118 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
119 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
120 })
121 pull(
122 sbot.gossip.changes(),
123 pull.drain(data => {
124 if (data.peer) {
125 if (data.type === 'remove' || data.type === 'disconnect') {
126 connectedPeers.delete(data.peer.key)
127 } else {
128 if (data.peer.source === 'local') {
129 localPeers.add(data.peer.key)
130 }
131 if (data.peer.state === 'connected') {
132 connectedPeers.add(data.peer.key)
133 } else {
134 connectedPeers.delete(data.peer.key)
135 }
136 }
137 }
138 })
139 )
140 }
141 })
142
143 var feed = createFeed(internal, keys, {remote: true})
144
145 return {
146 sbot: {
147 sync: {
148 cache: () => cache
149 },
150 async: {
151 get: rec.async(function (key, cb) {
152 if (typeof cb !== 'function') {
153 throw new Error('cb must be function')
154 }
155 if (cache[key]) cb(null, cache[key])
156 else {
157 sbot.get(key, function (err, value) {
158 if (err) return cb(err)
159 runHooks({key, value})
160 cb(null, value)
161 })
162 }
163 }),
164 publish: rec.async((content, cb) => {
165 if (content.recps) {
166 content = ssbKeys.box(content, content.recps.map(e => {
167 return ref.isFeed(e) ? e : e.link
168 }))
169 } else {
170 var flatContent = flat(content)
171 Object.keys(flatContent).forEach(key => {
172 var val = flatContent[key]
173 if (ref.isBlob(val)) {
174 sbot.blobs.push(val, err => {
175 if (err) console.error(err)
176 })
177 }
178 })
179 }
180
181 if (sbot) {
182 // instant updating of interface (just incase sbot is busy)
183 runHooks({
184 publishing: true,
185 timestamp: Date.now(),
186 value: {
187 timestamp: Date.now(),
188 author: keys.id,
189 content
190 }
191 })
192 }
193
194 feed.add(content, (err, msg) => {
195 if (err) console.error(err)
196 else if (!cb) console.log(msg)
197 cb && cb(err, msg)
198 })
199 }),
200 addBlob: rec.async((stream, cb) => {
201 return pull(
202 stream,
203 sbot.blobs.add(cb)
204 )
205 }),
206 gossipConnect: rec.async(function (opts, cb) {
207 sbot.gossip.connect(opts, cb)
208 }),
209 friendsGet: rec.async(function (opts, cb) {
210 sbot.friends.get(opts, cb)
211 })
212 },
213 pull: {
214 backlinks: rec.source(query => {
215 return sbot.backlinks.read(query)
216 }),
217 userFeed: rec.source(opts => {
218 return sbot.createUserStream(opts)
219 }),
220 messagesByType: rec.source(opts => {
221 return sbot.messagesByType(opts)
222 }),
223 feed: rec.source(function (opts) {
224 return pull(
225 sbot.createFeedStream(opts),
226 pull.through(runHooks)
227 )
228 }),
229 log: rec.source(opts => {
230 return pull(
231 sbot.createLogStream(opts),
232 pull.through(runHooks)
233 )
234 }),
235 links: rec.source(function (query) {
236 return sbot.links(query)
237 }),
238 stream: function (fn) {
239 var stream = defer.source()
240 onceTrue(connection, function (connection) {
241 stream.resolve(fn(connection))
242 })
243 return stream
244 },
245 getBlob: rec.source(function (opts) {
246 return sbot.blobs.get(opts)
247 })
248 },
249 obs: {
250 connectionStatus: (listener) => connectionStatus(listener),
251 connection,
252 connectedPeers: () => connectedPeers,
253 localPeers: () => localPeers
254 }
255 }
256 }
257
258 // scoped
259
260 function runHooks (msg) {
261 if (msg.publishing) {
262 api.sbot.hook.publish(msg)
263 } else if (!cache[msg.key]) {
264 // cache[msg.key] = msg.value
265 // api.sbot.hook.feed(msg)
266 }
267 }
268}
269
270
271

Built with git-ssb-web