Files: 7feb7c9fd152c9b122c6ee9ec57387c444a89541 / inbox.js
3145 bytesRaw
1 | const Buffer = require('safe-buffer').Buffer |
2 | const binarySearchInsert = require('binary-search-insert') |
3 | |
/**
 * Comparator that decides which of two messages goes first in the inbox.
 *
 * Messages are ordered by `_fromTicks` (ascending). When both messages carry
 * the same number of ticks the tie is broken by comparing the sender ids
 * with `Buffer.compare`.
 *
 * The result always follows the numeric comparator contract (negative, zero
 * or positive). The previous implementation returned a boolean for the tick
 * comparison, so "A has fewer ticks than B" came back as `false`, which a
 * numeric consumer treats as 0 (equal) rather than as "A sorts first".
 *
 * @param {Object} messageA - must expose `_fromTicks` and `_fromId`
 * @param {Object} messageB - must expose `_fromTicks` and `_fromId`
 * @returns {Number} < 0 if messageA goes first, > 0 if messageB goes first,
 *   0 if neither takes precedence
 */
function messageArbiter (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    return messageA._fromTicks > messageB._fromTicks ? 1 : -1
  }
  // same number of ticks: fall back to the sender id
  return Buffer.compare(messageA._fromId, messageB._fromId)
}
15 | |
16 | module.exports = class Inbox { |
17 | /** |
18 | * The inbox manages and sorts incoming messages and provides functions |
19 | * to wait on messages |
20 | * @param {Object} opts |
21 | * @param {Object} opts.state |
22 | * @param {Object} opts.hypervisor |
23 | */ |
24 | constructor (opts) { |
25 | this.actor = opts.actor |
26 | this.hypervisor = opts.hypervisor |
27 | this._queue = [] |
28 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
29 | this._oldestMessageResolve = resolve |
30 | }) |
31 | } |
32 | |
33 | get isEmpty () { |
34 | return !this._queue.length |
35 | } |
36 | |
37 | /** |
38 | * queues a message |
39 | * @param {Message} message |
40 | */ |
41 | queue (message) { |
42 | this._queueMessage(message) |
43 | |
44 | const oldestMessage = this._getOldestMessage() |
45 | if (oldestMessage === message) { |
46 | this._oldestMessageResolve(message) |
47 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
48 | this._oldestMessageResolve = resolve |
49 | }) |
50 | } |
51 | } |
52 | |
53 | /** |
54 | * Waits for the the next message if any |
55 | * @param {Integer} timeout |
56 | * @returns {Promise} |
57 | */ |
58 | async nextMessage () { |
59 | let message = this._getOldestMessage() |
60 | let timeout = message._fromTicks |
61 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
62 | |
63 | while (true) { |
64 | // if all actors are "older" then the time out then stop waiting for messages |
65 | // since we konw that we can not receive one |
66 | if (oldestTime >= timeout) { |
67 | break |
68 | } |
69 | |
70 | await Promise.race([ |
71 | this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => { |
72 | message = this._getOldestMessage() |
73 | }), |
74 | this._olderMessage(message).then(m => { |
75 | message = m |
76 | // if there is a message that is "older" then the timeout, the lower |
77 | // the timeout to the oldest message |
78 | timeout = message._fromTicks |
79 | }) |
80 | ]) |
81 | oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
82 | } |
83 | message = this._deQueueMessage() |
84 | // if the message we recived had more ticks then we currently have then |
85 | // update our ticks to it, since we jumped forward in time |
86 | if (message && message._fromTicks > this.actor.ticks) { |
87 | this.actor.ticks = message._fromTicks |
88 | this.hypervisor.scheduler.update(this.actor) |
89 | } |
90 | return message |
91 | } |
92 | |
93 | // returns a promise that resolve when a message older then the given message |
94 | // is recived |
95 | _olderMessage (message) { |
96 | return this._oldestMessagePromise |
97 | } |
98 | |
99 | _getOldestMessage () { |
100 | return this._queue[0] |
101 | } |
102 | |
103 | _deQueueMessage () { |
104 | return this._queue.shift() |
105 | } |
106 | |
107 | _queueMessage (message) { |
108 | binarySearchInsert(this._queue, messageArbiter, message) |
109 | } |
110 | } |
111 |
Built with git-ssb-web