Workers
Workers are the instances that process the jobs added to the queue. A worker is equivalent to a message receiver in a traditional message queue. Its duty is to complete the job: if processing succeeds, the job is moved to the "completed" status; if the processor throws an exception, the job is automatically moved to the "failed" status.
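The success and failure paths described above can be illustrated without Redis. The following is a minimal in-memory sketch, not BullMQ's implementation: `settle`, `Processor`, and `Outcome` are hypothetical names introduced here only to show how a returned value maps to "completed" and a thrown exception maps to "failed".

```typescript
// Hypothetical stand-in types; BullMQ's real Job and Worker classes are richer.
type Processor<T, R> = (data: T) => Promise<R>;

type Outcome<R> =
  | { status: 'completed'; returnvalue: R }
  | { status: 'failed'; failedReason: string };

// Runs a processor once and maps its result to a final status.
async function settle<T, R>(
  processor: Processor<T, R>,
  data: T,
): Promise<Outcome<R>> {
  try {
    const returnvalue = await processor(data);
    return { status: 'completed', returnvalue };
  } catch (err) {
    const failedReason = err instanceof Error ? err.message : String(err);
    return { status: 'failed', failedReason };
  }
}

// A processor that resolves, and one that throws.
const ok = settle(async (n: number) => n * 2, 21);
const bad = settle(async (_n: number): Promise<number> => {
  throw new Error('boom');
}, 1);
```

In this sketch the caller never sees the exception directly; it only observes the final status, which mirrors how a failing processor results in a "failed" job rather than a crashed worker.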
A worker is instantiated with the Worker class, and the work itself is performed in the process function. Process functions are meant to be asynchronous, either using the async keyword or returning a promise.
import { Worker, Job } from 'bullmq';

const worker = new Worker(queueName, async (job: Job) => {
  // Optionally report some progress
  await job.updateProgress(42);

  // Optionally send an object as progress
  await job.updateProgress({ foo: 'bar' });

  // Do something with the job
  return 'some value';
});
To control when your processor starts executing, pass autorun: false as part of the worker options and call worker.run() when you are ready:
import { Worker, Job } from 'bullmq';

const worker = new Worker(
  queueName,
  async (job: Job) => {
    // Optionally report some progress
    await job.updateProgress(42);

    // Optionally send an object as progress
    await job.updateProgress({ foo: 'bar' });

    // Do something with the job
    return 'some value';
  },
  { autorun: false },
);

worker.run();
Note that a processor can optionally return a value. This value can be retrieved either by getting the job and accessing its returnvalue property or by listening to the completed event:
worker.on('completed', (job: Job, returnvalue: any) => {
  // Do something with the return value.
});
Progress
Inside the worker's process function it is also possible to emit progress events. By calling job.updateProgress you can report a number, or an object if you have more complex needs. The progress event can be listened for in the same way as the completed event:
worker.on('progress', (job: Job, progress: number | object) => {
  // Do something with the progress value.
});
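Because the progress payload may be either a number or an object, a listener usually has to narrow the type before using it. The following is a minimal sketch, assuming a hypothetical convention in which object payloads carry a numeric `percent` field; that field and the `toPercent` helper are assumptions for illustration, not something BullMQ defines.

```typescript
// Hypothetical helper: converts a progress payload into a percentage, or
// returns undefined when the payload carries no usable number.
function toPercent(progress: number | object): number | undefined {
  if (typeof progress === 'number') {
    return progress;
  }
  // Assumed convention: object payloads may expose a numeric `percent` field.
  const percent = (progress as { percent?: unknown }).percent;
  return typeof percent === 'number' ? percent : undefined;
}
```

Inside a progress listener, such a helper could be called on the received value before updating a progress bar or a log line.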
When the processor fails with an exception, it is possible to listen for the failed event as well:
worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
  // Do something with the error.
});
It is also possible to listen to global events in order to get notifications of job completions, progress and failures:
import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('Paint');

// Note: writing { jobId: string } inside the destructuring pattern would
// rename the binding to "string" rather than annotate its type.
queueEvents.on('completed', ({ jobId, returnvalue }) => {
  // Called every time a job is completed by any worker.
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  // Called whenever a job is moved to failed by any worker.
});

queueEvents.on('progress', ({ jobId, data }) => {
  // jobId received a progress event; data is a number or an object.
});
Finally, you should attach an error listener to your worker to prevent Node.js from raising an unhandled exception when an error occurs. For example:
worker.on('error', err => {
  // log the error
  console.error(err);
});
If the error handler is missing, your worker may stop processing jobs when an error is emitted!
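The underlying reason is standard Node.js behavior: an EventEmitter with no listener for the 'error' event throws the emitted error instead of delivering it. The following is a minimal sketch using a plain EventEmitter as a stand-in, assuming the worker behaves like a standard Node.js EventEmitter in this respect; no BullMQ or Redis is involved, and the variable names are illustrative only.

```typescript
import { EventEmitter } from 'node:events';

// Plain EventEmitter used as a stand-in for a worker; no BullMQ involved.
const withoutListener = new EventEmitter();
let threw = false;
try {
  // With no 'error' listener registered, emit() throws the error itself.
  withoutListener.emit('error', new Error('connection lost'));
} catch {
  threw = true;
}

const withListener = new EventEmitter();
const seen: string[] = [];
withListener.on('error', (err: Error) => {
  seen.push(err.message);
});
// With a listener attached, the error is delivered instead of thrown.
withListener.emit('error', new Error('connection lost'));
```

Registering the listener right after constructing the worker keeps errors that occur early from going unhandled.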
TypeScript typings
It is also possible to specify the data types for the Job data and return value using generics:
const worker = new Worker<MyData, MyReturn>(queueName, async (job: Job) => {});