git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: ff617aaaa70d98abc58ed432aac680e76748c148

Files: ff617aaaa70d98abc58ed432aac680e76748c148 / inbox.js

3145 bytesRaw
1const Buffer = require('safe-buffer').Buffer
2const binarySearchInsert = require('binary-search-insert')
3
/**
 * Comparator that decides which of two messages goes first.
 * Messages are ordered by their `_fromTicks`; when both carry the same number
 * of ticks the tie is broken by comparing the sender ids (`_fromId`) with
 * `Buffer.compare`.
 * @param {Object} messageA
 * @param {Object} messageB
 * @returns {Number} a positive number if `messageA` sorts after `messageB`,
 * a negative number if it sorts before and `0` if both rank equally
 */
function messageArbiter (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    // return a number (not a boolean) so that both branches honour the same
    // comparator contract
    return messageA._fromTicks > messageB._fromTicks ? 1 : -1
  }
  // same number of ticks: fall back to the sender id
  return Buffer.compare(messageA._fromId, messageB._fromId)
}
15
16module.exports = class Inbox {
17 /**
18 * The inbox manages and sorts incoming messages and provides functions
19 * to wait on messages
20 * @param {Object} opts
21 * @param {Object} opts.state
22 * @param {Object} opts.hypervisor
23 */
24 constructor (opts) {
25 this.actor = opts.actor
26 this.hypervisor = opts.hypervisor
27 this._queue = []
28 this._oldestMessagePromise = new Promise((resolve, reject) => {
29 this._oldestMessageResolve = resolve
30 })
31 }
32
33 get isEmpty () {
34 return !this._queue.length
35 }
36
37 /**
38 * queues a message
39 * @param {Message} message
40 */
41 queue (message) {
42 this._queueMessage(message)
43
44 const oldestMessage = this._getOldestMessage()
45 if (oldestMessage === message) {
46 this._oldestMessageResolve(message)
47 this._oldestMessagePromise = new Promise((resolve, reject) => {
48 this._oldestMessageResolve = resolve
49 })
50 }
51 }
52
53 /**
54 * Waits for the the next message if any
55 * @param {Integer} timeout
56 * @returns {Promise}
57 */
58 async nextMessage () {
59 let message = this._getOldestMessage()
60 let timeout = message._fromTicks
61 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
62
63 while (true) {
64 // if all actors are "older" then the time out then stop waiting for messages
65 // since we konw that we can not receive one
66 if (oldestTime >= timeout) {
67 break
68 }
69
70 await Promise.race([
71 this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
72 message = this._getOldestMessage()
73 }),
74 this._olderMessage(message).then(m => {
75 message = m
76 // if there is a message that is "older" then the timeout, the lower
77 // the timeout to the oldest message
78 timeout = message._fromTicks
79 })
80 ])
81 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
82 }
83 message = this._deQueueMessage()
84 // if the message we recived had more ticks then we currently have then
85 // update our ticks to it, since we jumped forward in time
86 if (message && message._fromTicks > this.actor.ticks) {
87 this.actor.ticks = message._fromTicks
88 this.hypervisor.scheduler.update(this.actor)
89 }
90 return message
91 }
92
93 // returns a promise that resolve when a message older then the given message
94 // is recived
95 _olderMessage (message) {
96 return this._oldestMessagePromise
97 }
98
99 _getOldestMessage () {
100 return this._queue[0]
101 }
102
103 _deQueueMessage () {
104 return this._queue.shift()
105 }
106
107 _queueMessage (message) {
108 binarySearchInsert(this._queue, messageArbiter, message)
109 }
110}
111

Built with git-ssb-web