Workers

Workers are the instances that process the jobs added to a queue. A worker is equivalent to a "message" receiver in a traditional message queue. The worker's duty is to complete the job: if processing succeeds, the job is moved to the "completed" status; if the worker throws an exception during processing, the job is automatically moved to the "failed" status.

Failed jobs can be automatically retried; see Retrying failing jobs.

A worker is instantiated with the Worker class, and the work itself is performed in the process function. Process functions are meant to be asynchronous, either declared with the async keyword or returning a promise.

import { Worker, Job } from 'bullmq';

const worker = new Worker(queueName, async (job: Job) => {
  // Optionally report some progress
  await job.updateProgress(42);

  // Optionally send an object as progress
  await job.updateProgress({ foo: 'bar' });

  // Do something with job
  return 'some value';
});

When a worker instance is created, it launches the processor immediately.

To control when the processor starts executing, pass autorun: false as part of the worker options and call run() when you are ready:

import { Worker, Job } from 'bullmq';

const worker = new Worker(
  queueName,
  async (job: Job) => {
    // Optionally report some progress
    await job.updateProgress(42);

    // Optionally send an object as progress
    await job.updateProgress({ foo: 'bar' });

    // Do something with job
    return 'some value';
  },
  { autorun: false },
);

worker.run();

Note that a processor can optionally return a value. This value can be retrieved either by getting the job and accessing the returnvalue property or by listening to the completed event:

worker.on('completed', (job: Job, returnvalue: any) => {
  // Do something with the return value.
});

Progress

Inside the worker process function it is also possible to emit progress events. By calling job.updateProgress you can report a number, or an object if you have more complex needs. The progress event can be listened for in the same way as the completed event:

worker.on('progress', (job: Job, progress: number | object) => {
  // Do something with the progress value.
});
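
As a rough sketch of how a processor might report progress while working through a batch, the function below calls updateProgress after each item. ProgressReporter, processItems, and fakeJob are hypothetical names, and the fake job is only there so the snippet can run without Redis; in a real worker, the job argument passed to the process function provides updateProgress.

```typescript
// Minimal structural stand-in for the part of a job used here
// (assumption: only updateProgress is needed for this illustration).
interface ProgressReporter {
  updateProgress(progress: number | object): Promise<void>;
}

// Hypothetical processor body: handles items one by one and reports a percentage.
async function processItems(
  job: ProgressReporter,
  items: string[],
): Promise<string[]> {
  const results: string[] = [];
  for (const item of items) {
    // Placeholder for the real work done on each item.
    results.push(item.toUpperCase());
    await job.updateProgress(Math.round((results.length / items.length) * 100));
  }
  return results;
}

// Fake job that records every progress update, for demonstration only.
const reported: Array<number | object> = [];
const fakeJob: ProgressReporter = {
  async updateProgress(progress: number | object): Promise<void> {
    reported.push(progress);
  },
};

const processed = processItems(fakeJob, ['a', 'b', 'c', 'd']);
```

Reporting a percentage keeps listeners simple; an object is a better fit when listeners need more detail, such as the current step name.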

When the processor fails with an exception, it is possible to listen for the failed event too:

worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
  // Do something with the error.
});
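
A processor signals failure simply by throwing, which rejects the promise it returns. The sketch below uses a hypothetical EmailData payload, a hypothetical sendEmail processor, and a local JobLike stand-in so it can run without a queue; used as a worker's process function, the thrown error is what the failed listener would receive.

```typescript
// Hypothetical payload type for illustration.
interface EmailData {
  to: string;
  subject: string;
}

// Minimal stand-in for the job argument (assumption: only `data` is used here).
interface JobLike<T> {
  data: T;
}

// Hypothetical processor: throwing inside an async function rejects its promise.
async function sendEmail(job: JobLike<EmailData>): Promise<string> {
  if (!job.data.to.includes('@')) {
    throw new Error(`Invalid recipient: ${job.data.to}`);
  }
  // ... the real delivery would happen here ...
  return `sent to ${job.data.to}`;
}

// Demonstration without a queue: one valid payload, one invalid payload.
const okResult = sendEmail({ data: { to: 'user@example.com', subject: 'Hi' } });
const badResult = sendEmail({ data: { to: 'nobody', subject: 'Hi' } }).catch(
  (err: Error) => err.message,
);
```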

It is also possible to listen to global events in order to get notifications of job completions, progress and failures:

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('Paint');

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  // Called every time a job is completed by any worker.
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  // Called whenever a job is moved to failed by any worker.
});

queueEvents.on('progress', ({ jobId, data }) => {
  // jobId received a progress event
});

Finally, you should attach an error listener to your worker to prevent Node.js from raising an unhandled exception when an error occurs. For example:

worker.on('error', err => {
  // log the error
  console.error(err);
});

If the error handler is missing, your worker may stop processing jobs when an error is emitted!
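
The unhandled exception comes from standard Node.js EventEmitter behavior: emitting an 'error' event with no listener attached throws the error. The sketch below demonstrates this with a plain EventEmitter standing in for a worker (an assumption made so the snippet runs without Redis); emitError is a hypothetical helper.

```typescript
import { EventEmitter } from 'events';

// Emits an 'error' event and reports whether the emit call itself threw.
function emitError(emitter: EventEmitter): boolean {
  try {
    emitter.emit('error', new Error('connection lost'));
    return false;
  } catch {
    return true;
  }
}

// Without a listener, Node.js throws the emitted error.
const bare = new EventEmitter();
const threwWithoutListener = emitError(bare);

// With a listener, the error is delivered to the handler instead.
const guarded = new EventEmitter();
const seen: string[] = [];
guarded.on('error', (err: Error) => {
  seen.push(err.message);
});
const threwWithListener = emitError(guarded);
```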

TypeScript typings

It is also possible to specify the data types for the Job data and return value using generics:

const worker = new Worker<MyData, MyReturn>(queueName, async (job: Job) => {});
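
As a sketch of what the generics buy you, the example below uses hypothetical MyData and MyReturn shapes and a local TypedJob stand-in so it can run without Redis. With BullMQ itself, the parameter would be typed as Job<MyData, MyReturn> and the function passed to the Worker constructor.

```typescript
// Hypothetical payload and result shapes.
interface MyData {
  width: number;
  height: number;
}

interface MyReturn {
  area: number;
}

// Local stand-in for a typed job (assumption: only `data` is needed here).
interface TypedJob<D> {
  data: D;
}

// job.data is checked against MyData, and the resolved value against MyReturn.
async function computeArea(job: TypedJob<MyData>): Promise<MyReturn> {
  const { width, height } = job.data;
  return { area: width * height };
}

const areaResult = computeArea({ data: { width: 3, height: 4 } });
```

With the types in place, a misspelled field on job.data or a return value of the wrong shape is reported at compile time rather than at runtime.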

Copyright (c) Taskforce.sh Inc.