git ssb

6+

Dominic / epidemic-broadcast-trees



Tree: 5ff620983521663cf40839a68c657acd4624443c

Files: 5ff620983521663cf40839a68c657acd4624443c / state.js

6156 bytesRaw
1'use strict'
2var u = require('./util')
3var isMessage = u.isMessage
4var isNote = u.isNote
5
// The reducers in this file are written as if they were purely functional,
// but they only pretend: with good functional discipline on the caller's
// side it is fine to mutate, so no copy is actually made here.
/**
 * "Copy" a state object.
 *
 * @param {object} state - the state a reducer is about to update
 * @returns {object} the very same object that was passed in (no copy is made)
 */
function clone (state) {
  return state
}
9
/**
 * Report whether either side's `req` field is still unset.
 *
 * @param {object} state - state with `remote.req` and `local.req`
 * @returns {boolean} true when `remote.req` or `local.req` is null/undefined
 */
function isInitRx (state) {
  // `== null` deliberately matches both null and undefined.
  if (state.remote.req == null) return true
  return state.local.req == null
}
13
/**
 * Decide whether there is something we could send to the remote.
 *
 * @param {object} state
 * @returns {*} false while either `req` is unset or while `local.seq` is not
 *   greater than both `remote.seq` and `remote.req`; otherwise the value of
 *   `state.local.tx`
 */
function canSend (state) {
  if (isInitRx(state)) return false

  var remoteHas = Math.max(state.remote.seq, state.remote.req)
  if (!(state.local.seq > remoteHas)) return false

  return state.local.tx
}
18
/**
 * Convert a note to the sequence number it carries.
 *
 * Non-negative notes are the sequence itself. Negative notes are produced in
 * this file as `-(seq + 1)`, so they are decoded with the arithmetic inverse.
 * Plain arithmetic is used instead of the bitwise `~` operator because `~`
 * converts its operand to a 32-bit integer and therefore returns a wrong
 * (negative) result for notes below -2^31.
 *
 * @param {number} n - a note (integer; negative means "-(seq + 1)")
 * @returns {number} the non-negative sequence number encoded by the note
 */
function toSeq (n) {
  // `-n - 1` rather than `-(n + 1)` so that -1 decodes to 0 and not to -0.
  return n < 0 ? -n - 1 : n
}
23
24//actually, want to be able to initialize this in receive mode or not.
25exports.init = function (local) {
26 if(!Number.isInteger(local))
27 throw new Error('local must be integer')
28 //idea for better structure
29 return {
30 //state of local,
31 //highest sequence we have
32 //the sequence we asked for (set on initial)
33 //whether we are transmitting
34 local: {seq: local, req: null, tx: true},
35 //the highest sequence we have sent to them
36 //the sequence they gave us (which they thus definitely have)
37 //whether they are transmitting
38 remote: {seq: null, req: null, tx: true},
39 //the next item to send. (orderly queue)
40 ready: local,
41 //the next thing to do to database (disorderly queue)
42 effect: null
43 }
44}
45
46//this is not a reduce, and it has side effects.
47exports.read = function (state) {
48 if(state.ready == null) return state
49 var _ready = state.ready
50 state.ready = null
51 if(isMessage(_ready)) {
52 if(state.remote.seq != null && Math.max(state.remote.seq, state.remote.req) +1 !== _ready.sequence) {
53 throw new Error('out of order!')
54 }
55 state.remote.seq = _ready.sequence
56 state.local.req = Math.max(state.local.req, _ready.sequence)
57 } else {
58 state.local.req = toSeq(_ready)
59 state.remote.tx = _ready >= 0
60 }
61 if(canSend(state)) {
62 state.effect = Math.max(state.remote.seq, state.remote.req) + 1
63 }
64
65 return state
66}
67
/**
 * Report whether `msg` is at or below our own sequence.
 *
 * @param {object} state - state with `local.seq`
 * @param {object} msg - message with a `sequence`
 * @returns {boolean} true when `msg.sequence` is not greater than `local.seq`
 */
function isOldMessage (state, msg) {
  var ours = state.local.seq
  return msg.sequence <= ours
}
71
/**
 * Report whether `msg` is the one directly after our own sequence.
 *
 * @param {object} state - state with `local.seq`
 * @param {object} msg - message with a `sequence`
 * @returns {boolean} true when `msg.sequence` equals `local.seq + 1`
 */
function isNextRxMessage (state, msg) {
  var expected = state.local.seq + 1
  // loose equality kept as in the original comparison
  return expected == msg.sequence
}
75
/**
 * Report whether `msg` is the next message to transmit to the remote.
 *
 * @param {object} state
 * @param {object} msg - message with a `sequence`
 * @returns {boolean} true only when both `req` fields are set, `msg.sequence`
 *   is above `remote.req`, and it is exactly one more than the larger of
 *   `remote.seq` and `remote.req`
 */
function isNextTxMessage (state, msg) {
  if (isInitRx(state)) return false
  if (!(state.remote.req < msg.sequence)) return false

  var remoteHas = Math.max(state.remote.seq, state.remote.req)
  return msg.sequence === remoteHas + 1
}
83
84exports.receiveMessage = function (state, msg) {
85 if(!isMessage(msg)) throw new Error('expected a Message!')
86 var _state = clone(state)
87
88 _state.remote.req = Math.max(state.remote.req || 0, msg.sequence)
89 //not the same^
90 _state.remote.seq = Math.max(state.remote.seq || 0, msg.sequence)
91
92 if(state.remote.tx == null)
93 throw new Error('we received a message, when we where waiting for remote to send initial request')
94
95 var seq = state.local.seq
96 if(isMessage(state.ready)) {
97 if(state.ready.sequence <= msg.sequence)
98 state.ready = null
99 }
100 if(isOldMessage(state, msg)) {
101 //we already know this, please shut up!
102 //let read move us out of tx mode,
103 if(state.remote.tx)
104 _state.ready = -(seq + 1)
105 }
106 else if(isNextRxMessage(state, msg)) {
107 //since we now know they are ahead, stop transmitting to them
108 if(state.ready != null)
109 _state.ready = null
110 _state.effect = msg
111 if(state.remote.tx == false)
112 state.ready = state.local.seq
113 }
114 else {
115 //this means something went really wrong
116 _state.error = true
117 }
118 return _state
119}
120
121exports.receiveNote = function (state, note) {
122 if(!isNote(note)) throw new Error('expected note!')
123 var _state = clone(state)
124 var seq = state.local.seq
125 var requested = note >= 0
126 var _seq = Math.max(toSeq(note), state.remote.seq)
127
128 _state.local.tx = requested
129 _state.remote.req = Math.max(_seq, _state.remote.req)
130
131 if(state.local.req == null) return _state
132
133 if(isMessage(state.ready) && _seq >= state.ready.sequence)
134 state.ready = null
135
136 //if they sent a note which is ahead of us, and we did not previously send something.
137
138 if(seq < _seq && state.remote.tx == false) {
139 state.ready = state.local.seq
140 }
141 else
142 if((seq > _seq) && requested)
143 _state.effect = _seq + 1
144
145 return _state
146}
147
148//we have either written a new message ourselves,
149//or received a message (and validated it) from another peer.
150exports.appendMessage = function (state, msg) {
151 if(!isMessage(msg)) throw new Error('appendMessage expects a message!')
152 //if this is the msg they need next, make
153 var _state = clone(state)
154 _state.local.seq = msg.sequence
155
156 if(state.local.tx) {
157 if(isNextTxMessage(state, msg)) {
158 _state.ready = msg
159 }
160 else if(isNote(state.ready)) { //this should only happen when it is the initial request
161 //if we are transmitting, why would we be sending a note?
162 //if it's back to even, we don't need to send a message, but if we are not
163 //then the message has meaning.
164 //if(state.remote.tx)
165
166 if(state.local.req != null) {
167 _state.ready = null
168 if(_state.local.seq > Math.max(state.remote.seq, state.remote.req))
169 _state.effect = Math.max(state.remote.seq, state.remote.req) + 1
170 }
171 }
172 else if(!isMessage(_state.ready)) {
173 _state.ready = null
174
175 if(state.local.seq > Math.max(state.remote.req,state.remote.seq))
176 state.effect = Math.max(state.remote.req,state.remote.seq) + 1
177 }
178 }
179 else if(!state.local.tx) {
180 //if we don't know they are up to this, and they havn't
181 //asked us to not send anything (-1) then send a note.
182 if(msg.sequence > state.remote.req && state.remote.req != 0) {
183 _state.ready = state.remote.tx ? msg.sequence : -(msg.sequence+1) //SEND NOTE
184 }
185 else if(isNote(state.ready) && state.ready > 0)
186 state.ready = msg.sequence //UPDATE NOTE
187 else
188 state.ready = null
189 }
190 return state
191}
192
193//have retrived an requested message
194exports.gotMessage = function (state, msg) {
195 if(!isMessage(msg)) throw new Error('expected message')
196 var _state = clone(state)
197 if (isNextTxMessage(state, msg)) {
198 _state.ready = msg
199 }
200 //if it's not the next message
201 //we must have changed state while retriving this message
202 //anyway, just get on with things
203 return _state
204}
205
206

Built with git-ssb-web