-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathAwaitQueue.ts
More file actions
221 lines (181 loc) · 6.07 KB
/
AwaitQueue.ts
File metadata and controls
221 lines (181 loc) · 6.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import { Logger } from './Logger';
import type {
AwaitQueuePushOptions,
AwaitQueueTask,
AwaitQueueTaskDump,
} from './types';
import { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors';
// Module-level logger used by all AwaitQueue methods below. It is constructed
// with the 'AwaitQueue' argument (presumably a log prefix/namespace; see
// ./Logger to confirm).
const logger = new Logger('AwaitQueue');
/**
 * Internal bookkeeping record for a single task held by the queue. It couples
 * the user-supplied task function with the settle callbacks of the promise
 * that was handed back to the caller of push().
 */
interface PendingTask<T> {
  // Key under which the record is stored in the queue's map.
  id: number;
  // Optional human-readable name of the task.
  name?: string;
  // The function to run when the task reaches the head of the queue.
  task: AwaitQueueTask<T>;
  // Date.now() timestamp taken when the task was pushed.
  enqueuedAt: number;
  // Date.now() timestamp taken when execution started; undefined until then.
  executedAt?: number;
  // Set once the record has been resolved or rejected, so that any later
  // settle attempt is ignored.
  completed: boolean;
  // Settles the caller's promise successfully.
  resolve: (result: T | PromiseLike<T>) => void;
  // Settles the caller's promise with an error. The options object tells
  // whether the next queued task may be started afterwards.
  reject: (error: Error, options: { canExecuteNextTask: boolean }) => void;
}
/**
 * Queue that runs asynchronous tasks one after another, in the order in which
 * they were pushed. Each call to push() returns a promise that settles with
 * the outcome of the corresponding task, or rejects with
 * AwaitQueueStoppedError / AwaitQueueRemovedTaskError if the task is discarded
 * via stop(), remove() or the `removeOngoingTasksWithSameName` push option.
 *
 * Discarding a task that is already running does not interrupt it: the task
 * function keeps running, but its eventual result is ignored.
 */
export class AwaitQueue {
  // Queue of pending tasks (map of PendingTasks indexed by id). Map iteration
  // follows insertion order, so the first value is the head of the queue.
  private readonly pendingTasks: Map<number, PendingTask<any>> = new Map();

  // Incrementing PendingTask id.
  private nextTaskId = 0;

  constructor() {
    logger.debug('constructor()');
  }

  /**
   * Number of tasks currently held by the queue (the running one included).
   */
  get size(): number {
    return this.pendingTasks.size;
  }

  /**
   * Appends a task to the queue and runs it once every task ahead of it has
   * been settled or discarded.
   *
   * @param task - Function to execute; its (awaited) return value resolves
   *   the returned promise and anything it throws rejects it.
   * @param name - Optional task name. Defaults to `task.name`.
   * @param options - If `removeOngoingTasksWithSameName` is set and the task
   *   has a name, every task with that name already in the queue is rejected
   *   with AwaitQueueRemovedTaskError before this one is appended.
   * @returns Promise settled with the outcome of the task.
   * @throws TypeError (as a rejection) if `task` is not a function.
   */
  async push<T>(
    task: AwaitQueueTask<T>,
    name?: string,
    options?: AwaitQueuePushOptions
  ): Promise<T> {
    // Validate before touching `task.name`, otherwise a null/undefined task
    // would fail with an unrelated property-access error.
    if (typeof task !== 'function') {
      throw new TypeError('given task is not a function');
    }

    name = name ?? task.name;

    logger.debug(`push() [name:${name}, options:%o]`, options);

    if (name) {
      try {
        Object.defineProperty(task, 'name', { value: name });
      } catch {
        // Best effort only: renaming the function is cosmetic and must never
        // prevent the task from being queued.
      }
    }

    return new Promise<T>((resolve, reject) => {
      if (name && options?.removeOngoingTasksWithSameName) {
        for (const pendingTask of this.pendingTasks.values()) {
          if (pendingTask.name === name) {
            // Do not start the next task from here; the head of the queue is
            // started (if needed) once the new task has been appended below.
            pendingTask.reject(new AwaitQueueRemovedTaskError(), {
              canExecuteNextTask: false,
            });
          }
        }
      }

      const pendingTask: PendingTask<T> = {
        id: this.nextTaskId++,
        task: task,
        name: name,
        enqueuedAt: Date.now(),
        executedAt: undefined,
        completed: false,
        resolve: (result: T | PromiseLike<T>) => {
          // pendingTask.resolve() is only called from execute(), meaning the
          // task function finished successfully. However the task may have
          // been discarded in the meantime (via stop(), remove() or a push
          // with removeOngoingTasksWithSameName), in which case it is already
          // flagged as completed and its result must be ignored.
          if (pendingTask.completed) {
            return;
          }

          pendingTask.completed = true;

          // Remove the task from the queue.
          this.pendingTasks.delete(pendingTask.id);

          logger.debug(`resolving task [name:${pendingTask.name}]`);

          // Resolve the task with the obtained result.
          resolve(result);

          // Execute the next pending task (if any).
          this.executeNextPendingTask();
        },
        reject: (
          error: Error,
          { canExecuteNextTask }: { canExecuteNextTask: boolean }
        ) => {
          // pendingTask.reject() is called from execute() if the task failed,
          // but also when the task is discarded (before or while it runs). In
          // the latter case a later call coming from execute() finds the task
          // already flagged as completed and must be ignored.
          if (pendingTask.completed) {
            return;
          }

          pendingTask.completed = true;

          // Remove the task from the queue.
          this.pendingTasks.delete(pendingTask.id);

          logger.debug(
            `rejecting task [name:${pendingTask.name}]: %s`,
            String(error)
          );

          // Reject the task with the obtained error.
          reject(error);

          // May execute next pending task (if any).
          if (canExecuteNextTask) {
            this.executeNextPendingTask();
          }
        },
      };

      // Append task to the queue.
      this.pendingTasks.set(pendingTask.id, pendingTask);

      // Start the head of the queue unless it is already running. Usually
      // this starts the new task only when it is alone in the queue. It also
      // covers the case in which the running task was just removed above
      // because of removeOngoingTasksWithSameName while other tasks remain
      // queued: nothing else would ever start them, since the settle
      // callbacks of the removed task are now no-ops.
      this.executeNextPendingTask();
    });
  }

  /**
   * Rejects every task in the queue (running or not) with
   * AwaitQueueStoppedError, leaving the queue empty.
   */
  stop(): void {
    logger.debug('stop()');

    // Each reject() deletes its entry from the map; deleting entries while
    // iterating a Map is safe.
    for (const pendingTask of this.pendingTasks.values()) {
      logger.debug(`stop() | stopping task [name:${pendingTask.name}]`);

      pendingTask.reject(new AwaitQueueStoppedError(), {
        canExecuteNextTask: false,
      });
    }
  }

  /**
   * Rejects the task at the given position with AwaitQueueRemovedTaskError
   * and, if no task is running afterwards, starts the next one.
   *
   * @param taskIdx - Zero-based position of the task in the queue (the same
   *   value reported as `idx` by dump()). Unknown positions are ignored.
   */
  remove(taskIdx: number): void {
    logger.debug(`remove() [taskIdx:${taskIdx}]`);

    const pendingTask = Array.from(this.pendingTasks.values())[taskIdx];

    if (!pendingTask) {
      logger.debug(`remove() | no task with given idx [taskIdx:${taskIdx}]`);

      return;
    }

    pendingTask.reject(new AwaitQueueRemovedTaskError(), {
      canExecuteNextTask: true,
    });
  }

  /**
   * Returns a snapshot of the queue, in queue order.
   *
   * For each task `enqueuedTime` is the time in milliseconds it spent (or has
   * spent so far) waiting to be executed, and `executionTime` is the time in
   * milliseconds since its execution started (0 if it has not started).
   */
  dump(): AwaitQueueTaskDump[] {
    const now = Date.now();

    return Array.from(this.pendingTasks.values()).map((pendingTask, idx) => ({
      idx,
      task: pendingTask.task,
      name: pendingTask.name,
      enqueuedTime: pendingTask.executedAt
        ? pendingTask.executedAt - pendingTask.enqueuedAt
        : now - pendingTask.enqueuedAt,
      executionTime: pendingTask.executedAt ? now - pendingTask.executedAt : 0,
    }));
  }

  /**
   * Starts the task at the head of the queue, if there is one and it has not
   * been started yet. The `executedAt` check prevents running a task twice.
   */
  private executeNextPendingTask(): void {
    const [nextPendingTask] = this.pendingTasks.values();

    if (nextPendingTask && !nextPendingTask.executedAt) {
      void this.execute(nextPendingTask);
    }
  }

  /**
   * Runs the given task and forwards its outcome to the task's own
   * resolve()/reject() callbacks, which decide whether the outcome is still
   * relevant and whether the next task must be started.
   *
   * @throws Error (as a rejection) if the task was already started.
   */
  private async execute<T>(pendingTask: PendingTask<T>): Promise<void> {
    logger.debug(`execute() [name:${pendingTask.name}]`);

    if (pendingTask.executedAt) {
      throw new Error('task already being executed');
    }

    pendingTask.executedAt = Date.now();

    try {
      const result = await pendingTask.task();

      // Resolve the task with its resolved result (if any).
      pendingTask.resolve(result);
    } catch (error) {
      // Reject the task with its rejected error.
      pendingTask.reject(error as Error, { canExecuteNextTask: true });
    }
  }
}