TaskQueue.js 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. /**
  2. * Copyright (c) Facebook, Inc. and its affiliates.
  3. *
  4. * This source code is licensed under the MIT license found in the
  5. * LICENSE file in the root directory of this source tree.
  6. *
  7. * @format
  8. * @flow
  9. */
  10. 'use strict';
  11. const infoLog = require('../Utilities/infoLog');
  12. const invariant = require('invariant');
  13. type SimpleTask = {
  14. name: string,
  15. run: () => void,
  16. ...
  17. };
  18. type PromiseTask = {
  19. name: string,
  20. gen: () => Promise<any>,
  21. ...
  22. };
  23. export type Task = Function | SimpleTask | PromiseTask;
  24. const DEBUG: false = false;
  25. /**
  26. * TaskQueue - A system for queueing and executing a mix of simple callbacks and
  27. * trees of dependent tasks based on Promises. No tasks are executed unless
  28. * `processNext` is called.
  29. *
  30. * `enqueue` takes a Task object with either a simple `run` callback, or a
  31. * `gen` function that returns a `Promise` and puts it in the queue. If a gen
  32. * function is supplied, then the promise it returns will block execution of
  33. * tasks already in the queue until it resolves. This can be used to make sure
  34. * the first task is fully resolved (including asynchronous dependencies that
  35. * also schedule more tasks via `enqueue`) before starting on the next task.
  36. * The `onMoreTasks` constructor argument is used to inform the owner that an
  37. * async task has resolved and that the queue should be processed again.
  38. *
  39. * Note: Tasks are only actually executed with explicit calls to `processNext`.
  40. */
  41. class TaskQueue {
  42. /**
  43. * TaskQueue instances are self contained and independent, so multiple tasks
  44. * of varying semantics and priority can operate together.
  45. *
  46. * `onMoreTasks` is invoked when `PromiseTask`s resolve if there are more
  47. * tasks to process.
  48. */
  49. constructor({onMoreTasks}: {onMoreTasks: () => void, ...}) {
  50. this._onMoreTasks = onMoreTasks;
  51. this._queueStack = [{tasks: [], popable: false}];
  52. }
  53. /**
  54. * Add a task to the queue. It is recommended to name your tasks for easier
  55. * async debugging. Tasks will not be executed until `processNext` is called
  56. * explicitly.
  57. */
  58. enqueue(task: Task): void {
  59. this._getCurrentQueue().push(task);
  60. }
  61. enqueueTasks(tasks: Array<Task>): void {
  62. tasks.forEach(task => this.enqueue(task));
  63. }
  64. cancelTasks(tasksToCancel: Array<Task>): void {
  65. // search through all tasks and remove them.
  66. this._queueStack = this._queueStack
  67. .map(queue => ({
  68. ...queue,
  69. tasks: queue.tasks.filter(task => tasksToCancel.indexOf(task) === -1),
  70. }))
  71. .filter((queue, idx) => queue.tasks.length > 0 || idx === 0);
  72. }
  73. /**
  74. * Check to see if `processNext` should be called.
  75. *
  76. * @returns {boolean} Returns true if there are tasks that are ready to be
  77. * processed with `processNext`, or returns false if there are no more tasks
  78. * to be processed right now, although there may be tasks in the queue that
  79. * are blocked by earlier `PromiseTask`s that haven't resolved yet.
  80. * `onMoreTasks` will be called after each `PromiseTask` resolves if there are
  81. * tasks ready to run at that point.
  82. */
  83. hasTasksToProcess(): boolean {
  84. return this._getCurrentQueue().length > 0;
  85. }
  86. /**
  87. * Executes the next task in the queue.
  88. */
  89. processNext(): void {
  90. const queue = this._getCurrentQueue();
  91. if (queue.length) {
  92. const task = queue.shift();
  93. try {
  94. if (task.gen) {
  95. DEBUG && infoLog('TaskQueue: genPromise for task ' + task.name);
  96. this._genPromise((task: any)); // Rather than annoying tagged union
  97. } else if (task.run) {
  98. DEBUG && infoLog('TaskQueue: run task ' + task.name);
  99. task.run();
  100. } else {
  101. invariant(
  102. typeof task === 'function',
  103. 'Expected Function, SimpleTask, or PromiseTask, but got:\n' +
  104. JSON.stringify(task, null, 2),
  105. );
  106. DEBUG && infoLog('TaskQueue: run anonymous task');
  107. task();
  108. }
  109. } catch (e) {
  110. e.message =
  111. 'TaskQueue: Error with task ' + (task.name || '') + ': ' + e.message;
  112. throw e;
  113. }
  114. }
  115. }
  116. _queueStack: Array<{
  117. tasks: Array<Task>,
  118. popable: boolean,
  119. ...
  120. }>;
  121. _onMoreTasks: () => void;
  122. _getCurrentQueue(): Array<Task> {
  123. const stackIdx = this._queueStack.length - 1;
  124. const queue = this._queueStack[stackIdx];
  125. if (
  126. queue.popable &&
  127. queue.tasks.length === 0 &&
  128. this._queueStack.length > 1
  129. ) {
  130. this._queueStack.pop();
  131. DEBUG &&
  132. infoLog('TaskQueue: popped queue: ', {
  133. stackIdx,
  134. queueStackSize: this._queueStack.length,
  135. });
  136. return this._getCurrentQueue();
  137. } else {
  138. return queue.tasks;
  139. }
  140. }
  141. _genPromise(task: PromiseTask) {
  142. // Each async task pushes it's own queue onto the queue stack. This
  143. // effectively defers execution of previously queued tasks until the promise
  144. // resolves, at which point we allow the new queue to be popped, which
  145. // happens once it is fully processed.
  146. this._queueStack.push({tasks: [], popable: false});
  147. const stackIdx = this._queueStack.length - 1;
  148. DEBUG && infoLog('TaskQueue: push new queue: ', {stackIdx});
  149. DEBUG && infoLog('TaskQueue: exec gen task ' + task.name);
  150. task
  151. .gen()
  152. .then(() => {
  153. DEBUG &&
  154. infoLog('TaskQueue: onThen for gen task ' + task.name, {
  155. stackIdx,
  156. queueStackSize: this._queueStack.length,
  157. });
  158. this._queueStack[stackIdx].popable = true;
  159. this.hasTasksToProcess() && this._onMoreTasks();
  160. })
  161. .catch(ex => {
  162. ex.message = `TaskQueue: Error resolving Promise in task ${
  163. task.name
  164. }: ${ex.message}`;
  165. throw ex;
  166. })
  167. .done();
  168. }
  169. }
  170. module.exports = TaskQueue;