Sometimes, it is useful to break processor functions into small pieces that will be processed depending on the previous executed step. One way to handle this kind of logic is by using switch statements:
TypeScript Python
Copy enum Step {
Initial ,
Second ,
Finish ,
}
const worker = new Worker (
'queueName' ,
async job => {
let step = job . data .step;
while (step !== Step .Finish) {
switch (step) {
case Step .Initial: {
await doInitialStepStuff ();
await job .updateData ({
step : Step .Second ,
});
step = Step .Second;
break ;
}
case Step .Second: {
await doSecondStepStuff ();
await job .updateData ({
step : Step .Finish ,
});
step = Step .Finish;
return Step .Finish;
}
default : {
throw new Error ( 'invalid step' );
}
}
}
} ,
{ connection } ,
);
Copy class Step ( int , Enum ):
Initial = 1
Second = 2
Finish = 3
async def process ( job : Job , token : str ):
step = job . data . get ( "step" )
while step != Step . Finish :
if step == Step . Initial :
await doInitialStepStuff ()
await job . updateData ({
"step" : Step.Second
})
step = Step . Second
elif step == Step . Second :
await doSecondStepStuff ()
await job . updateData ({
"step" : Step.Finish
})
step = Step . Finish
else :
raise Exception ( "invalid step" )
worker = Worker ( "queueName" , process, { "connection" : connection})
By saving the next step value every time we complete the previous step (here, saving it in the job's data), we can ensure that if the job errors and retries, it does so starting from the correct step.
Delaying
There are situations when it is useful to delay a job when it is being processed.
This can be handled using the moveToDelayed
method. However, it is important to note that when a job is being processed by a worker, the worker keeps a lock on this job with a certain token value. For the moveToDelayed
method to work, we need to pass said token so that it can unlock without error. Finally, we need to exit from the processor by throwing a special error (DelayedError
) that will signal to the worker that the job has been delayed so that it does not try to complete (or fail the job) instead.
Copy import { DelayedError , Worker } from 'bullmq' ;
enum Step {
Initial ,
Second ,
Finish ,
}
const worker = new Worker (
'queueName' ,
async (job : Job , token ?: string ) => {
let step = job . data .step;
while (step !== Step .Finish) {
switch (step) {
case Step .Initial: {
await doInitialStepStuff ();
await job .moveToDelayed ( Date .now () + 200 , token);
await job .updateData ({
step : Step .Second ,
});
throw new DelayedError ();
}
case Step .Second: {
await doSecondStepStuff ();
await job .updateData ({
step : Step .Finish ,
});
step = Step .Finish;
}
default : {
throw new Error ( 'invalid step' );
}
}
}
} ,
{ connection } ,
);
Waiting Children
A common use case is to add children at runtime and then wait for the children to complete.
This can be handled using the moveToWaitingChildren
method. However, it is important to note that when a job is being processed by a worker, the worker keeps a lock on this job with a certain token value. For the moveToWaitingChildren
method to work, we need to pass said token so that it can unlock without error. Finally, we need to exit from the processor by throwing a special error (WaitingChildrenError
) that will signal to the worker that the job has been moved to waiting-children , so that it does not try to complete (or fail) the job instead.
TypeScript Python
Copy import { WaitingChildrenError , Worker } from 'bullmq' ;
enum Step {
Initial ,
Second ,
Third ,
Finish ,
}
const worker = new Worker (
'parentQueueName' ,
async (job : Job , token ?: string ) => {
let step = job . data .step;
while (step !== Step .Finish) {
switch (step) {
case Step .Initial: {
await doInitialStepStuff ();
await childrenQueue .add (
'child-1' ,
{ foo : 'bar' } ,
{
parent : {
id : job .id ,
queue : job .queueQualifiedName ,
} ,
} ,
);
await job .updateData ({
step : Step .Second ,
});
step = Step .Second;
break ;
}
case Step .Second: {
await doSecondStepStuff ();
await childrenQueue .add (
'child-2' ,
{ foo : 'bar' } ,
{
parent : {
id : job .id ,
queue : job .queueQualifiedName ,
} ,
} ,
);
await job .updateData ({
step : Step .Third ,
});
step = Step .Third;
break ;
}
case Step .Third: {
const shouldWait = await job .moveToWaitingChildren (token);
if ( ! shouldWait) {
await job .updateData ({
step : Step .Finish ,
});
step = Step .Finish;
return Step .Finish;
} else {
throw new WaitingChildrenError ();
}
}
default : {
throw new Error ( 'invalid step' );
}
}
}
} ,
{ connection } ,
);
Copy from bullmq import Worker , WaitingChildrenError
from enum import Enum
class Step ( int , Enum ):
Initial = 1
Second = 2
Third = 3
Finish = 4
async def process ( job : Job , token : str ):
step = job . data . get ( "step" )
while step != Step . Finish :
if step == Step . Initial :
await doInitialStepStuff ()
await children_queue . add ( 'child-1' , { "foo" : "bar" },{
"parent" : {
"id" : job.id,
"queue" : job.queueQualifiedName
}
})
await job . updateData ({
"step" : Step.Second
})
step = Step . Second
elif step == Step . Second :
await doSecondStepStuff ()
await children_queue . add ( 'child-2' , { "foo" : "bar" },{
"parent" : {
"id" : job.id,
"queue" : job.queueQualifiedName
}
})
await job . updateData ({
"step" : Step.Third
})
step = Step . Third
elif step == Step . Third :
should_wait = await job . moveToWaitingChildren (token, {})
if not should_wait :
await job . updateData ({
"step" : Step.Finish
})
step = Step . Finish
return Step . Finish
else :
raise WaitingChildrenError
else :
raise Exception ( "invalid step" )
worker = Worker ( "parentQueueName" , process, { "connection" : connection})
Bullmq-Pro: this pattern could be handled by using observables; in that case, we do not need to save next step.
Chaining Flows
Another use case is to add flows at runtime and then wait for the children to complete.
For example, we can add children dynamically in the worker's processor function:
Copy import { FlowProducer , WaitingChildrenError , Worker } from 'bullmq' ;
enum Step {
Initial ,
Second ,
Third ,
Finish ,
}
const flow = new FlowProducer ({ connection });
const worker = new Worker (
'parentQueueName' ,
async (job , token) => {
let step = job . data .step;
while (step !== Step .Finish) {
switch (step) {
case Step .Initial: {
await doInitialStepStuff ();
await flow .add ({
name : 'child-job' ,
queueName : 'childrenQueueName' ,
data : {} ,
children : [
{
name ,
data : { idx : 0 , foo : 'bar' } ,
queueName : 'grandchildrenQueueName' ,
} ,
{
name ,
data : { idx : 1 , foo : 'baz' } ,
queueName : 'grandchildrenQueueName' ,
} ,
] ,
opts : {
parent : {
id : job .id ,
queue : job .queueQualifiedName ,
} ,
} ,
});
await job .updateData ({
step : Step .Second ,
});
step = Step .Second;
break ;
}
case Step .Second: {
await doSecondStepStuff ();
await job .updateData ({
step : Step .Third ,
});
step = Step .Third;
break ;
}
case Step .Third: {
const shouldWait = await job .moveToWaitingChildren (token);
if ( ! shouldWait) {
await job .updateData ({
step : Step .Finish ,
});
step = Step .Finish;
return Step .Finish;
} else {
throw new WaitingChildrenError ();
}
}
default : {
throw new Error ( 'invalid step' );
}
}
}
} ,
{ connection } ,
);