Files: 86a0d059a6618c5b0e01793ac5a710c66c9b4dda / state.js
6443 bytesRaw
1 | |
2 | var u = require('./util') |
3 | var isMessage = u.isMessage |
4 | var isNote = u.isNote |
5 | |
// Not a real copy: this hands back the very same object it was given.
// The functions below are written as if they were pure, but because of this
// they update the caller's state in place (fine, given functional discipline).
function clone (state) {
  var same = state
  return same
}
9 | |
// True while either side's `req` is still unset (null or undefined),
// i.e. the remote's request or our own has not been recorded yet.
function isInitRx (state) {
  if (state.remote.req == null) return true
  return state.local.req == null
}
13 | |
// Decides whether we may transmit to the remote.
// Returns false while the initial exchange is incomplete or when our local
// sequence is not past max(remote.seq, remote.req); otherwise returns
// `state.local.tx` itself (the raw value, not coerced to a boolean).
function canSend (state) {
  if (isInitRx(state)) return false
  var reached = Math.max(state.remote.seq, state.remote.req)
  // written as a negated `>` so a NaN comparison still yields false
  if (!(state.local.seq > reached)) return false
  return state.local.tx
}
18 | |
// Converts a note into the sequence number it carries.
// Values below -1 are decoded with bitwise NOT (for integers, ~n === -(n + 1),
// so -2 -> 1, -3 -> 2, ...); -1 and everything from 0 upwards pass through as is.
function toSeq (n) {
  if (n < -1) return ~n
  return n
}
23 | |
24 | //actually, want to be able to initialize this in receive mode or not. |
25 | exports.init = function (local) { |
26 | // if(!Number.isInteger(local)) |
27 | // throw new Error('local must be integer') |
28 | //idea for better structure |
29 | return { |
30 | //state of local, |
31 | //highest sequence we have |
32 | //the sequence we asked for (set on initial) |
33 | //whether we are transmitting |
34 | local: {seq: local, req: null, tx: true}, |
35 | //the highest sequence we have sent to them |
36 | //the sequence they gave us (which they thus definitely have) |
37 | //whether they are transmitting |
38 | remote: {seq: null, req: null, tx: true}, |
39 | //the next item to send. (orderly queue) |
40 | ready: local, |
41 | //the next thing to do to database (disorderly queue) |
42 | effect: null |
43 | } |
44 | } |
45 | |
46 | //this is not a reduce, and it has side effects. |
47 | exports.read = function (state) { |
48 | if(state.ready == null) return state |
49 | var _ready = state.ready |
50 | state.ready = null |
51 | if(isMessage(_ready)) { |
52 | if(state.remote.seq != null && Math.max(state.remote.seq, state.remote.req) +1 !== _ready.sequence) { |
53 | throw new Error('out of order!') |
54 | } |
55 | state.remote.seq = _ready.sequence |
56 | state.local.req = Math.max(state.local.req, _ready.sequence) |
57 | } else { |
58 | state.local.req = toSeq(_ready) |
59 | state.remote.tx = _ready >= 0 |
60 | } |
61 | if(canSend(state)) { |
62 | state.effect = Math.max(state.remote.seq, state.remote.req) + 1 |
63 | } |
64 | |
65 | return state |
66 | } |
67 | |
// True when `msg` is at or behind our own highest sequence,
// i.e. it carries nothing we do not already have.
function isOldMessage (state, msg) {
  var ours = state.local.seq
  return msg.sequence <= ours
}
71 | |
// True when `msg` is the one directly after our highest local sequence.
// The comparison is deliberately loose (==), exactly as before.
function isNextRxMessage (state, msg) {
  var wanted = state.local.seq + 1
  return wanted == msg.sequence
}
75 | |
// True when `msg` is exactly the next message to transmit: the initial
// exchange is over, the message is beyond what the remote reported (remote.req),
// and its sequence is max(remote.seq, remote.req) + 1.
function isNextTxMessage (state, msg) {
  if (isInitRx(state)) return false
  if (!(state.remote.req < msg.sequence)) return false
  var reached = Math.max(state.remote.seq, state.remote.req)
  return msg.sequence === reached + 1
}
83 | |
84 | exports.receiveMessage = function (state, msg) { |
85 | if(!isMessage(msg)) throw new Error('expected a Message!') |
86 | var _state = clone(state) |
87 | |
88 | _state.remote.req = Math.max(state.remote.req || 0, msg.sequence) |
89 | //not the same^ |
90 | _state.remote.seq = Math.max(state.remote.seq || 0, msg.sequence) |
91 | |
92 | if(state.remote.tx == null) |
93 | throw new Error('we received a message, when we where waiting for remote to send initial request') |
94 | |
95 | var seq = state.local.seq |
96 | if(isMessage(state.ready)) { |
97 | if(state.ready.sequence <= msg.sequence) |
98 | state.ready = null |
99 | } |
100 | if(isOldMessage(state, msg)) { |
101 | //we already know this, please shut up! |
102 | //let read move us out of tx mode, |
103 | if(state.remote.tx) |
104 | _state.ready = -(seq + 1) |
105 | //XXX: there might be a race here if we are cancelling this feed |
106 | } |
107 | else if(isNextRxMessage(state, msg)) { |
108 | //since we now know they are ahead, stop transmitting to them |
109 | if(state.ready != null) |
110 | _state.ready = null |
111 | _state.effect = msg |
112 | if(state.remote.tx == false) |
113 | state.ready = state.local.seq |
114 | } |
115 | else { |
116 | //this means something went really wrong |
117 | _state.error = true |
118 | } |
119 | return _state |
120 | } |
121 | |
122 | exports.receiveNote = function (state, note) { |
123 | if(!isNote(note)) throw new Error('expected note!') |
124 | var _state = clone(state) |
125 | var seq = state.local.seq |
126 | var requested = note >= 0 |
127 | var _seq = note == -1 ? -1 : Math.max(toSeq(note), state.remote.seq) |
128 | |
129 | _state.local.tx = requested |
130 | _state.remote.req = Math.max(_seq, _state.remote.req) |
131 | |
132 | //if we havn't decided, or have decided we don't want this feed: |
133 | if(state.local.req == null) return _state |
134 | if(state.local.req == -1) return _state |
135 | |
136 | if(isMessage(state.ready) && _seq >= state.ready.sequence) |
137 | state.ready = null |
138 | |
139 | //if they sent a note which is ahead of us, and we did not previously send something. |
140 | |
141 | if(seq < _seq && state.remote.tx == false) { |
142 | state.ready = state.local.seq |
143 | } |
144 | else |
145 | if((seq > _seq) && requested) |
146 | _state.effect = _seq + 1 |
147 | |
148 | return _state |
149 | } |
150 | |
151 | //we have either written a new message ourselves, |
152 | //or received a message (and validated it) from another peer. |
153 | exports.appendMessage = function (state, msg) { |
154 | if(!isMessage(msg)) throw new Error('appendMessage expects a message!') |
155 | //if this is the msg they need next, make |
156 | var _state = clone(state) |
157 | _state.local.seq = msg.sequence |
158 | |
159 | if(state.local.tx) { |
160 | if(isNextTxMessage(state, msg)) { |
161 | _state.ready = msg |
162 | } |
163 | else if(isNote(state.ready)) { //this should only happen when it is the initial request |
164 | //if we are transmitting, why would we be sending a note? |
165 | //if it's back to even, we don't need to send a message, but if we are not |
166 | //then the message has meaning. |
167 | //if(state.remote.tx) |
168 | |
169 | if(state.local.req != null) { |
170 | _state.ready = null |
171 | if(_state.local.seq > Math.max(state.remote.seq, state.remote.req)) |
172 | _state.effect = Math.max(state.remote.seq, state.remote.req) + 1 |
173 | } |
174 | } |
175 | else if(!isMessage(_state.ready)) { |
176 | _state.ready = null |
177 | |
178 | if(state.local.seq > Math.max(state.remote.req,state.remote.seq)) |
179 | state.effect = Math.max(state.remote.req,state.remote.seq) + 1 |
180 | } |
181 | } |
182 | else if(!state.local.tx) { |
183 | //if we don't know they are up to this, and they havn't |
184 | //asked us to not send anything (-1) then send a note. |
185 | if(state.remote.req == -1) |
186 | ; //they do not want to hear about this one |
187 | else if(msg.sequence > state.remote.req && state.remote.req != 0) { |
188 | _state.ready = state.remote.tx ? msg.sequence : -(msg.sequence+1) //SEND NOTE |
189 | } |
190 | else if(isNote(state.ready) && state.ready > 0) |
191 | state.ready = msg.sequence //UPDATE NOTE |
192 | else |
193 | state.ready = null |
194 | } |
195 | return state |
196 | } |
197 | |
198 | //have retrived an requested message |
199 | exports.gotMessage = function (state, msg) { |
200 | if(!isMessage(msg)) throw new Error('expected message') |
201 | var _state = clone(state) |
202 | if (isNextTxMessage(state, msg)) { |
203 | _state.ready = msg |
204 | } |
205 | //if it's not the next message |
206 | //we must have changed state while retriving this message |
207 | //anyway, just get on with things |
208 | return _state |
209 | } |
210 | |
211 |
Built with git-ssb-web