нам нужно выполнить задачи в нескольких потоках, которые зависят от максимальных чтений, но задачи с одинаковыми идентификаторами не могут быть выполнены одновременно, в противном случае появится ошибка. Нам нужно создать такую цепочку выполнения, где задачи с разными идентификаторами выполняются параллельно, а задачи, идентификатор которого уже находится в процессе выполнения, добавляются в очередь задач и выполняются, когда задача с одним идентификатором завершила свое выполнение. Но я не могу сделать так, чтобы, когда задача с одним идентификатором завершена, можно было бы вызвать задачу из очереди. < /P>
Я не могу сделать правильную цепочку вызовов. Все задачи, которые приходят к нам от итератора, должны быть выполнены. < /Strong> < /p>
Код: Выделить всё
const executeData = {
running: {},
completed: {}
}
const tasks = [];
for (const i of [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]) {
tasks[i] = [];
for (const action of ['init', 'prepare', 'work', 'finalize', 'cleanup']) {
tasks[i].push({
targetId: i,
action: action,
_onExecute() {
this.running = true;
},
_onComplete() {
delete this.running;
this.completed = true;
}
});
}
}
const q = [...tasks[0], ...tasks[1]];
tasks[0][4]._onComplete = () => {
q.push(...tasks[1], ...tasks[2], ...tasks[3]);
delete tasks[0][4].running;
tasks[0][4].completed = true;
};
tasks[1][1]._onComplete = () => {
q.push(...tasks[4]);
delete tasks[1][1].running;
tasks[1][1].completed = true;
};
tasks[2][2]._onComplete = () => {
q.push(...tasks[5]);
delete tasks[2][2].running;
tasks[2][2].completed = true;
};
tasks[3][3]._onComplete = () => {
q.push(...tasks[6]);
delete tasks[3][3].running;
tasks[3][3].completed = true;
};
tasks[4][4]._onComplete = () => {
q.push(...tasks[7]);
delete tasks[4][4].running;
tasks[4][4].completed = true;
};
tasks[5][4]._onComplete = () => {
q.push(...tasks[8]);
delete tasks[5][4].running;
tasks[5][4].completed = true;
};
tasks[8][4]._onComplete = () => {
q.push(...tasks[9], ...tasks[10], ...tasks[11]);
delete tasks[8][4].running;
tasks[8][4].completed = true;
};
const queue = {
[Symbol.asyncIterator]() {
let i = 0;
return {
async next() {
while (q[i] && (q[i].completed || q[i].acquired)) {
i++;
}
if (i < q.length) {
const value = q[i++];
if (value) {
value.acquired = true;
}
return {
done: false,
value
};
} else {
return {
done: true,
value: undefined
};
}
}
};
},
q
};
async function run(maxThreads = 0) {
maxThreads = Math.max(0, maxThreads || 12)
const activeTargetId = new Set();
const taskQueue = [];
let taskInQueue = 0;
const queueIterator = queue[Symbol.asyncIterator]()
async function executeNext() {
const { value: task, done } = await queueIterator.next();
if (done) return;
if (activeTargetId.has(task.targetId)) {
taskQueue.push(task)
await executeNext()
return;
}
if (taskInQueue < maxThreads) {
activeTargetId.add(task.targetId);
taskInQueue++
await executeTask(task).finally(() => {
activeTargetId.delete(task.targetId);
taskInQueue--
})
}
await executeNext();
}
async function startTaskQ(targetId) {
const nextTask = taskQueue.find(task => task.targetId === targetId);
if (nextTask) {
try {
await executeTask(nextTask);
activeTargetId.add(nextTask.targetId);
} finally {
activeTargetId.delete(nextTask.targetId);
const nextTaskId = taskQueue.findIndex(task => nextTask.targetId === task.targetId);
taskQueue.splice(nextTaskId, 1);
startTaskQ(targetId);
}
}
}
const workres = [];
for (let i = 0; i < maxThreads; i++) {
workres.push(executeNext())
}
await Promise.all(workres);
console.log(executeData, 'executeData')
}
async function executeTask(task) {
const running = executeData.running;
const completed = executeData.completed;
const targetId = task.targetId;
if (running[targetId]) {
throw new Error(`cannot execute task ${targetId}:` +
`${task.action}: task with the same targetId=${targetId} is running`);
}
running[targetId] = task;
const ns = { ...running }
console.log(ns, 'running')
if (task._onExecute) {
task._onExecute();
}
switch (task.action) {
case 'init': {
await sleep(10 * (1 + targetId / 10));
await sleep(30 * (1 + targetId / 10));
break;
}
case 'prepare': {
await sleep(30 * (1 + targetId / 10));
await sleep(70 * (1 + targetId / 10));
break;
}
case 'work': {
await sleep(50 * (1 + targetId / 10));
await sleep(150 * (1 + targetId / 10));
break;
}
case 'finalize': {
await sleep(30 * (1 + targetId / 10));
await sleep(70 * (1 + targetId / 10));
break;
}
default: {
await sleep(25);
await sleep(25);
break;
}
}
delete running[targetId];
if (task._onComplete) {
task._onComplete();
}
if (!completed[targetId]) {
completed[targetId] = [];
}
completed[targetId].push({ targetId: task.targetId, action: task.action });
}
async function sleep(ms) {
ms = Math.max(0, ms);
return new Promise(r => setTimeout(() => r(), ms));
}
run(3)Подробнее здесь: https://stackoverflow.com/questions/797 ... javascript
Мобильная версия