Source: external/task-factory.js

import Scheduler from './scheduler/scheduler';
import UnboundedSchedulerPolicy from './scheduler/policies/unbounded-policy';
import EnqueueSchedulerPolicy from './scheduler/policies/enqueued-policy';
import DropSchedulerPolicy from './scheduler/policies/drop-policy';
import KeepLatestSchedulerPolicy from './scheduler/policies/keep-latest-policy';
import RestartableSchedulerPolicy from './scheduler/policies/restartable-policy';
import { Task } from './task/task';
import { TaskGroup } from './task/task-group';
import { DEFAULT_ENVIRONMENT } from './environment';

function assertModifiersNotMixedWithGroup(obj) {
  if (obj._hasSetConcurrencyConstraint && obj._taskGroupPath) {
    throw new Error(
      `Cannot use both 'group' and other concurrency-constraining task modifiers (e.g. 'drop', 'enqueue', 'restartable')`,
    );
  }
}

function assertUnsetBufferPolicy(obj) {
  if (obj._hasSetBufferPolicy) {
    throw new Error(
      `Cannot set multiple buffer policies on a task or task group. ${obj._schedulerPolicyClass} has already been set for task or task group '${obj.name}'`,
    );
  }
}

const MODIFIER_REGISTRY = {
  enqueue: (factory, value) =>
    value && factory.setBufferPolicy(EnqueueSchedulerPolicy),
  evented: (factory, value) => value && factory.setEvented(value),
  debug: (factory, value) => value && factory.setDebug(value),
  drop: (factory, value) =>
    value && factory.setBufferPolicy(DropSchedulerPolicy),
  group: (factory, groupName) => factory.setGroup(groupName),
  keepLatest: (factory, value) =>
    value && factory.setBufferPolicy(KeepLatestSchedulerPolicy),
  maxConcurrency: (factory, maxConcurrency) =>
    factory.setMaxConcurrency(maxConcurrency),
  onState: (factory, onStateCallback) => factory.setOnState(onStateCallback),
  restartable: (factory, value) =>
    value && factory.setBufferPolicy(RestartableSchedulerPolicy),
};

/**
 * Callback type defining a task modifier
 *
 * @callback TaskFactory~TaskModifier
 * @param {TaskFactory} factory
 * @param {*} taskModifierOption
 */

/**
 * Registers a new modifier with the modifier registry
 *
 * @param {string} name Name of the modifier
 * @param {TaskFactory~TaskModifier} callback
 */
export function registerModifier(name, callback) {
  if (MODIFIER_REGISTRY[name]) {
    throw new Error(
      `A modifier with the name '${name}' has already been defined.`,
    );
  }

  MODIFIER_REGISTRY[name] = callback;
}

/**
 * Returns a specified modifier, if it exists in the registry
 *
 * @param {string} name Name of the modifier
 * @returns {TaskFactory~TaskModifier?}
 */
export function getModifier(name) {
  return MODIFIER_REGISTRY[name];
}

/**
 * Returns whether a specified modifier exists in the registry
 *
 * @param {string} name Name of the modifier
 * @returns {boolean}
 */
export function hasModifier(name) {
  return name in MODIFIER_REGISTRY;
}

/**
 * Factory used for instantiating Tasks and Task Groups. Mostly for internal
 * use, but some public APIs exposed via the Task Modifier APIs.
 *
 * <style>
 *  .ignore-this--this-is-here-to-hide-constructor,
 *  #TaskFactory { display: none }
 * </style>
 *
 * @class TaskFactory
 */
export class TaskFactory {
  env = DEFAULT_ENVIRONMENT;

  _debug = null;
  _enabledModifiers = [];
  _hasSetConcurrencyConstraint = false;
  _hasSetBufferPolicy = false;
  _hasEnabledEvents = false;
  _maxConcurrency = null;
  _onStateCallback = (state, taskable) => taskable.setState(state);
  _schedulerPolicyClass = UnboundedSchedulerPolicy;
  _taskGroupPath = null;

  constructor(name = '<unknown>', taskDefinition = null, options = {}) {
    this.name = name;
    this.taskDefinition = taskDefinition;
    this.options = options;
    this._processModifierOptions(options);
  }

  /**
   * Returns a new Task bound to the given context
   *
   * @protected
   * @param {*} context
   * @returns {Task}
   */
  createTask(context) {
    let options = this.getTaskOptions(context);

    return new Task(
      Object.assign(
        {
          generatorFactory: (args) => this.taskDefinition.apply(context, args),
        },
        options,
      ),
    );
  }

  /**
   * Returns a new TaskGroup bound to the given context
   *
   * @protected
   * @param {*} context
   * @returns {Task}
   */
  createTaskGroup(context) {
    let options = this.getTaskOptions(context);

    return new TaskGroup(options);
  }

  /**
   * Returns a modifier callback with the given name bound to this TaskFactory,
   * if registered.
   *
   * @protected
   * @param {string} name
   * @returns {TaskFactory~TaskModifier?}
   */
  getModifier(name) {
    if (hasModifier(name)) {
      return MODIFIER_REGISTRY[name].bind(null, this);
    }
  }

  /**
   * Returns the options provided to TaskFactory
   *
   * @public
   * @returns {object}
   */
  getOptions() {
    return this.options;
  }

  /**
   * Returns a new Scheduler instance
   *
   * @protected
   * @param {*} schedulerPolicy
   * @param {boolean} stateTrackingEnabled
   * @returns {Scheduler}
   */
  getScheduler(schedulerPolicy, stateTrackingEnabled) {
    return new Scheduler(schedulerPolicy, stateTrackingEnabled);
  }

  /**
   * Returns the options to pass to a Task or TaskGroup constructor
   *
   * @protected
   * @param {*} context
   * @returns {object}
   */
  getTaskOptions(context) {
    let group, scheduler;
    let onStateCallback = this._onStateCallback;

    if (this._taskGroupPath) {
      group = context[this._taskGroupPath];
      if (!(group instanceof TaskGroup)) {
        throw new Error(
          `Expected group '${this._taskGroupPath}' to be defined but was not found.`,
        );
      }

      scheduler = group.scheduler;
    } else {
      let schedulerPolicy = new this._schedulerPolicyClass(
        this._maxConcurrency,
      );
      scheduler = this.getScheduler(
        schedulerPolicy,
        onStateCallback && typeof onStateCallback === 'function',
      );
    }

    return {
      context,
      debug: this._debug,
      env: this.env,
      name: this.name,
      group,
      scheduler,
      hasEnabledEvents: this._hasEnabledEvents,
      onStateCallback,
      enabledModifiers: this._enabledModifiers,
      modifierOptions: this.getOptions(),
    };
  }

  /**
   * Sets the Scheduler buffer policy class to the specified value.
   *
   * Will raise an assertion if a buffer policy has already been specified
   *
   * @param {*} policy
   * @returns {TaskFactory}
   */
  setBufferPolicy(policy) {
    assertUnsetBufferPolicy(this);
    this._hasSetBufferPolicy = true;
    this._hasSetConcurrencyConstraint = true;
    this._schedulerPolicyClass = policy;
    assertModifiersNotMixedWithGroup(this);

    return this;
  }

  /**
   * Sets debug mode
   *
   * @param {boolean} enabled
   * @returns {TaskFactory}
   */
  setDebug(enabled) {
    this._debug = enabled;
    return this;
  }

  /**
   * Sets whether Task will dispatch Task events or not
   *
   * @param {boolean} enabled
   * @returns {TaskFactory}
   */
  setEvented(enabled) {
    this._hasEnabledEvents = enabled;
    return this;
  }

  /**
   * Sets Scheduling policy's `maxConcurrency`
   *
   * @param {number} maxConcurrency
   * @returns {TaskFactory}
   */
  setMaxConcurrency(maxConcurrency) {
    this._hasSetConcurrencyConstraint = true;
    this._maxConcurrency = maxConcurrency;
    return this;
  }

  /**
   * Assigns Task created from this factory to the specified group name
   *
   * @param {string} group
   * @returns {TaskFactory}
   */
  setGroup(group) {
    this._taskGroupPath = group;
    return this;
  }

  /**
   * Sets the name of tasks created from this factory
   *
   * @param {string} name
   * @returns {TaskFactory}
   */
  setName(name) {
    this.name = name;
    return this;
  }

  /**
   * Sets the callback used on state updates. Can be set to null to disable
   * state tracking on tasks.
   *
   * @param {function} onStateCallback
   * @returns {TaskFactory}
   */
  setOnState(onStateCallback) {
    this._onStateCallback = onStateCallback;
    return this;
  }

  /**
   * Sets the definition for tasks created from this factory
   *
   * @param {*} taskDefinition
   * @returns {TaskFactory}
   */
  setTaskDefinition(taskDefinition) {
    this.taskDefinition = taskDefinition;
    return this;
  }

  _processModifierOptions(options) {
    if (!options) {
      return;
    }

    for (let key of Object.keys(options)) {
      let value = options[key];
      let modifier = this.getModifier(key);

      if (typeof modifier === 'function' && modifier(value)) {
        this._enabledModifiers.push(key);
      }
    }
  }
}