git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: c77c7618fb3abd31a3d1b797af55b9f21a318173

Files: c77c7618fb3abd31a3d1b797af55b9f21a318173 / inbox.js

4520 bytesRaw
1const Buffer = require('safe-buffer').Buffer
2const binarySearchInsert = require('binary-search-insert')
3
4// decides which message to go first
5function messageArbiter (messageA, messageB) {
6 // order by number of ticks if messages have different number of ticks
7 if (messageA._fromTicks !== messageB._fromTicks) {
8 return messageA._fromTicks > messageB._fromTicks
9 } else {
10 // sender id
11 return Buffer.compare(messageA._fromId, messageB._fromId)
12 }
13}
14
15module.exports = class Inbox {
16 /**
17 * The inbox manages and sorts incoming messages and provides functions
18 * to wait on messages
19 * @param {Object} opts
20 * @param {Object} opts.state
21 * @param {Object} opts.hypervisor
22 */
23 constructor (opts) {
24 this.actor = opts.actor
25 this.hypervisor = opts.hypervisor
26 this._queue = []
27 this._waitingTagsQueue = []
28 this._oldestMessagePromise = new Promise((resolve, reject) => {
29 this._oldestMessageResolve = resolve
30 })
31 }
32
33 get isEmpty () {
34 return !this._queue.length
35 }
36
37 /**
38 * queues a message
39 * @param {Message} message
40 */
41 queue (message) {
42 this._queueMessage(message)
43
44 const oldestMessage = this._getOldestMessage()
45 if (oldestMessage === message) {
46 this._oldestMessageResolve(message)
47 this._oldestMessagePromise = new Promise((resolve, reject) => {
48 this._oldestMessageResolve = resolve
49 })
50 }
51 }
52
53 /**
54 * Waits for a message sent with a capablitly that has one of the given tags
55 * @param {Array<*>} tags
56 * @param {Integer} timeout
57 * @returns {Promise}
58 */
59 async nextTaggedMessage (tags, timeout) {
60 this._waitingTags = new Set(tags)
61 this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))
62
63 // todo: add saturation test
64 const message = await this.nextMessage(timeout)
65 delete this._waitingTags
66 this._waitingTagsQueue.forEach(message => this._queueMessage(message))
67 this._waitingTagsQueue = []
68
69 return message
70 }
71
72 /**
73 * Waits for the the next message if any
74 * @param {Integer} timeout
75 * @returns {Promise}
76 */
77 async nextMessage (timeout) {
78 if (!this._gettingNextMessage) {
79 this._gettingNextMessage = true
80 } else {
81 throw new Error('already waiting for next message')
82 }
83
84 let message = this._getOldestMessage()
85
86 timeout += this.actor.ticks
87 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
88
89 while (true) {
90 if (message) {
91 // if the message we recived had more ticks then we currently have then
92 // update our ticks to it, since we jumped forward in time
93 if (message._fromTicks > this.actor.ticks) {
94 this.actor.ticks = message._fromTicks
95 this.hypervisor.scheduler.update(this.actor)
96 }
97
98 // if there is a message that is "older" then the timeout, the lower
99 // the timeout to the oldest message
100 if (message._fromTicks < timeout) {
101 timeout = message._fromTicks
102 }
103 }
104
105 // if all actors are "older" then the time out then stop waiting for messages
106 // since we konw that we can not receive one
107 if (oldestTime >= timeout) {
108 break
109 }
110
111 await Promise.race([
112 this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
113 message = this._getOldestMessage()
114 }),
115 this._olderMessage(message).then(m => {
116 message = m
117 })
118 ])
119 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
120 }
121 this._gettingNextMessage = false
122 return this._deQueueMessage()
123 }
124
125 // returns a promise that resolve when a message older then the given message
126 // is recived
127 _olderMessage (message) {
128 return this._oldestMessagePromise
129 }
130
131 _getOldestMessage () {
132 if (this._waitingTags) {
133 return this._waitingTagsQueue[0]
134 } else {
135 return this._queue[0]
136 }
137 }
138
139 _deQueueMessage () {
140 if (this._waitingTags) {
141 return this._waitingTagsQueue.shift()
142 } else {
143 return this._queue.shift()
144 }
145 }
146
147 _queueMessage (message) {
148 if (!(this._waitingTags && this._queueTaggedMessage(message))) {
149 binarySearchInsert(this._queue, messageArbiter, message)
150 }
151 }
152
153 _queueTaggedMessage (message) {
154 if (this._waitingTags.has(message.tag)) {
155 this._waitingTags.delete(message.tag)
156 binarySearchInsert(this._waitingTagsQueue, messageArbiter, message)
157 return true
158 } else {
159 return false
160 }
161 }
162}
163

Built with git-ssb-web