Batches

Processing jobs in batches

It is possible to configure workers so that, instead of processing one job at a time, they process up to a given number of jobs (a so-called batch) in one go.

Workers using batches have slightly different semantics and behavior than normal workers, so read the following examples carefully to avoid pitfalls.

In order to enable batches, you must pass the batch option with a size representing the maximum number of jobs per batch:

import { WorkerPro, JobPro } from '@taskforcesh/bullmq-pro';

const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    // The job passed to the processor is a wrapper; getBatch() returns
    // the individual jobs that make up the batch.
    const batch = job.getBatch();

    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      await doSomethingWithBatchedJob(batchedJob);
    }
  },
  { connection, batch: { size: 10 } },
);
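
Jobs themselves are added in the usual way; batching is purely a worker-side setting. As a minimal sketch (the job name and payloads are hypothetical, and connection is assumed to be the same Redis connection used by the worker):

import { QueuePro } from '@taskforcesh/bullmq-pro';

const queue = new QueuePro('My Queue', { connection });

// Add jobs normally; the worker above will pick them up in batches of up to 10.
await queue.addBulk(
  Array.from({ length: 100 }, (_, i) => ({
    name: 'my-job',
    data: { index: i },
  })),
);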

There is no maximum limit for the size of a batch; however, keep in mind that there is an overhead proportional to the batch size, so very large batches can create performance issues. A typical value is between 10 and 50 jobs per batch.

Failing jobs

When using batches, the default behavior is that if the processor throws an exception, all the jobs in the batch will fail.
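
For example, in a minimal sketch of this default behavior (processBatchOrThrow is a hypothetical helper), a single uncaught exception fails every job in the batch:

const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    // If this call throws, every job in the batch is moved to failed
    // with the same error.
    await processBatchOrThrow(job.getBatch());
  },
  { connection, batch: { size: 10 } },
);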

Sometimes it is useful to fail only specific jobs in a batch. We can accomplish this by using the job's setAsFailed method. See how the example above can be modified to fail specific jobs:

const worker = new WorkerPro(
  'My Queue',
  async (job: JobPro) => {
    const batch = job.getBatch();

    for (let i = 0; i < batch.length; i++) {
      const batchedJob = batch[i];
      try {
        await doSomethingWithBatchedJob(batchedJob);
      } catch (err) {
        // Mark only this job as failed; the rest of the batch is unaffected.
        batchedJob.setAsFailed(<Error>err);
      }
    }
  },
  { connection, batch: { size: 10 } },
);

Only the jobs that are set as failed will fail; the rest will be moved to completed when the processor for the batch job completes.

Handling events

Batches are handled by wrapping all the jobs in a batch into a dummy job that keeps all the jobs in an internal array. This approach simplifies the mechanics of running batches; however, it also affects things like how events are handled. For instance, if you need to listen for individual jobs that have completed or failed, you must use global events, as the event handler on the worker instance only reports the events produced by the wrapper batch job, not the jobs themselves.

It is possible, however, to call the getBatch method on the wrapper job in order to retrieve all the jobs that belong to a given batch:

worker.on('completed', job => {
  const batch = job.getBatch();
  // ...
});

Using a global event listener, you can listen to individual job events even though the jobs may be processed in a batch:

import { QueueEventsPro } from '@taskforcesh/bullmq-pro';

// queueName should match the queue used by the worker, e.g. 'My Queue'.
const queueEvents = new QueueEventsPro(queueName, { connection });
queueEvents.on('completed', ({ jobId, returnvalue }) => {
  // Fired for each individual job that completed, even when it was
  // processed as part of a batch.
  // ...
});
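
The same applies to failures: jobs that were marked with setAsFailed inside a batch are reported individually through the global failed event. A minimal sketch, reusing the queueEvents instance above:

queueEvents.on('failed', ({ jobId, failedReason }) => {
  // Fired for each individual job that failed, including jobs
  // marked with setAsFailed inside a batch.
  console.log(`Job ${jobId} failed: ${failedReason}`);
});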

Limitations

Currently, all worker options can be used with batches; however, there are some unsupported features that may be implemented in the future:
