Моя цель:
- Мы взять на обработку несколько задач
- Установить активный статус задачи (не могу найти способ)
-
active — это статус самой задачи в Bullmq.
[*]Этот сервис вернет нам данные, которые будут содержать статус и некоторые данные (которые нам на самом деле не нужны).
Если пройдет какое-то время (например 30 секунд) и с задачей ничего не произойдет - она должен повторить. (Скорее всего, за это отвечает StalledInterval)
[*]Сторонний сервис обновит статус задачи на маршруте, скажем, /sms/status .
Проблема, с которой я столкнулся, заключается в том, что задачи не переходят в статус «активный», из-за чего они повторяются (выбираются таймером) каждые 5 секунд.
import { InjectQueue } from "@nestjs/bullmq";
import { Inject, Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from "@nestjs/common";
import { seconds } from "@nestjs/throttler";
import { Job, Worker, type Queue } from "bullmq";
import Redis from "ioredis";
@Injectable()
export class SmsQueueProcessor implements OnModuleInit, OnModuleDestroy
{
private readonly logger = new Logger(SmsQueueProcessor.name);
private intervalTimer: NodeJS.Timeout;
private worker: Worker;
constructor(
@Inject("REDIS_CONNECTION")
private readonly redisConnection: Redis,
@InjectQueue("smsQueue")
private readonly smsQueue: Queue
) { }
async onModuleInit()
{
this.worker = new Worker(
"smsQueue",
null, // Remove automatic processing of a new task
{
connection: this.redisConnection,
stalledInterval: seconds(30)
}
);
this.worker.on("failed", (job, err) =>
{
this.logger.error(`Job ${ job.id } failed:`, err.message);
});
this.intervalTimer = setInterval(async () =>
{
const jobs = await this.smsQueue.getJobs([ "waiting" ]);
if (jobs.length > 0)
{
this.processBulk(jobs);
}
}, 5000);
}
async onModuleDestroy()
{
if (this.intervalTimer)
{
clearInterval(this.intervalTimer);
}
await this.worker.close();
}
async processBulk(jobs: Job[])
{
this.logger.log(`Processing processBulk (items: ${ jobs.length } )`);
try
{
const payloads = jobs.map(({ data }) => data); // Getting data from tasks
const { data } = await this.sendBulk(payloads); // Send a request to a third-party service with all data
for (const job of jobs)
{
const res = data.find((item) => item.extra_id === job.id);
if (res)
{
const token = crypto.randomUUID();
await job.extendLock(token, seconds(30));
await this.handleJobResult(job, res);
}
else
{
this.logger.warn("No result found for job " + job.id);
}
}
}
catch (error)
{
this.logger.error("Error processing jobs:", error.message);
}
}
private async sendBulk(data)
{
// Some kind of query that will return data to us:
return {
data: data.map((data) =>
{
return {
extra_id: data.extra_id,
status: "send"
} satisfies ISendResponse;
})
};
}
private async handleJobResult(job: Job, res: ISendResponse)
{
if (res.err)
{
this.logger.error("Error sending SMS: " + res.err);
await job.moveToFailed(new Error(res.err), job.token);
}
else
{
switch (res.status)
{
case "send":
await job.log("In progress: " + res.status);
await job.updateProgress(50);
break;
case Status.Delivered:
await job.log("Delivered");
await job.updateProgress(100);
await job.moveToCompleted("Delivered", job.token);
break;
case Status.Undelivered:
await job.moveToFailed(new Error("SMS Undelivered"), job.token);
break;
default:
await job.log(`Unknown status: ${ res.status }`);
this.logger.error(`Unknown status for job ${ job.id }: ${ res.status }`);
}
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... one-a-task
Мобильная версия