Process Step Jobs

Sometimes it is useful to break a processor function into smaller steps, where the step to run depends on the step that was executed previously. This kind of logic can be handled with a switch block:
    import { Worker } from 'bullmq';

    enum Step {
      Initial,
      Second,
      Finish,
    }

    const worker = new Worker(
      queueName,
      async job => {
        // Resume from the step stored in the job's data.
        let step = job.data.step;
        while (step !== Step.Finish) {
          switch (step) {
            case Step.Initial: {
              await doInitialStepStuff();
              // Persist the next step before moving on.
              await job.update({
                step: Step.Second,
              });
              step = Step.Second;
              break;
            }
            case Step.Second: {
              await doSecondStepStuff();
              await job.update({
                step: Step.Finish,
              });
              step = Step.Finish;
              return Step.Finish;
            }
            default: {
              throw new Error('invalid step');
            }
          }
        }
      },
      { connection },
    );
As you can see, the current step must be saved; in this case, it is saved in the job's data. If the processor throws and the job is retried (for example, when the job is configured with retry attempts and a backoff strategy), processing resumes from the last saved step instead of starting over.
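To make the resume behaviour concrete, here is a minimal sketch that runs without Redis or BullMQ. It is synchronous for brevity, and `FakeJob`, `processJob`, and `runWithRetries` are hypothetical stand-ins for the job, the processor, and the queue's retry handling; they are not part of the BullMQ API.

```typescript
enum Step {
  Initial,
  Second,
  Finish,
}

// Hypothetical in-memory stand-in for a job; not the BullMQ Job class.
class FakeJob {
  data: { step: Step } = { step: Step.Initial };

  update(data: { step: Step }): void {
    this.data = data; // a real job would persist this in Redis
  }
}

const executed: string[] = []; // records which steps actually ran
let secondStepShouldFail = true; // makes the second step fail exactly once

function processJob(job: FakeJob): Step {
  let step = job.data.step; // resume from the last saved step
  while (step !== Step.Finish) {
    switch (step) {
      case Step.Initial: {
        executed.push('initial');
        job.update({ step: Step.Second });
        step = Step.Second;
        break;
      }
      case Step.Second: {
        if (secondStepShouldFail) {
          secondStepShouldFail = false;
          throw new Error('transient failure in second step');
        }
        executed.push('second');
        job.update({ step: Step.Finish });
        step = Step.Finish;
        break;
      }
      default: {
        throw new Error('invalid step');
      }
    }
  }
  return step;
}

// Hypothetical retry loop, standing in for the queue re-running a failed job.
function runWithRetries(job: FakeJob, attempts: number): Step {
  for (let attempt = 1; ; attempt++) {
    try {
      return processJob(job);
    } catch (err) {
      if (attempt >= attempts) throw err;
    }
  }
}

const job = new FakeJob();
const result = runWithRetries(job, 2);
console.log(Step[result], executed);
```

The first attempt runs the initial step, saves `Step.Second`, and then fails. The second attempt reads the saved step and goes straight to the second step, so the initial step runs only once.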

Waiting Children

A common use case is to add children at runtime and then wait for them to complete.
This can be handled using the moveToWaitingChildren method:
    import { Worker } from 'bullmq';

    enum Step {
      Initial,
      Second,
      Third,
      Finish,
    }

    const worker = new Worker(
      parentQueueName,
      async (job, token) => {
        // Resume from the step stored in the job's data.
        let step = job.data.step;
        while (step !== Step.Finish) {
          switch (step) {
            case Step.Initial: {
              await doInitialStepStuff();
              // Add a child at runtime, linked to this job as its parent.
              await childrenQueue.add(
                'child-1',
                { foo: 'bar' },
                {
                  parent: {
                    id: job.id,
                    queue: `bull:${parentQueueName}`,
                  },
                },
              );
              await job.update({
                step: Step.Second,
              });
              step = Step.Second;
              break;
            }
            case Step.Second: {
              await doSecondStepStuff();
              await childrenQueue.add(
                'child-2',
                { foo: 'bar' },
                {
                  parent: {
                    id: job.id,
                    queue: `${job.prefix}:${job.queueName}`,
                  },
                },
              );
              await job.update({
                step: Step.Third,
              });
              step = Step.Third;
              break;
            }
            case Step.Third: {
              // true if the job was moved to waiting-children because
              // some children have not completed yet.
              const shouldWait = await job.moveToWaitingChildren(token);
              if (!shouldWait) {
                await job.update({
                  step: Step.Finish,
                });
                step = Step.Finish;
                return Step.Finish;
              }
              break;
            }
            default: {
              throw new Error('invalid step');
            }
          }
        }
      },
      { connection },
    );
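The two `parent` options in the example above build the queue key in different ways: `bull:${parentQueueName}` hardcodes the prefix, while `${job.prefix}:${job.queueName}` reads it from the job. The two should only agree when the parent queue uses the default prefix, `bull`. One way to keep this consistent is a small helper; `parentRef` and `JobLike` below are hypothetical names used for illustration, not part of BullMQ.

```typescript
// Minimal shape of the job fields the helper reads; real jobs expose more.
interface JobLike {
  id?: string;
  prefix: string;
  queueName: string;
}

// Hypothetical helper: builds the `parent` option for a child job
// from the parent job itself, so the prefix is never hardcoded.
function parentRef(job: JobLike): { id: string; queue: string } {
  if (job.id === undefined) {
    throw new Error('parent job has no id');
  }
  return { id: job.id, queue: `${job.prefix}:${job.queueName}` };
}

const ref = parentRef({ id: '42', prefix: 'bull', queueName: 'parent-queue' });
console.log(ref);

// Inside the processor, this could replace both inline `parent` objects:
// await childrenQueue.add('child-1', { foo: 'bar' }, { parent: parentRef(job) });
```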
BullMQ Pro: this pattern can also be handled using observables; in that case, the next step does not need to be saved.