BullMQ
  • What is BullMQ
  • Quick Start
  • API Reference
  • Changelogs
    • v4
    • v3
    • v2
    • v1
  • Guide
    • Introduction
    • Connections
    • Queues
      • Auto-removal of jobs
      • Adding jobs in bulk
      • Global Concurrency
      • Removing Jobs
    • Workers
      • Auto-removal of jobs
      • Concurrency
      • Graceful shutdown
      • Stalled Jobs
      • Sandboxed processors
      • Pausing queues
    • Jobs
      • FIFO
      • LIFO
      • Job Ids
      • Job Data
      • Deduplication
      • Delayed
      • Repeatable
      • Prioritized
      • Removing jobs
      • Stalled
      • Getters
    • Job Schedulers
      • Repeat Strategies
      • Repeat options
      • Manage Job Schedulers
    • Flows
      • Adding flows in bulk
      • Get Flow Tree
      • Fail Parent
      • Continue Parent
      • Remove Dependency
      • Ignore Dependency
      • Remove Child Dependency
    • Metrics
      • Prometheus
    • Rate limiting
    • Parallelism and Concurrency
    • Retrying failing jobs
    • Returning job data
    • Events
      • Create Custom Events
    • Telemetry
      • Getting started
      • Running Jaeger
      • Running a simple example
    • QueueScheduler
    • Redis™ Compatibility
      • Dragonfly
    • Redis™ hosting
      • AWS MemoryDB
      • AWS Elasticache
    • Architecture
    • NestJs
      • Producers
      • Queue Events Listeners
    • Going to production
    • Migration to newer versions
    • Troubleshooting
  • Patterns
    • Adding jobs in bulk across different queues
    • Manually processing jobs
    • Named Processor
    • Flows
    • Idempotent jobs
    • Throttle jobs
    • Process Step Jobs
    • Failing fast when Redis is down
    • Stop retrying jobs
    • Timeout jobs
    • Timeout for Sandboxed processors
    • Redis Cluster
  • BullMQ Pro
    • Introduction
    • Install
    • Observables
      • Cancelation
    • Groups
      • Getters
      • Rate limiting
      • Local group rate limit
      • Concurrency
      • Local group concurrency
      • Max group size
      • Pausing groups
      • Prioritized intra-groups
      • Sandboxes for groups
    • Telemetry
    • Batches
    • NestJs
      • Producers
      • Queue Events Listeners
      • API Reference
      • Changelog
    • API Reference
    • Changelog
    • New Releases
    • Support
  • Bull
    • Introduction
    • Install
    • Quick Guide
    • Important Notes
    • Reference
    • Patterns
      • Persistent connections
      • Message queue
      • Returning Job Completions
      • Reusing Redis Connections
      • Redis cluster
      • Custom backoff strategy
      • Debugging
      • Manually fetching jobs
  • Python
    • Introduction
    • Changelog
Powered by GitBook

Copyright (c) Taskforce.sh Inc.

On this page
  • New Batch Options: minSize and timeout
  • Failing jobs
  • Handling events
  • Limitations

Was this helpful?

  1. BullMQ Pro

Batches

Processing jobs in batches

It is possible to configure workers so that instead of processing one job at a time they can process up to a number of jobs (a so-called batch) in one go. Workers using batches have slightly different semantics and behavior than normal workers, so read carefully the following examples to avoid pitfalls.

To enable batches, pass the batch option with a size property representing the maximum number of jobs per batch:

const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();

    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      await doSomethingWithBatchedJob(batchedJob);
    }
  },
  { connection, batch: { size: 10 } },
);

There is no strict maximum limit for the size of batches; however, keep in mind that larger batches introduce overhead proportional to their size, which could lead to performance issues. Typical batch sizes range between 10 and 50 jobs.

New Batch Options: minSize and timeout

In addition to the size option, two new options—minSize and timeout—provide greater control over batch processing:

  • minSize: Specifies the minimum number of jobs required before the worker processes a batch. The worker will wait until at least minSize jobs are available before fetching and processing them, up to the size limit. If fewer than minSize jobs are available, the worker waits indefinitely unless a timeout is also set.

  • timeout: Defines the maximum time (in milliseconds) the worker will wait for minSize jobs to accumulate. If the timeout expires before minSize is reached, the worker processes whatever jobs are available, up to the size limit. If minSize is not set the timeout option is effectively ignored, as the worker batches only avaialble jobs.

Important: minSize and timeout are not compatible with groups. When groups are used, the worker ignores minSize and tries to batch avaialble jobs without waiting.

Here’s an example configuration using both minSize and timeout:

const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();
    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      await doSomethingWithBatchedJob(batchedJob);
    }
  },
  {
    connection,
    batch: {
      size: 10,      // Maximum jobs per batch
      minSize: 5,    // Wait for at least 5 jobs
      timeout: 30_000 // Wait up to 30 seconds
    },
  },
);

In this example:

  • The worker waits for at least 5 jobs to become available, up to a maximum of 10 jobs per batch.

  • If 5 or more jobs are available within 30 seconds, it processes the batch (up to 10 jobs).

  • If fewer than 5 jobs are available after 30 seconds, it processes whatever jobs are present, even if below minSize.

Failing jobs

When using batches, the default is that if the processor throws an exception, all jobs in the batch will fail.

To fail specific jobs instead, use the setAsFailed method on individual jobs within the batch:

const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();

    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      try {
        await doSomethingWithBatchedJob(batchedJob);
      } catch (err) {
        batchedJob.setAsFailed(err);
      }
    }
  },
  { connection, batch: { size: 10 } },
);

Only jobs explicitly marked with setAsFailed will fail; the remaining jobs in the batch will complete succesfully once the processor finishes.

Handling events

Batches are managed by wrapping all jobs in a batch into a dummy job that holds the jobs in an internal array. This simplifies batch processing but affects event handling. For example, worker-level event listeners (e.g., worker.on('completed', ...)) report events for the dummy batch job, not the individual jobs within it.

To retrieve the jobs in a batch from an event handler, use the getBatch method:

worker.on('completed', job => {
  const batch = job.getBatch();
  // ...
});

Using a global event listener you can listen to individual job events even though they may be processed in a batch:

import { QueueEventsPro } from '@taskforcesh/bullmq-pro';

const queueEvents = new QueueEventsPro(queueName, { connection });
queueEvents.on('completed', (jobId, err) => {
  // ...
});

Limitations

Currently, all worker options can be used with batches, however, there are some unsupported features that may be implemented in the future:

PreviousTelemetryNextNestJs

Last updated 2 months ago

Was this helpful?

.

Dynamic rate limit
Manually processing jobs
Dynamically delay jobs