git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 3146cb3249c2ab000ed44d4712910ff42fd0d5c1

Files: 3146cb3249c2ab000ed44d4712910ff42fd0d5c1 / inbox.js

4697 bytesRaw
1const Buffer = require('safe-buffer').Buffer
2const binarySearchInsert = require('binary-search-insert')
3
4// decides which message to go first
5function messageArbiter (messageA, messageB) {
6 // order by number of ticks if messages have different number of ticks
7 if (messageA._fromTicks !== messageB._fromTicks) {
8 return messageA._fromTicks > messageB._fromTicks
9 } else {
10 // sender id
11 return Buffer.compare(messageA._fromId, messageB._fromId)
12 }
13}
14
15module.exports = class Inbox {
16 /**
17 * The inbox manages and sorts incoming messages and provides functions
18 * to wait on messages
19 * @param {Object} opts
20 * @param {Object} opts.state
21 * @param {Object} opts.hypervisor
22 */
23 constructor (opts) {
24 this.actor = opts.actor
25 this.hypervisor = opts.hypervisor
26 this._queue = []
27 this._waitingTagsQueue = []
28 this._oldestMessagePromise = new Promise((resolve, reject) => {
29 this._oldestMessageResolve = resolve
30 })
31 }
32
33 /**
34 * queues a message
35 * @param {Message} message
36 */
37 queue (message) {
38 this._queueMessage(message)
39
40 const oldestMessage = this._getOldestMessage()
41 if (oldestMessage === message) {
42 this._oldestMessageResolve(message)
43 this._oldestMessagePromise = new Promise((resolve, reject) => {
44 this._oldestMessageResolve = resolve
45 })
46 }
47 }
48
49 /**
50 * Waits for a message sent with a capablitly that has one of the given tags
51 * @param {Array<*>} tags
52 * @param {Integer} timeout
53 * @returns {Promise}
54 */
55 async nextTaggedMessage (tags, timeout) {
56 if (this._waitingTags) {
57 throw new Error('already waiting on tags')
58 }
59 this._waitingTags = new Set(tags)
60 this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))
61
62 // todo: add saturation test
63 const message = await this.nextMessage(timeout)
64 delete this._waitingTags
65 this._waitingTagsQueue.forEach(message => this._queueMessage(message))
66 this._waitingTagsQueue = []
67
68 return message
69 }
70
71 /**
72 * Waits for the the next message if any
73 * @param {Integer} timeout
74 * @returns {Promise}
75 */
76 nextMessage (timeout) {
77 if (!this._gettingNextMessage) {
78 this._gettingNextMessage = this._nextMessage(timeout)
79 }
80 return this._gettingNextMessage
81 }
82
83 async _nextMessage (timeout = 0) {
84 await Promise.all([...this.actor._sending.values()])
85 let message = this._getOldestMessage()
86 if (message === undefined && timeout === 0) {
87 return
88 }
89
90 timeout += this.actor.ticks
91 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
92
93 while (true) {
94 if (message) {
95 // if the message we recived had more ticks then we currently have then
96 // update our ticks to it, since we jumped forward in time
97 if (message._fromTicks > this.actor.ticks) {
98 this.actor.ticks = message._fromTicks
99 this.hypervisor.scheduler.update(this.actor)
100 }
101
102 // if there is a message that is "older" then the timeout, the lower
103 // the timeout to the oldest message
104 if (message._fromTicks < timeout) {
105 timeout = message._fromTicks
106 }
107 }
108
109 // if all actors are "older" then the time out then stop waiting for messages
110 // since we konw that we can not receive one
111 if (oldestTime >= timeout) {
112 break
113 }
114
115 await Promise.race([
116 this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
117 message = this._getOldestMessage()
118 }),
119 this._olderMessage(message).then(m => {
120 message = m
121 })
122 ])
123 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
124 }
125 this._gettingNextMessage = false
126 return this._deQueueMessage()
127 }
128
129 // returns a promise that resolve when a message older then the given message
130 // is recived
131 _olderMessage (message) {
132 return this._oldestMessagePromise
133 }
134
135 _getOldestMessage () {
136 if (this._waitingTags) {
137 return this._waitingTagsQueue[0]
138 } else {
139 return this._queue[0]
140 }
141 }
142
143 _deQueueMessage () {
144 if (this._waitingTags) {
145 return this._waitingTagsQueue.shift()
146 } else {
147 return this._queue.shift()
148 }
149 }
150
151 _queueMessage (message) {
152 if (!(this._waitingTags && this._queueTaggedMessage(message))) {
153 binarySearchInsert(this._queue, messageArbiter, message)
154 }
155 }
156
157 _queueTaggedMessage (message) {
158 if (this._waitingTags.has(message.tag)) {
159 this._waitingTags.delete(message.tag)
160 binarySearchInsert(this._waitingTagsQueue, messageArbiter, message)
161 return true
162 } else {
163 return false
164 }
165 }
166}
167

Built with git-ssb-web