git ssb

7+

dinoworm 🐛 / patchcore



Tree: 0189703ad10f5344bd2d2972f16bf2c6f12102fd

Files: 0189703ad10f5344bd2d2972f16bf2c6f12102fd / sbot.js

6336 bytesRaw
1var pull = require('pull-stream')
2var defer = require('pull-defer')
3var { Value, onceTrue, watch, Set: MutantSet } = require('mutant')
4var ref = require('ssb-ref')
5var Reconnect = require('pull-reconnect')
6var createClient = require('ssb-client')
7var createFeed = require('ssb-feed')
8var nest = require('depnest')
9var ssbKeys = require('ssb-keys')
10var flat = require('flat')
11
// Dependencies this module asks for. Each dotted path is paired with the
// strategy string handed to depnest ('first' or 'map').
var requiredDeps = {
  'config.sync.load': 'first',
  'keys.sync.load': 'first',
  'sbot.obs.connectionStatus': 'first',
  'sbot.hook.publish': 'map'
}

exports.needs = nest(requiredDeps)
18
19exports.gives = {
20 sbot: {
21 sync: {
22 cache: true
23 },
24 async: {
25 get: true,
26 publish: true,
27 addBlob: true,
28 gossipConnect: true,
29 friendsGet: true
30 },
31 pull: {
32 log: true,
33 userFeed: true,
34 messagesByType: true,
35 feed: true,
36 links: true,
37 backlinks: true,
38 stream: true
39 },
40 obs: {
41 connectionStatus: true,
42 connection: true,
43 connectedPeers: true,
44 localPeers: true
45 }
46 }
47}
48
49exports.create = function (api) {
50 const config = api.config.sync.load()
51 const keys = api.keys.sync.load()
52 var cache = {}
53
54 var sbot = null
55 var connection = Value()
56 var connectionStatus = Value()
57 var connectedPeers = MutantSet()
58 var localPeers = MutantSet()
59
60 var rec = Reconnect(function (isConn) {
61 function notify (value) {
62 isConn(value); connectionStatus.set(value)
63 }
64
65 var opts = {
66 path: config.path,
67 remote: config.remote,
68 host: config.host,
69 port: config.port,
70 key: config.key,
71 appKey: config.caps.shs,
72 timers: config.timers
73 }
74
75 createClient(keys, opts, function (err, _sbot) {
76 if (err) {
77 return notify(err)
78 }
79
80 sbot = _sbot
81 sbot.on('closed', function () {
82 sbot = null
83 connection.set(null)
84 notify(new Error('closed'))
85 })
86
87 connection.set(sbot)
88 notify()
89 })
90 })
91
92 var internal = {
93 getLatest: rec.async(function (id, cb) {
94 sbot.getLatest(id, cb)
95 }),
96 add: rec.async(function (msg, cb) {
97 sbot.add(msg, cb)
98 })
99 }
100
101 watch(connection, (sbot) => {
102 if (sbot) {
103 sbot.gossip.peers((err, peers) => {
104 if (err) return console.error(err)
105 connectedPeers.set(peers.filter(x => x.state === 'connected').map(x => x.key))
106 localPeers.set(peers.filter(x => x.source === 'local').map(x => x.key))
107 })
108 pull(
109 sbot.gossip.changes(),
110 pull.drain(data => {
111 if (data.peer) {
112 if (data.type === 'remove') {
113 connectedPeers.delete(data.peer.key)
114 localPeers.delete(data.peer.key)
115 } else {
116 if (data.peer.source === 'local') {
117 localPeers.add(data.peer.key)
118 }
119 if (data.peer.state === 'connected') {
120 connectedPeers.add(data.peer.key)
121 } else {
122 connectedPeers.delete(data.peer.key)
123 }
124 }
125 }
126 })
127 )
128 }
129 })
130
131 var feed = createFeed(internal, keys, {remote: true})
132
133 return {
134 sbot: {
135 sync: {
136 cache: () => cache
137 },
138 async: {
139 get: rec.async(function (key, cb) {
140 if (typeof cb !== 'function') {
141 throw new Error('cb must be function')
142 }
143 if (cache[key]) cb(null, cache[key])
144 else {
145 sbot.get(key, function (err, value) {
146 if (err) return cb(err)
147 runHooks({key, value})
148 cb(null, value)
149 })
150 }
151 }),
152 publish: rec.async((content, cb) => {
153 if (content.recps) {
154 content = ssbKeys.box(content, content.recps.map(e => {
155 return ref.isFeed(e) ? e : e.link
156 }))
157 } else {
158 Object.keys(flat(content)).forEach(val => {
159 if (ref.isBlob(val)) {
160 sbot.blobs.push(val, err => {
161 if (err) console.error(err)
162 })
163 }
164 })
165 }
166
167 if (sbot) {
168 // instant updating of interface (just incase sbot is busy)
169 runHooks({
170 publishing: true,
171 timestamp: Date.now(),
172 value: {
173 timestamp: Date.now(),
174 author: keys.id,
175 content
176 }
177 })
178 }
179
180 feed.add(content, (err, msg) => {
181 if (err) console.error(err)
182 else if (!cb) console.log(msg)
183 cb && cb(err, msg)
184 })
185 }),
186 addBlob: rec.async((stream, cb) => {
187 return pull(
188 stream,
189 sbot.blobs.add(cb)
190 )
191 }),
192 gossipConnect: rec.async(function (opts, cb) {
193 sbot.gossip.connect(opts, cb)
194 }),
195 friendsGet: rec.async(function (opts, cb) {
196 sbot.friends.get(opts, cb)
197 })
198 },
199 pull: {
200 backlinks: rec.source(query => {
201 return sbot.backlinks.read(query)
202 }),
203 userFeed: rec.source(opts => {
204 return sbot.createUserStream(opts)
205 }),
206 messagesByType: rec.source(opts => {
207 return sbot.messagesByType(opts)
208 }),
209 feed: rec.source(function (opts) {
210 return pull(
211 sbot.createFeedStream(opts),
212 pull.through(runHooks)
213 )
214 }),
215 log: rec.source(opts => {
216 return pull(
217 sbot.createLogStream(opts),
218 pull.through(runHooks)
219 )
220 }),
221 links: rec.source(function (query) {
222 return sbot.links(query)
223 }),
224 stream: function (fn) {
225 var stream = defer.source()
226 onceTrue(connection, function (connection) {
227 stream.resolve(fn(connection))
228 })
229 return stream
230 }
231 },
232 obs: {
233 connectionStatus: (listener) => connectionStatus(listener),
234 connection,
235 connectedPeers: () => connectedPeers,
236 localPeers: () => localPeers
237 }
238 }
239 }
240
241 // scoped
242
243 function runHooks (msg) {
244 if (msg.publishing) {
245 api.sbot.hook.publish(msg)
246 } else if (!cache[msg.key]) {
247 // cache[msg.key] = msg.value
248 // api.sbot.hook.feed(msg)
249 }
250 }
251}
252

Built with git-ssb-web