git ssb

6+

Dominic / epidemic-broadcast-trees



Commit b46fcf7ac381e66d45744423daddc9a3c5c7879c

more code, taking a pattern somewhat like inu, with reduce+effects

Dominic Tarr committed on 3/20/2017, 8:25:28 PM
Parent: 07d71bc37eaf07a4593d3e76558b555418a23064

Files changed

notes.txtadded
source.jsdeleted
state.jsadded
stream.jsadded
notes.txtView
@@ -1,0 +1,72 @@
1 + OUR_SEQ, THEIR_SEQ, THEM_RECV, US_RECV
2 +
3 + onRecieveValidMessage: //after we have processed the message.
4 + if(OUR_SEQ > THEIR_SEQ && THEM_RECV)
5 + send(msg)
6 + else if(OUR_SEQ < THEIR_SEQ && US_RECV)
7 + send({id: msg.author, seq: msg.sequence}) //OPTIONAL, don't have to do this every time
8 + onReceiveMessage:
9 + if(OUR_SEQ > msg.sequence)
10 + send({id: msg.author, seq: - OUR_SEQ}) //tell them to stop sending.
11 + //else, validate the message and continue.
12 + onRecieveNote:
13 + if(note.seq < 0 && THEM_RECV) {
14 + THEM_RECV = false //they have asked us to stop sending this feed to them.
15 + }
16 + if(Math.abs(note.seq) > OUR_SEQ) {
17 + US_RECV = true
18 + send({id: note.id, seq: OUR_SEQ})
19 + }
20 +
21 + onBeginReplication:
22 + for(var id in feeds)
23 + send({id: id, seq: feed[id].seq})
24 +
25 + //okay I feel satisfied that is the correct logic
26 + //but how do I make this FSM pull?
27 +
28 + //I know, functional style state
29 +
30 + //For each peer, for each feed keep the state of:
31 +
32 + remote = {requested: Seq, sent: Seq, ready: msg|null, sending: bool, receiving: bool} //sending to, receiving from
33 +
34 + //also keep a map of local = {id: seq}
35 + //then I think all events can be handled sync
36 +
37 + READ:
38 + if ready!= null, the puller takes `ready` then sets it to `null`
39 + if ready was a msg, this also triggers retriving the next msg, if this feed.sent < local[id]
40 + if ready was a note, it's just sent without triggering anything.
41 +
42 + RECEIVE_MSG: //receive a message from this peer, before it's validated.
43 + remote.requested = msg.sequence //always remember they are up to this sequence.
44 +
45 + if(msg.seq <= local[msg.author]) //if we already know about this message
46 + if(remote.receiving) {
47 + remote.ready = {id, seq: - local[id]} //tell them we do not need this feed.
48 + remote.receiving = false
49 + }
50 + if(!remote.receiving) {
51 + we have asked them to stop sending, but they havn't got the note yet.
52 + anyway, remember they know this sequence.
53 + remote.requested = msg.sequence
54 + }
55 + else if(remote.sending) {
56 + this is an error, they should never send to us if we are sending.
57 + if this ever happens it's a programmer error. maybe should tell them to top sending?
58 + you might receive a
59 + }
60 + RECEIVE_NOTE:
61 + if(remote.sending) {
62 + if(note.seq < 0) //stop sending
63 + remote.sending = false, remote.ready = null, remote.requested = abs(note.seq)
64 + }
65 + else if(!remote.receiving) {
66 + if(abs(note.seq) > local[id]) //if this is the fastest peer for this feed, ask them to send.
67 +
68 +
69 + }
70 +
71 +
72 +
source.jsView
@@ -1,85 +1,0 @@
1-
2-
3-/*
4- OUR_SEQ, THEIR_SEQ, THEM_RECV, US_RECV
5-
6- onRecieveValidMessage: //after we have processed the message.
7- if(OUR_SEQ > THEIR_SEQ && THEM_RECV)
8- send(msg)
9- else if(OUR_SEQ < THEIR_SEQ && US_RECV)
10- send({id: msg.author, seq: msg.sequence}) //OPTIONAL, don't have to do this every time
11- onReceiveMessage:
12- if(OUR_SEQ > msg.sequence)
13- send({id: msg.author, seq: - OUR_SEQ}) //tell them to stop sending.
14- //else, validate the message and continue.
15- onRecieveNote:
16- if(note.seq < 0 && THEM_RECV) {
17- THEM_RECV = false //they have asked us to stop sending this feed to them.
18- }
19- if(Math.abs(note.seq) > OUR_SEQ) {
20- US_RECV = true
21- send({id: note.id, seq: OUR_SEQ})
22- }
23-
24- onBeginReplication:
25- for(var id in feeds)
26- send({id: id, seq: feed[id].seq})
27-
28- //okay I feel satisfied that is the correct logic
29- //but how do I make this FSM pull?
30-*/
31-
32-//local and remote are observables, containing maps {id:sequence}
33-//if sequence is negative on remote then it means "up to here" but do not send.
34-
35-exports = module.exports = function (local, remote) {
36-
37- var queues = []
38-
39- //updated whenever the remote sends a vector clock.
40- remote(function () {
41- //iterate the queue, check if everything is still wanted.
42- // 1. if remote says id:-n then stop sending id (remove from queue)
43- })
44-
45- local(function () {
46- // 2. if we are receiving from remote, then we move ahead, we must send a {id: -seq} message
47- to tell them not to send to us.
48-
49-
50- })
51-
52- function read (abort, cb) {
53- //find the most recent queue and send it.
54- ;(function next () {
55- exports.sort(queue)
56- if(!queue.length) //nothing ready.
57- ready.once(next, false)
58- else if(remote[queue.value.author] < 0)
59- //take the oldest available item.
60- })()
61- }
62-}
63-
64-//this should be replaced with a heap,
65-//but i'll look for a good heap implementation later
66-//this should be enough for now.
67-exports.sort = function (queue) {
68- return queue.sort(function (a, b) {
69- if(!a.value && !b.value) return 0
70- if(a.value && !b.value) return -1
71- else if(b.value && !a.value) return 1
72- else return a.value.timestamp - b.value.timestamp
73- return 0
74- })
75-}
76-
77-
78-
79-
80-
81-
82-
83-
84-
85-
state.jsView
@@ -1,0 +1,109 @@
1 +function clone (state) { return state }
2 +
3 +exports.init = function (local) {
4 + return {
5 + local: local,
6 + sending: false,
7 + receiving: false,
8 + sent: 0, received: 0,
9 + ready: null,
10 + effect: []
11 + }
12 +}
13 +
14 +//this is not a reduce, and it has side effects.
15 +exports.read = function (state) {
16 + if(!state.ready) return [null, state]
17 + var _ready = state.ready
18 + state.ready = null
19 + if(isMessage(_ready) state.sent = _ready.sequence
20 + return [_ready, state]
21 +}
22 +
23 +exports.receiveMessage = function (state, msg) {
24 + var _state = clone(state)
25 + _state.received = msg.sequence
26 +
27 + if(!state.receiving) {
28 + //we should not have received a message if we are in sending state!
29 + _state.error = true
30 + }
31 + else if(state.receiving) {
32 + if(state.local[msg.author] > msg.sequence) {//we already know this msg
33 + _state.ready = {id: msg.author, seq: -1*state.local[msg.author]}
34 + _state.receiving = false
35 + }
36 + else if(state.local[msg.author] + 1 == msg.sequence)
37 + _state.effect = [{action: 'append', arg: msg}]
38 + ; //SIDE EFFECT: ready to validate
39 + else
40 + ; //ignore
41 + }
42 +
43 + return _state
44 +}
45 +
46 +exports.receiveNote = function (state, note) {
47 + var _state = clone(state)
48 + _state.received = Math.max(Math.abs(note.seq), _state.has || 0)
49 + if(state.sending) {
50 + if(note.seq < 0) {
51 + _state.sending = false
52 + _state.ready = null
53 + }
54 +
55 + //they know about a message we don't yet, go into receiving mode
56 + if(Math.abs(note.seq) > state.local[note.id] && !state.receiving) {
57 + _state.receiving = true
58 + _state.sending = false
59 + //ready for next request.
60 + _state.ready = {id: note.id, seq: state.local[note.id]}
61 + }
62 +
63 + //we where about to send a message, but they asked for an older one (weird)
64 + if(state.ready && state.ready.sequence <= note.seq) {
65 + _state.ready = null
66 + _state.effect = [{action: 'get', arg: note}]
67 + //SIDE EFFECT: retrive next message
68 + }
69 + }
70 + else if(state.recieving) {
71 + //generally shouldn't happen but
72 + //could if we have just switched to receiving but they didn't get the message yet
73 + }
74 + else if(!state.receiving) {
75 + if(state.local[note.id] < Math.abs(note.seq)) {
76 + _state.receiving = true
77 + _state.ready = {id: note.id, state.local[note.id]} //request this feed from our current value.
78 + }
79 + else if(note.seq > 0) {
80 + _state.sending = true
81 + if(state.local[note.id] > note.seq) {
82 + if(!isMessage(state.ready) || state.ready.sequence !== note.sequence) {
83 + _state.ready = null
84 + _state.effect = [{action: 'get', arg: note}]
85 + }
86 + }
87 + }
88 + }
89 +}
90 +
91 +//we have either written a new message ourselves,
92 +//or received a message (and validated it) from another peer.
93 +exports.appendMessage = function (state, msg) {
94 + //if this is the msg they need next, make
95 + var _state = clone(state)
96 + if(state.sending && state.sent + 1 === msg.sequence)
97 + _state.ready = msg
98 + else if(!state.sending)
99 + //how about some way to delay sending notes, for bandwidth?
100 + //and to slow down upwards replication if you are datacapped.
101 + _state.ready = {id: msg.author, seq: msg.sequence * -1}
102 +}
103 +
104 +//have retrived an requested message
105 +exports.retriveMessage = function (state, msg) {
106 +
107 +}
108 +
109 +
stream.jsView
@@ -1,0 +1,105 @@
1 +var state = require('./state')
2 +var explain = require('explain-error')
3 +
4 +//local and remote are observables, containing maps {id:sequence}
5 +//if sequence is negative on remote then it means "up to here" but do not send.
6 +
7 +exports = module.exports = function (local, get, append) {
8 +
9 + var remotes = {}
10 + var queues = [] //the remotes, but sorted by who has the next message to send.
11 +
12 + var actions = {
13 + append: append,
14 + get: function (note) {
15 + get(note, function (err, msg) {
16 + //this error should never happen
17 + if(err) return console.error(explain(error, 'could not get message:'+JSON.stringify(note))
18 + if(remotes[msg.author])
19 + remotes[msg.author] = effects(state.retrivedMessage(remotes[msg.author], msg))
20 + })
21 + }
22 + }
23 +
24 + function read (abort, cb) {
25 + //find the most recent queue and send it.
26 + ;(function next () {
27 + exports.sort(queue)
28 + if(!queue.length) //nothing ready.
29 + ready.once(next, false)
30 + else if(isMessage(queue[0])) {
31 + var msg = queue[0].ready
32 + queue[0].ready = null
33 + queue[0].state.effect = [{action: 'get', arg: {id: msg.author, seq: msg.sequence + 1}}]
34 + cb(null, msg)
35 + } else if(isNote(queue[0]) {
36 + //lump together all available notes into a single {<id>: <seq>,...} object
37 + var notes = {}
38 + for(var i = 0; isNote(queue[i].ready); i++) {
39 + notes[queue[i].ready.id] = queue[i].ready.seq
40 + queue[i].ready = null
41 + }
42 + //we don't need to queue an effect, because notes are always triggered by other events.
43 + cb(null, notes)
44 + }
45 + })()
46 + }
47 +
48 + function effects (state) {
49 + if(!state.effect || !state.effect.length) return
50 + var effects = state.effect
51 + state.effect = []
52 + while(effects.length) {
53 + var effect = effects.shift()
54 + actions[effect.action](effect.arg)
55 + }
56 + if(state.ready) ready(state)
57 + }
58 +
59 + //a message was received or created, in real time
60 +
61 + return {
62 + source: read,
63 + sink: pull.drain(function (data) {
64 + if(isMessage(data) && remotes[data.author]) {
65 + var msg = data
66 + remotes[msg.author] = effects(state.receiveMessage(remotes[msg.author], msg))
67 + }
68 + else if(isNotes(data)) {
69 + //go through and update all state, then process all effects
70 + for(var id in data) {
71 + if(remotes[id])
72 + remotes[id] = state.receiveNote(remotes[id], {id: id, seq: data[id})
73 + }
74 + for(var id in data)
75 + remotes[id] = effects(remotes[id])
76 + }
77 + },
78 + //must call append when a message is added in real time (not for old messages though)
79 + //maybe pass in a stream instead?
80 + append: function (msg) {
81 + //it can be greater or equal,
82 + //because more than one message could have been processed before append is called.
83 + if(local[msg.author] < msg.sequence)
84 + throw new Error('local sequence is expected to be at greater or equal to '+msg.sequence)
85 +
86 + if(remotes[msg.author])
87 + remotes[msg.author] = effects(state.appendMessage(remotes[msg.author], msg))
88 + },
89 + //how to request the feeds to replicate?
90 + }
91 +}
92 +
93 +//this should be replaced with a heap,
94 +//but i'll look for a good heap implementation later
95 +//this should be enough for now.
96 +exports.sort = function (queue) {
97 + return queue.sort(function (a, b) {
98 + if(!a.value && !b.value) return 0
99 + if(a.value && !b.value) return -1
100 + else if(b.value && !a.value) return 1
101 + else return a.value.timestamp - b.value.timestamp
102 + return 0
103 + })
104 +}
105 +

Built with git-ssb-web