git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 2ccf9440ac095c4a148a986168ef433353ae2815

Files: 2ccf9440ac095c4a148a986168ef433353ae2815 / inbox.js

3841 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const Buffer = require('safe-buffer').Buffer
3
4// decides which message to go first
5function messageArbiter (messageA, messageB) {
6 // order by number of ticks if messages have different number of ticks
7 if (messageA._fromTicks !== messageB._fromTicks) {
8 return messageA._fromTicks > messageB._fromTicks
9 } else {
10 // sender id
11 return Buffer.compare(messageA._fromId, messageB._fromId)
12 }
13}
14
15module.exports = class Inbox {
16 /**
17 * The inbox manages and sorts incoming messages and provides functions
18 * to wait on messages
19 * @param {Object} opts
20 * @param {Object} opts.state
21 * @param {Object} opts.hypervisor
22 */
23 constructor (opts) {
24 this.actor = opts.actor
25 this.hypervisor = opts.hypervisor
26 this._queue = []
27 this._waitingTagsQueue = []
28 this._oldestMessagePromise = new Promise((resolve, reject) => {
29 this._oldestMessageResolve = resolve
30 })
31 }
32
33 /**
34 * queues a message
35 * @param {Message} message
36 */
37 queue (message) {
38 this._queueMessage(message)
39
40 const oldestMessage = this._getOldestMessage()
41 if (oldestMessage === message) {
42 this._oldestMessageResolve(message)
43 this._oldestMessagePromise = new Promise((resolve, reject) => {
44 this._oldestMessageResolve = resolve
45 })
46 }
47 }
48
49 /**
50 * Waits for a message sent with a capablitly that has one of the given tags
51 * @param {Array<*>} tags
52 * @param {Integer} timeout
53 * @returns {Promise}
54 */
55 async waitOnTag (tags, timeout) {
56 if (this._waitingTags) {
57 throw new Error('already getting next message')
58 }
59
60 this._waitingTags = new Set(tags)
61 this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))
62
63 // todo: add saturation test
64 const message = await this.getNextMessage(timeout)
65 delete this._waitingTags
66 this._waitingTagsQueue.forEach(message => this._queueMessage(message))
67 this._waitingTagsQueue = []
68
69 return message
70 }
71
72 /**
73 * Waits for the the next message if any
74 * @param {Integer} timeout
75 * @returns {Promise}
76 */
77 async getNextMessage (timeout = 0) {
78 let message = this._getOldestMessage()
79 if (message === undefined && timeout === 0) {
80 return
81 }
82
83 timeout += this.actor.ticks
84 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
85
86 while (true) {
87 if (message && message._fromTicks < timeout) {
88 timeout = message._fromTicks
89 }
90
91 if (oldestTime >= timeout) {
92 break
93 }
94
95 await Promise.race([
96 this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
97 message = this._getOldestMessage()
98 }),
99 this._olderMessage(message).then(m => {
100 message = m
101 })
102 ])
103 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
104 }
105 return this._deQueueMessage()
106 }
107
108 // returns a promise that resolve when a message older then the given message
109 // is recived
110 _olderMessage (message) {
111 return this._oldestMessagePromise
112 }
113
114 _getOldestMessage () {
115 if (this._waitingTags) {
116 return this._waitingTagsQueue[0]
117 } else {
118 return this._queue[0]
119 }
120 }
121
122 _deQueueMessage () {
123 if (this._waitingTags) {
124 return this._waitingTagsQueue.shift()
125 } else {
126 return this._queue.shift()
127 }
128 }
129
130 _queueMessage (message) {
131 if (!(this._waitingTags && this._queueTaggedMessage(message))) {
132 binarySearchInsert(this._queue, messageArbiter, message)
133 }
134 }
135
136 _queueTaggedMessage (message) {
137 if (this._waitingTags.has(message.tag)) {
138 this._waitingTags.delete(message.tag)
139 binarySearchInsert(this._waitingTagsQueue, messageArbiter, message)
140 return true
141 } else {
142 return false
143 }
144 }
145}
146

Built with git-ssb-web