queue.js

import queue from './internal/queue';
import wrapAsync from './internal/wrapAsync';

/**
 * 任务队列,通过 worker 函数处理完成
 * @typedef {Iterable} QueueObject
 * @memberOf module:ControlFlow
 * @property {Function} length - 返回等待处理项目的数量的函数。
 * 通过 `queue.length()` 调用。
 * @property {boolean} started - boolean 值,是否有任务已经推入队列或已被处理过。
 * @property {Function} running - 函数,返回当前正在处理的任务数量。
 * 用法:`queue.running()`。
 * @property {Function} workersList - 函数,返回当前正在处理的任务数组。
 * 用法:`queue.workersList()`。
 * @property {Function} idle - 函数,当有任务等待处理时,返回 false;否则返回 true。
 * 用法:`queue.idle()`。
 * @property {number} concurrency - 整型值,决定了多少 `worker` 函数可以并发运行。
 * `queue` 创建后,在运行中也可以修改此属性。
 * @property {number} payload - integer 值,指定了每次传给 work 函数的任务数量。
 * 仅在 [cargo]{@link module:ControlFlow.cargo} 对象中生效。
 * @property {AsyncFunction} push - 添加新任务到 `queue`。 `worker` 处理完任务后,会调用 `callback`。
 * 不仅可以传入单个任务,还可以提交 `tasks` 数组。
 * 队列中每个任务结束后都会调用 callback。
 * 用法:`queue.push(task, [callback])`。
 * @property {AsyncFunction} unshift - 在 `queue` 前端推入任务。
 * 用法:`queue.unshift(task, [callback])`。
 * @property {AsyncFunction} pushAsync - 和 `q.push` 一样,除了它会返回 Promise 对象,
 * 并在出错时产生 reject。
 * @property {AsyncFunction} unshirtAsync - 和 `q.unshift` 一样,除了它会返回 Promise 对象,
 * 并在出错时产生 reject。
 * @property {Function} remove - 使用测试函数检测,如果匹配则会移除任务。
 * 如果是 [priorityQueue]{@link module:ControlFlow.priorityQueue} 对象,
 * test 函数会传入一个具有 `data` 和 `priority` 属性的 object。
 * 用法:`queue.remove(testFn)`,其中 `testFn` 的形式是:
 * `function ({data, priority}) {}`,并需要返回 Boolean。
 * @property {Function} saturated - 饱和。函数设置了一个callback,当运行的 worker 数量
 * 遇到 `concurrency` 限制、更多任务将会排队时触发。
 * 如果省略了 callback,`q.saturated()` 会返回一个 Promise 用于下次事件的发生。
 * @property {Function} unsaturated - 不饱和。函数设置了一个callback,当运行的 worker 数量
 * 少于 `concurrency` 和 `buffer` 限制,不会再有任务排队时触发。
 * 如果省略了 callback,`q.unsaturated()` 会返回一个 Promise 用于下次事件的发生。
 * @property {number} buffer - 认定 `queue` 为不饱和(`unsaturated`)的最小缓冲阈值
 * @property {Function} empty - 当队列中最后一项任务从 `queue` 分配到 `worker` 时触发设置的回调函数。
 * 如果省略了 callback,`q.empty()` 会返回一个 Promise 用于下次事件的发生。
 * @property {Function} drain - 当队列中最后一项任务从 `worker` 返回时触发设置的回调函数。
 * 如果省略了 callback,`q.drain()` 会返回一个 Promise 用于下次事件的发生。
 * @property {Function} error - 设置单个任务出错时的回调函数。
 * 函数签名:`function(error, task)`。
 * 如果省略了 callback,`error()` 会返回一个 Promise,并在下次出错时 reject。
 * @property {boolean} paused - 指示队列是否处于暂停状态的 boolean 值
 * @property {Function} pause - 使队列暂停,直至调用 `resume()` 后继续。
 * 用法:`queue.pause()`。
 * @property {Function} resume - 继续运行暂停中的队列。
 * 用法:`queue.resume()`。
 * @property {Function} kill - 该函数移除所有 `drain` 回调函数,清空任务,强制队列进入
 * 闲置模式。调用此函数后,不应再把任务推入队列。 用法:`queue.kill()`。
 *
 * @example
 * const q = async.queue(worker, 2)
 * q.push(item1)
 * q.push(item2)
 * q.push(item3)
 * // queue 是可迭代的,展开成 array 后检测
 * const items = [...q] // [item1, item2, item3]
 * // 或使用 for of
 * for (let item of q) {
 *     console.log(item)
 * }
 *
 * q.drain(() => {
 *     console.log('all done')
 * })
 * // 或
 * await q.drain()
 */

/**
 * 使用指定的并发数设置 `concurrency` 创建 `queue` 对象。
 * 添加进 `queue` 的任务会并行处理(上限为 `concurrency` 数值)。
 * 如果所有的 `worker` 都在处理中,任务将会排队等候。
 * 一旦有 `worker` 完成了一个 `task`,将调用 `task` 的回调函数(callback)。
 *
 * @name queue
 * @static
 * @memberOf module:ControlFlow
 * @method
 * @category Control Flow
 * @param {AsyncFunction} worker - 处理队列任务的异步函数。
 * 如果想要处理单独的任务中的错误,为 `q.push()` 传入 callback。
 * 调用参数是 (task, callback)。
 * @param {number} [concurrency=1] - `integer` 值,决定了
 * `worker` 函数并发运行的数量。 省略时,默认值为 `1`。
 * 如果设置为 `0`,将抛出错误。
 * @returns {module:ControlFlow.QueueObject} 用来管理任务的 queue 对象。
 * 可附加回调函数到某些属性上,并监听队列生命周期的一些事件。
 * @example
 *
 * // 以并发数 2 创建队列对象
 * var q = async.queue(function(task, callback) {
 *     console.log('hello ' + task.name);
 *     callback();
 * }, 2);
 *
 * // 指定 callback
 * q.drain(function() {
 *     console.log('all items have been processed');
 * });
 * // 或 await 至结束
 * await q.drain()
 *
 * // 设置 error callback
 * q.error(function(err, task) {
 *     console.error('task experienced an error');
 * });
 *
 * // 为队列添加任务
 * q.push({name: 'foo'}, function(err) {
 *     console.log('finished processing foo');
 * });
 * // callback 可忽略
 * q.push({name: 'bar'});
 *
 * // 批量添加任务到队列
 * q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
 *     console.log('finished processing item');
 * });
 *
 * // 添加任务至队列的前排
 * q.unshift({name: 'bar'}, function (err) {
 *     console.log('finished processing bar');
 * });
 */
export default function (worker, concurrency) {
    var _worker = wrapAsync(worker);
    return queue((items, cb) => {
        _worker(items[0], cb);
    }, concurrency, 1);
}