Process Step Jobs
Sometimes it is useful to break a processor function into small pieces that are processed depending on the previously executed step. We can handle this kind of logic by using switch blocks:
/**
 * Stages of a step job, listed in the order they are executed.
 * Values are numeric and start at 0.
 */
enum Step {
  Initial = 0,
  Second = 1,
  Finish = 2,
}
// Worker whose processor walks a job through its steps. After each stage the
// next step is written to the job's data, and the processor starts from
// whatever step it finds in `job.data.step`.
const worker = new Worker(
  'queueName',
  async job => {
    let currentStep = job.data.step;

    while (currentStep !== Step.Finish) {
      switch (currentStep) {
        case Step.Initial: {
          await doInitialStepStuff();
          // Persist the next step before moving on to it.
          await job.update({ step: Step.Second });
          currentStep = Step.Second;
          break;
        }
        case Step.Second: {
          await doSecondStepStuff();
          await job.update({ step: Step.Finish });
          // Last stage: leave the processor with the final step as result.
          return Step.Finish;
        }
        default: {
          // Any value that is not a known step is rejected.
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
As you can see, we must save the step value; in this case, we save it in the job's data. That way, even if an error occurs, the job is retried from the last step that was saved (provided the job is configured to retry, for example with a backoff strategy).
There are situations when it is valuable to delay a job while it is being processed.
This can be handled using the `moveToDelayed` method. However, it is important to note that when a job is being processed by a worker, the worker keeps a lock on this job with a certain token value. For the `moveToDelayed` method to work, we need to pass said token so that it can unlock without error. Finally, we need to exit from the processor by throwing a special error, `DelayedError`, that signals to the worker that the job has been delayed so that it does not try to complete (or fail) the job instead.
import { DelayedError, Worker } from 'bullmq';
/**
 * Stages of a delayed step job, in execution order.
 * Numeric values are assigned explicitly, starting at 0.
 */
enum Step {
  Initial = 0,
  Second = 1,
  Finish = 2,
}
// Worker whose processor runs the initial step, moves the job to delayed, and
// runs the second step only when the job is processed again.
//
// The processor receives the lock `token` and passes it to `moveToDelayed`.
// The step stored in `job.data.step` decides where a run starts.
const worker = new Worker(
  'queueName',
  async (job: Job, token: string) => {
    let step = job.data.step;
    while (step !== Step.Finish) {
      switch (step) {
        case Step.Initial: {
          await doInitialStepStuff();
          // Delay the job by 200 ms, handing over the lock token.
          await job.moveToDelayed(Date.now() + 200, token);
          // Record where the next run of the processor must resume.
          await job.update({
            step: Step.Second,
          });
          // Leave the processor right away: throwing DelayedError signals the
          // worker that the job was delayed, so it neither completes nor fails
          // it. Continuing the loop here would run the second step without
          // waiting for the delay.
          throw new DelayedError();
        }
        case Step.Second: {
          await doSecondStepStuff();
          await job.update({
            step: Step.Finish,
          });
          step = Step.Finish;
          // Nothing was delayed in this run, so finish normally.
          return Step.Finish;
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
A common use case is to add children at runtime and then wait for the children to complete.
This can be handled using the `moveToWaitingChildren` method. However, it is important to note that when a job is being processed by a worker, the worker keeps a lock on this job with a certain token value. For the `moveToWaitingChildren` method to work, we need to pass said token so that it can unlock without error. Finally, we need to exit from the processor by throwing a special error, `WaitingChildrenError`, that signals to the worker that the job has been moved to waiting-children so that it does not try to complete (or fail) the job instead.
import { WaitingChildrenError, Worker } from 'bullmq';
/**
 * Stages of a parent job that adds children at runtime, in execution order.
 * Numeric values are assigned explicitly, starting at 0.
 */
enum Step {
  Initial = 0,
  Second = 1,
  Third = 2,
  Finish = 3,
}
// Parent worker: the first two steps each add one child job that references
// this job as its parent; the third step calls `moveToWaitingChildren` and
// either finishes or exits with WaitingChildrenError.
const worker = new Worker(
  'parentQueueName',
  async (job: Job, token: string) => {
    // Adds a child job to `childrenQueue` with the current job as parent.
    const addChild = (childName: string) =>
      childrenQueue.add(
        childName,
        { foo: 'bar' },
        { parent: { id: job.id, queue: job.queueQualifiedName } },
      );

    let currentStep = job.data.step;

    while (currentStep !== Step.Finish) {
      switch (currentStep) {
        case Step.Initial: {
          await doInitialStepStuff();
          await addChild('child-1');
          await job.update({ step: Step.Second });
          currentStep = Step.Second;
          break;
        }
        case Step.Second: {
          await doSecondStepStuff();
          await addChild('child-2');
          await job.update({ step: Step.Third });
          currentStep = Step.Third;
          break;
        }
        case Step.Third: {
          // The lock token is handed to moveToWaitingChildren.
          const mustWait = await job.moveToWaitingChildren(token);
          if (mustWait) {
            // Exit without completing or failing the job.
            throw new WaitingChildrenError();
          }
          await job.update({ step: Step.Finish });
          return Step.Finish;
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
BullMQ Pro: this pattern can be handled using observables; in that case, we do not need to save the next step.
Another use case is to add flows at runtime and then wait for the children to complete.
For example, we can add children dynamically in the processor function of a worker. This can be handled in this way:
import { FlowProducer, WaitingChildrenError, Worker } from 'bullmq';
/**
 * Stages of a parent job that adds a flow at runtime, in execution order.
 * Numeric values are assigned explicitly, starting at 0.
 */
enum Step {
  Initial = 0,
  Second = 1,
  Third = 2,
  Finish = 3,
}
// Producer used by the processor below to add a whole flow at runtime.
const flow = new FlowProducer({ connection });

// Parent worker: the initial step adds a flow (one child with two
// grandchildren) that references this job as its parent; the third step calls
// `moveToWaitingChildren` and either finishes or exits with
// WaitingChildrenError.
const worker = new Worker(
  'parentQueueName',
  async (job, token) => {
    // Builds one grandchild node of the flow.
    // NOTE(review): `name` is not declared in this snippet; presumably it is
    // defined in the enclosing scope — confirm.
    const grandchild = (idx: number, foo: string) => ({
      name,
      data: { idx, foo },
      queueName: 'grandchildrenQueueName',
    });

    let currentStep = job.data.step;

    while (currentStep !== Step.Finish) {
      switch (currentStep) {
        case Step.Initial: {
          await doInitialStepStuff();
          await flow.add({
            name: 'child-job',
            queueName: 'childrenQueueName',
            data: {},
            children: [grandchild(0, 'bar'), grandchild(1, 'baz')],
            // Attach the flow's root to the job being processed.
            opts: {
              parent: { id: job.id, queue: job.queueQualifiedName },
            },
          });
          await job.update({ step: Step.Second });
          currentStep = Step.Second;
          break;
        }
        case Step.Second: {
          await doSecondStepStuff();
          await job.update({ step: Step.Third });
          currentStep = Step.Third;
          break;
        }
        case Step.Third: {
          // The lock token is handed to moveToWaitingChildren.
          const mustWait = await job.moveToWaitingChildren(token);
          if (mustWait) {
            // Exit without completing or failing the job.
            throw new WaitingChildrenError();
          }
          await job.update({ step: Step.Finish });
          return Step.Finish;
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
Last modified 1mo ago