git ssb

6+

Dominic / epidemic-broadcast-trees



Tree: 86a0d059a6618c5b0e01793ac5a710c66c9b4dda

Files: 86a0d059a6618c5b0e01793ac5a710c66c9b4dda / state.js

6443 bytes | Raw
'use strict'
//helpers from the sibling util module: isMessage / isNote are used below
//to tell queued messages apart from queued notes.
var u = require('./util')
var isMessage = u.isMessage
var isNote = u.isNote

//okay just pretending to be purely functional
//(but if you have good functional discipline, you can mutate)
//clone is deliberately the identity function: it returns the very same
//object it was given, so wherever this file does `_state = clone(state)`
//the two names are aliases and every write is an in-place mutation.
function clone (state) { return state }

//true while the initial exchange is unfinished, i.e. while either
//remote.req or local.req is still null (or undefined).
function isInitRx (state) {
  return state.remote.req == null || state.local.req == null
}

//truthy when we are allowed to push messages to the remote:
// - the initial exchange is finished (see isInitRx),
// - our local.seq is ahead of everything the remote is known to have
//   (the larger of remote.seq and remote.req), and
// - local.tx is set.
//note the result is the value of the && chain (so possibly local.tx
//itself), not a strict boolean; callers only use it as a condition.
function canSend(state) {
  return !isInitRx(state) &&
    state.local.seq > Math.max(state.remote.seq, state.remote.req) && state.local.tx
}

//decode the sequence number carried by a note.
//non-negative notes and -1 are returned unchanged. anything below -1 is
//the encoded form -(seq + 1) produced elsewhere in this file, and ~n
//(= -n - 1) turns it back into seq. Math.abs(n) would be off by one here.
function toSeq (n) {
  return n < -1 ? ~n : n
}

24//actually, want to be able to initialize this in receive mode or not.
25exports.init = function (local) {
26// if(!Number.isInteger(local))
27// throw new Error('local must be integer')
28 //idea for better structure
29 return {
30 //state of local,
31 //highest sequence we have
32 //the sequence we asked for (set on initial)
33 //whether we are transmitting
34 local: {seq: local, req: null, tx: true},
35 //the highest sequence we have sent to them
36 //the sequence they gave us (which they thus definitely have)
37 //whether they are transmitting
38 remote: {seq: null, req: null, tx: true},
39 //the next item to send. (orderly queue)
40 ready: local,
41 //the next thing to do to database (disorderly queue)
42 effect: null
43 }
44}
45
46//this is not a reduce, and it has side effects.
47exports.read = function (state) {
48 if(state.ready == null) return state
49 var _ready = state.ready
50 state.ready = null
51 if(isMessage(_ready)) {
52 if(state.remote.seq != null && Math.max(state.remote.seq, state.remote.req) +1 !== _ready.sequence) {
53 throw new Error('out of order!')
54 }
55 state.remote.seq = _ready.sequence
56 state.local.req = Math.max(state.local.req, _ready.sequence)
57 } else {
58 state.local.req = toSeq(_ready)
59 state.remote.tx = _ready >= 0
60 }
61 if(canSend(state)) {
62 state.effect = Math.max(state.remote.seq, state.remote.req) + 1
63 }
64
65 return state
66}
67
//true when msg is at or behind our own local.seq, i.e. we already have it.
function isOldMessage(state, msg) {
  return (state.local.seq >= msg.sequence)
}

//true when msg is exactly the one that follows our local.seq, i.e. the
//next message we can accept. (loose == kept as in the original.)
function isNextRxMessage(state, msg) {
  return state.local.seq + 1 == msg.sequence
}

//true when msg is the next message to transmit to the remote:
//the initial exchange is finished, msg is past what the remote asked
//from (remote.req), and msg.sequence is exactly one more than the
//larger of remote.seq and remote.req.
function isNextTxMessage (state, msg) {
  return (
    !isInitRx(state) &&
    state.remote.req < msg.sequence &&
    msg.sequence === Math.max(state.remote.seq, state.remote.req) + 1
  )
}

84exports.receiveMessage = function (state, msg) {
85 if(!isMessage(msg)) throw new Error('expected a Message!')
86 var _state = clone(state)
87
88 _state.remote.req = Math.max(state.remote.req || 0, msg.sequence)
89 //not the same^
90 _state.remote.seq = Math.max(state.remote.seq || 0, msg.sequence)
91
92 if(state.remote.tx == null)
93 throw new Error('we received a message, when we where waiting for remote to send initial request')
94
95 var seq = state.local.seq
96 if(isMessage(state.ready)) {
97 if(state.ready.sequence <= msg.sequence)
98 state.ready = null
99 }
100 if(isOldMessage(state, msg)) {
101 //we already know this, please shut up!
102 //let read move us out of tx mode,
103 if(state.remote.tx)
104 _state.ready = -(seq + 1)
105 //XXX: there might be a race here if we are cancelling this feed
106 }
107 else if(isNextRxMessage(state, msg)) {
108 //since we now know they are ahead, stop transmitting to them
109 if(state.ready != null)
110 _state.ready = null
111 _state.effect = msg
112 if(state.remote.tx == false)
113 state.ready = state.local.seq
114 }
115 else {
116 //this means something went really wrong
117 _state.error = true
118 }
119 return _state
120}
121
122exports.receiveNote = function (state, note) {
123 if(!isNote(note)) throw new Error('expected note!')
124 var _state = clone(state)
125 var seq = state.local.seq
126 var requested = note >= 0
127 var _seq = note == -1 ? -1 : Math.max(toSeq(note), state.remote.seq)
128
129 _state.local.tx = requested
130 _state.remote.req = Math.max(_seq, _state.remote.req)
131
132 //if we havn't decided, or have decided we don't want this feed:
133 if(state.local.req == null) return _state
134 if(state.local.req == -1) return _state
135
136 if(isMessage(state.ready) && _seq >= state.ready.sequence)
137 state.ready = null
138
139 //if they sent a note which is ahead of us, and we did not previously send something.
140
141 if(seq < _seq && state.remote.tx == false) {
142 state.ready = state.local.seq
143 }
144 else
145 if((seq > _seq) && requested)
146 _state.effect = _seq + 1
147
148 return _state
149}
150
151//we have either written a new message ourselves,
152//or received a message (and validated it) from another peer.
153exports.appendMessage = function (state, msg) {
154 if(!isMessage(msg)) throw new Error('appendMessage expects a message!')
155 //if this is the msg they need next, make
156 var _state = clone(state)
157 _state.local.seq = msg.sequence
158
159 if(state.local.tx) {
160 if(isNextTxMessage(state, msg)) {
161 _state.ready = msg
162 }
163 else if(isNote(state.ready)) { //this should only happen when it is the initial request
164 //if we are transmitting, why would we be sending a note?
165 //if it's back to even, we don't need to send a message, but if we are not
166 //then the message has meaning.
167 //if(state.remote.tx)
168
169 if(state.local.req != null) {
170 _state.ready = null
171 if(_state.local.seq > Math.max(state.remote.seq, state.remote.req))
172 _state.effect = Math.max(state.remote.seq, state.remote.req) + 1
173 }
174 }
175 else if(!isMessage(_state.ready)) {
176 _state.ready = null
177
178 if(state.local.seq > Math.max(state.remote.req,state.remote.seq))
179 state.effect = Math.max(state.remote.req,state.remote.seq) + 1
180 }
181 }
182 else if(!state.local.tx) {
183 //if we don't know they are up to this, and they havn't
184 //asked us to not send anything (-1) then send a note.
185 if(state.remote.req == -1)
186 ; //they do not want to hear about this one
187 else if(msg.sequence > state.remote.req && state.remote.req != 0) {
188 _state.ready = state.remote.tx ? msg.sequence : -(msg.sequence+1) //SEND NOTE
189 }
190 else if(isNote(state.ready) && state.ready > 0)
191 state.ready = msg.sequence //UPDATE NOTE
192 else
193 state.ready = null
194 }
195 return state
196}
197
198//have retrived an requested message
199exports.gotMessage = function (state, msg) {
200 if(!isMessage(msg)) throw new Error('expected message')
201 var _state = clone(state)
202 if (isNextTxMessage(state, msg)) {
203 _state.ready = msg
204 }
205 //if it's not the next message
206 //we must have changed state while retriving this message
207 //anyway, just get on with things
208 return _state
209}
210
211

Built with git-ssb-web