Copyright (c) Taskforce.sh Inc.

Workers

Last updated 8 months ago

Workers are the instances that actually process the jobs added to a queue. A worker is the equivalent of a "message" receiver in a traditional message queue. The worker's duty is to complete the job: if it succeeds, the job is moved to the "completed" state; if the worker throws an exception during processing, the job is automatically moved to the "failed" state.

Failed jobs can be automatically retried; see Retrying failing jobs.

A worker is instantiated with the Worker class, and the work itself is performed in the process function. Process functions are meant to be asynchronous, either by using the async keyword or by returning a promise.

import { Worker, Job } from 'bullmq';

const worker = new Worker(queueName, async (job: Job) => {
  // Optionally report some progress
  await job.updateProgress(42);

  // Optionally sending an object as progress
  await job.updateProgress({ foo: 'bar' });

  // Do something with job
  return 'some value';
});

When a worker instance is created, it launches its processor immediately.

To control when the processor starts executing, pass autorun: false as part of the worker options:

import { Worker, Job } from 'bullmq';

const worker = new Worker(
  queueName,
  async (job: Job) => {
    // Optionally report some progress
    await job.updateProgress(42);

    // Optionally sending an object as progress
    await job.updateProgress({ foo: 'bar' });

    // Do something with job
    return 'some value';
  },
  { autorun: false },
);

worker.run();

Note that a processor can optionally return a value. This value can be retrieved either by fetching the job and reading its returnvalue property, or by listening to the completed event:

worker.on('completed', (job: Job, returnvalue: any) => {
  // Do something with the return value.
});
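Keep in mind that return values are persisted in Redis as JSON, so they must survive serialization. The following sketch (illustrative, not part of BullMQ) mirrors that round-trip and can be used to sanity-check a candidate return value:

```typescript
// Return values are stored as JSON, so a round-trip through
// JSON.stringify/JSON.parse must preserve them. Dates, Maps,
// class instances, etc. will not survive intact.
function jsonRoundTrip<T>(value: T): T {
  return JSON.parse(JSON.stringify(value));
}
```

If `jsonRoundTrip(value)` is not deep-equal to `value`, the processor's return value will not come back from Redis in the same shape it was returned in.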

Progress

Inside the worker's process function it is also possible to emit progress events. By calling job.updateProgress you can pass a number or, for more complex needs, an object. The progress event can be listened to in the same way as the completed event:

worker.on('progress', (job: Job, progress: number | object) => {
  // Do something with the progress update.
});
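Since progress may be either a number or an object, a handler usually needs to discriminate between the two. A minimal sketch of such a helper (the formatting choices are illustrative):

```typescript
// Format a progress update for logging: numbers are treated as a
// percentage, anything else is serialized as JSON.
function formatProgress(progress: number | object): string {
  if (typeof progress === 'number') {
    return `${progress}%`;
  }
  return JSON.stringify(progress);
}
```

A handler could then log `formatProgress(progress)` for both of the shapes shown in the examples above.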

Finally, when the process function fails with an exception, it is possible to listen for the failed event too:

worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
  // Do something with the error.
});

It is also possible to listen to global events in order to get notifications of job completions, progress and failures:

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('Paint');

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  // Called every time a job is completed by any worker.
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  // Called whenever a job is moved to failed by any worker.
});

queueEvents.on('progress', ({ jobId, data }) => {
  // Called every time a job emits a progress update.
});

Finally, you should attach an error listener to your worker to prevent Node.js from raising an unhandled exception when an error occurs. For example:

worker.on('error', err => {
  // log the error
  console.error(err);
});

Typescript typings

It is also possible to specify the data types for the Job data and return value using generics:

const worker = new Worker<MyData, MyReturn>(queueName, async job => {
  // job.data is typed as MyData; the return value is checked against MyReturn.
});
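Because job.data is then typed as MyData and the return value is checked against MyReturn, the processor's logic can be factored into a plain typed function and unit-tested without a Redis connection. A sketch, where the interfaces and the paint function are illustrative assumptions rather than part of BullMQ:

```typescript
// Illustrative data and result shapes for a typed worker.
interface MyData {
  color: string;
  qty: number;
}

interface MyReturn {
  painted: boolean;
}

// Same shape a typed processor would have, so it can be reused as:
//   new Worker<MyData, MyReturn>(queueName, async job => paint(job.data));
async function paint(data: MyData): Promise<MyReturn> {
  return { painted: data.qty > 0 };
}
```

Keeping the processor body in a standalone function like this also makes it straightforward to test the job logic independently of the queue infrastructure.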

If the error handler is missing, your worker may stop processing jobs when an error is emitted!

Read more:

  • Retrying failing jobs
  • Worker API Reference
  • Queue Events API Reference