Files: 5ff620983521663cf40839a68c657acd4624443c / state.js
6156 bytesRaw
1 | |
2 | var u = require('./util') |
3 | var isMessage = u.isMessage |
4 | var isNote = u.isNote |
5 | |
6 | //okay just pretending to be purely functional |
7 | //(but if you have good functional disipline, you can mutate) |
8 | function clone (state) { return state } |
9 | |
10 | function isInitRx (state) { |
11 | return state.remote.req == null || state.local.req == null |
12 | } |
13 | |
14 | function canSend(state) { |
15 | return !isInitRx(state) && |
16 | state.local.seq > Math.max(state.remote.seq, state.remote.req) && state.local.tx |
17 | } |
18 | |
19 | function toSeq (n) { |
20 | //return Math.abs(n) |
21 | return n < 0 ? ~n : n |
22 | } |
23 | |
24 | //actually, want to be able to initialize this in receive mode or not. |
25 | exports.init = function (local) { |
26 | if(!Number.isInteger(local)) |
27 | throw new Error('local must be integer') |
28 | //idea for better structure |
29 | return { |
30 | //state of local, |
31 | //highest sequence we have |
32 | //the sequence we asked for (set on initial) |
33 | //whether we are transmitting |
34 | local: {seq: local, req: null, tx: true}, |
35 | //the highest sequence we have sent to them |
36 | //the sequence they gave us (which they thus definitely have) |
37 | //whether they are transmitting |
38 | remote: {seq: null, req: null, tx: true}, |
39 | //the next item to send. (orderly queue) |
40 | ready: local, |
41 | //the next thing to do to database (disorderly queue) |
42 | effect: null |
43 | } |
44 | } |
45 | |
46 | //this is not a reduce, and it has side effects. |
47 | exports.read = function (state) { |
48 | if(state.ready == null) return state |
49 | var _ready = state.ready |
50 | state.ready = null |
51 | if(isMessage(_ready)) { |
52 | if(state.remote.seq != null && Math.max(state.remote.seq, state.remote.req) +1 !== _ready.sequence) { |
53 | throw new Error('out of order!') |
54 | } |
55 | state.remote.seq = _ready.sequence |
56 | state.local.req = Math.max(state.local.req, _ready.sequence) |
57 | } else { |
58 | state.local.req = toSeq(_ready) |
59 | state.remote.tx = _ready >= 0 |
60 | } |
61 | if(canSend(state)) { |
62 | state.effect = Math.max(state.remote.seq, state.remote.req) + 1 |
63 | } |
64 | |
65 | return state |
66 | } |
67 | |
68 | function isOldMessage(state, msg) { |
69 | return (state.local.seq >= msg.sequence) |
70 | } |
71 | |
72 | function isNextRxMessage(state, msg) { |
73 | return state.local.seq + 1 == msg.sequence |
74 | } |
75 | |
76 | function isNextTxMessage (state, msg) { |
77 | return ( |
78 | !isInitRx(state) && |
79 | state.remote.req < msg.sequence && |
80 | msg.sequence === Math.max(state.remote.seq, state.remote.req) + 1 |
81 | ) |
82 | } |
83 | |
84 | exports.receiveMessage = function (state, msg) { |
85 | if(!isMessage(msg)) throw new Error('expected a Message!') |
86 | var _state = clone(state) |
87 | |
88 | _state.remote.req = Math.max(state.remote.req || 0, msg.sequence) |
89 | //not the same^ |
90 | _state.remote.seq = Math.max(state.remote.seq || 0, msg.sequence) |
91 | |
92 | if(state.remote.tx == null) |
93 | throw new Error('we received a message, when we where waiting for remote to send initial request') |
94 | |
95 | var seq = state.local.seq |
96 | if(isMessage(state.ready)) { |
97 | if(state.ready.sequence <= msg.sequence) |
98 | state.ready = null |
99 | } |
100 | if(isOldMessage(state, msg)) { |
101 | //we already know this, please shut up! |
102 | //let read move us out of tx mode, |
103 | if(state.remote.tx) |
104 | _state.ready = -(seq + 1) |
105 | } |
106 | else if(isNextRxMessage(state, msg)) { |
107 | //since we now know they are ahead, stop transmitting to them |
108 | if(state.ready != null) |
109 | _state.ready = null |
110 | _state.effect = msg |
111 | if(state.remote.tx == false) |
112 | state.ready = state.local.seq |
113 | } |
114 | else { |
115 | //this means something went really wrong |
116 | _state.error = true |
117 | } |
118 | return _state |
119 | } |
120 | |
121 | exports.receiveNote = function (state, note) { |
122 | if(!isNote(note)) throw new Error('expected note!') |
123 | var _state = clone(state) |
124 | var seq = state.local.seq |
125 | var requested = note >= 0 |
126 | var _seq = Math.max(toSeq(note), state.remote.seq) |
127 | |
128 | _state.local.tx = requested |
129 | _state.remote.req = Math.max(_seq, _state.remote.req) |
130 | |
131 | if(state.local.req == null) return _state |
132 | |
133 | if(isMessage(state.ready) && _seq >= state.ready.sequence) |
134 | state.ready = null |
135 | |
136 | //if they sent a note which is ahead of us, and we did not previously send something. |
137 | |
138 | if(seq < _seq && state.remote.tx == false) { |
139 | state.ready = state.local.seq |
140 | } |
141 | else |
142 | if((seq > _seq) && requested) |
143 | _state.effect = _seq + 1 |
144 | |
145 | return _state |
146 | } |
147 | |
148 | //we have either written a new message ourselves, |
149 | //or received a message (and validated it) from another peer. |
150 | exports.appendMessage = function (state, msg) { |
151 | if(!isMessage(msg)) throw new Error('appendMessage expects a message!') |
152 | //if this is the msg they need next, make |
153 | var _state = clone(state) |
154 | _state.local.seq = msg.sequence |
155 | |
156 | if(state.local.tx) { |
157 | if(isNextTxMessage(state, msg)) { |
158 | _state.ready = msg |
159 | } |
160 | else if(isNote(state.ready)) { //this should only happen when it is the initial request |
161 | //if we are transmitting, why would we be sending a note? |
162 | //if it's back to even, we don't need to send a message, but if we are not |
163 | //then the message has meaning. |
164 | //if(state.remote.tx) |
165 | |
166 | if(state.local.req != null) { |
167 | _state.ready = null |
168 | if(_state.local.seq > Math.max(state.remote.seq, state.remote.req)) |
169 | _state.effect = Math.max(state.remote.seq, state.remote.req) + 1 |
170 | } |
171 | } |
172 | else if(!isMessage(_state.ready)) { |
173 | _state.ready = null |
174 | |
175 | if(state.local.seq > Math.max(state.remote.req,state.remote.seq)) |
176 | state.effect = Math.max(state.remote.req,state.remote.seq) + 1 |
177 | } |
178 | } |
179 | else if(!state.local.tx) { |
180 | //if we don't know they are up to this, and they havn't |
181 | //asked us to not send anything (-1) then send a note. |
182 | if(msg.sequence > state.remote.req && state.remote.req != 0) { |
183 | _state.ready = state.remote.tx ? msg.sequence : -(msg.sequence+1) //SEND NOTE |
184 | } |
185 | else if(isNote(state.ready) && state.ready > 0) |
186 | state.ready = msg.sequence //UPDATE NOTE |
187 | else |
188 | state.ready = null |
189 | } |
190 | return state |
191 | } |
192 | |
193 | //have retrived an requested message |
194 | exports.gotMessage = function (state, msg) { |
195 | if(!isMessage(msg)) throw new Error('expected message') |
196 | var _state = clone(state) |
197 | if (isNextTxMessage(state, msg)) { |
198 | _state.ready = msg |
199 | } |
200 | //if it's not the next message |
201 | //we must have changed state while retriving this message |
202 | //anyway, just get on with things |
203 | return _state |
204 | } |
205 | |
206 |
Built with git-ssb-web