BullMQ
  • What is BullMQ
  • Quick Start
  • API Reference
  • Changelogs
    • v4
    • v3
    • v2
    • v1
  • Guide
    • Introduction
    • Connections
    • Queues
      • Auto-removal of jobs
      • Adding jobs in bulk
      • Global Concurrency
      • Removing Jobs
    • Workers
      • Auto-removal of jobs
      • Concurrency
      • Graceful shutdown
      • Stalled Jobs
      • Sandboxed processors
      • Pausing queues
    • Jobs
      • FIFO
      • LIFO
      • Job Ids
      • Job Data
      • Deduplication
      • Delayed
      • Repeatable
      • Prioritized
      • Removing jobs
      • Stalled
      • Getters
    • Job Schedulers
      • Repeat Strategies
      • Repeat options
      • Manage Job Schedulers
    • Flows
      • Adding flows in bulk
      • Get Flow Tree
      • Fail Parent
      • Continue Parent
      • Remove Dependency
      • Ignore Dependency
      • Remove Child Dependency
    • Metrics
      • Prometheus
    • Rate limiting
    • Parallelism and Concurrency
    • Retrying failing jobs
    • Returning job data
    • Events
      • Create Custom Events
    • Telemetry
      • Getting started
      • Running Jaeger
      • Running a simple example
    • QueueScheduler
    • Redis™ Compatibility
      • Dragonfly
    • Redis™ hosting
      • AWS MemoryDB
      • AWS Elasticache
    • Architecture
    • NestJs
      • Producers
      • Queue Events Listeners
    • Going to production
    • Migration to newer versions
    • Troubleshooting
  • Patterns
    • Adding jobs in bulk across different queues
    • Manually processing jobs
    • Named Processor
    • Flows
    • Idempotent jobs
    • Throttle jobs
    • Process Step Jobs
    • Failing fast when Redis is down
    • Stop retrying jobs
    • Timeout jobs
    • Timeout for Sandboxed processors
    • Redis Cluster
  • BullMQ Pro
    • Introduction
    • Install
    • Observables
      • Cancelation
    • Groups
      • Getters
      • Rate limiting
      • Local group rate limit
      • Concurrency
      • Local group concurrency
      • Max group size
      • Pausing groups
      • Prioritized intra-groups
      • Sandboxes for groups
    • Telemetry
    • Batches
    • NestJs
      • Producers
      • Queue Events Listeners
      • API Reference
      • Changelog
    • API Reference
    • Changelog
    • New Releases
    • Support
  • Bull
    • Introduction
    • Install
    • Quick Guide
    • Important Notes
    • Reference
    • Patterns
      • Persistent connections
      • Message queue
      • Returning Job Completions
      • Reusing Redis Connections
      • Redis cluster
      • Custom backoff strategy
      • Debugging
      • Manually fetching jobs
  • Python
    • Introduction
    • Changelog
Powered by GitBook

Copyright (c) Taskforce.sh Inc.

On this page
  • Getters
  • Get Dependencies Count
  • Get Children Values
  • Provide options
  • Jobs removal
  • Read more:

Was this helpful?

  1. Guide

Flows

PreviousManage Job SchedulersNextAdding flows in bulk

Last updated 1 month ago

Was this helpful?

BullMQ supports parent - child relationships between jobs. The basic idea is that a parent job will not be moved to the wait status (i.e. where it could be picked up by a worker) until all its children jobs have been processed successfully. Apart from that, a parent or a child job are no different from regular jobs.

This functionality enables the creation of flows where jobs are the node of trees of arbitrary depth.

Flows are added to a queue using the FlowProducer class.

In order to create "flows" you must use the class. The method accepts an object with the following interface:

interface FlowJob {
  name: string;
  queueName: string;
  data?: any;
  prefix?: string;
  opts?: Omit<JobsOptions, 'parent' | 'repeat'>;
  children?: FlowJob[];
}

So we can add a flow like this one:

import { FlowProducer } from 'bullmq';

// A FlowProducer constructor takes an optional "connection"
// object otherwise it connects to a local redis instance.
const flowProducer = new FlowProducer();

const flow = await flowProducer.add({
  name: 'renovate-interior',
  queueName: 'renovate',
  children: [
    { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' },
    { name: 'paint', data: { place: 'walls' }, queueName: 'steps' },
    { name: 'fix', data: { place: 'floor' }, queueName: 'steps' },
  ],
});
from bullmq import FlowProducer

# A FlowProducer constructor takes an optional "connection"
# object otherwise it connects to a local redis instance.
flowProducer = FlowProducer()

flow = await flowProducer.add({
  "name": "renovate-interior",
  "queueName": "renovate",
  "children": [
    { "name": "paint", "data": { "place": "ceiling" }, "queueName": "steps" },
    { "name": "paint", "data": { "place": "walls" }, "queueName": "steps" },
    { "name": "fix", "data": { "place": "floor" }, "queueName": "steps" },
  ],
})

The above code will atomically add 4 jobs: one to the "renovate" queue, and 3 to the "steps" queue. When the 3 jobs in the "steps" queue are completed, the parent job in the "renovate" queue will be processed as a regular job.

The above call will return instances for all the jobs added to the queue.

Note that the parent queue does not need to be the same queue as the one used for the children.

If a jobId option is provided, make sure that it does not contain a colon : as this is considered a separator.

When the parent job is processed it is possible to access the results generated by its child jobs. For example, lets assume the following worker for the child jobs:

import { Worker } from 'bullmq';

const stepsWorker = new Worker('steps', async job => {
  await performStep(job.data);

  if (job.name === 'paint') {
    return 2500;
  } else if (job.name === 'fix') {
    return 1750;
  }
});
from bullmq import Worker

async def process(job: Job, token: str):
  await performStep(job.data)
  if job.name == 'paint':
    return 2500
  elif job.name == 'fix':
    return 1750

stepsWorker = Worker("steps", process, {"connection": connection})

We can implement a parent worker that sums the costs of the children's jobs using the getChildrenValues method. This method returns an object with job keys as keys and the result of that given job as a value:

import { Worker } from 'bullmq';

const renovateWorker = new Worker('renovate', async job => {
  const childrenValues = await job.getChildrenValues();

  const totalCosts = Object.values(childrenValues).reduce(
    (prev, cur) => prev + cur,
    0,
  );

  await sendInvoice(totalCosts);
});

It is possible to add as deep job hierarchies as needed. See the following example where jobs are depending on each other, allowing serial execution of jobs:

import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer();

const queueName = 'assembly-line';
const chain = await flowProducer.add({
  name: 'car',
  data: { step: 'engine' },
  queueName,
  children: [
    {
      name: 'car',
      data: { step: 'wheels' },
      queueName,
      children: [{ name: 'car', data: { step: 'chassis' }, queueName }],
    },
  ],
});
from bullmq import FlowProducer

flowProducer = FlowProducer()

queueName = 'assembly-line'
chain = await flowProducer.add({
  "name": "car",
  "data": { "step": "engine" },
  "queueName": queueName,
  "children": [
    {
      "name": "car",
      "data": { "step": "wheels" },
      "queueName": queueName,
      "children": [{ "name": "car", "data": { "step": "chassis" }, "queueName": queueName }],
    },
  ],
})

In this case one job will be processed after the previous one has been completed.

The order of processing would be: chassis, wheels and finally engine.

Getters

There are some special getters that can be used in order to get jobs related to a flow. First, we have a method in the Job class to get all the dependencies for a given job:

const dependencies = await job.getDependencies();

it will return all the direct dependencies (i.e. the children of a given job).

Or if you want to get specific types of children:

// cursors are used in pagination
const { processed, nextProcessedCursor } = await job.getDependencies({
  processed: {
    count: 5,
    cursor: 0,
  },
});

const { unprocessed, nextUnprocessedCursor } = await job.getDependencies({
  unprocessed: {
    count: 5,
    cursor: 0,
  },
});

const { failed, nextFailedCursor } = await job.getDependencies({
  failed: {
    count: 5,
    cursor: 0,
  },
});

const { ignored, nextIgnoredCursor } = await job.getDependencies({
  ignored: {
    count: 5,
    cursor: 0,
  },
});

The Job class also provides some other methods that we presented above

Get Dependencies Count

To get all the different counts of children by type:

const { failed, ignored, processed, unprocessed } =
  await job.getDependenciesCount();

Or if you want to be specific:

const { failed } = await job.getDependenciesCount({
  failed: true,
});

const { ignored, processed } = await job.getDependenciesCount({
  ignored: true,
  processed: true,
});

const { unprocessed } = await job.getDependenciesCount({
  unprocessed: true,
});

Get Children Values

To get all the values produced by the children of a given job:

const values = await job.getChildrenValues();

Also, a new property is available in the Job class, parentKey, with a fully qualified key for the job parent.

Finally, there is also a new state which a job can be in, "waiting-children", for parent jobs that have not yet had their children completed:

const state = await job.getState();
// state will be "waiting-children"

Provide options

When adding a flow it is also possible to provide an extra object queueOptions object, in which you can add your specific options for every queue that is used in the flow. These options would affect each one of the jobs that are added to the flow using the FlowProducer.

import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer();

const queueName = 'assembly-line';
const chain = await flowProducer.add(
  {
    name: 'car',
    data: { step: 'engine' },
    queueName,
    children: [
      {
        name: 'car',
        data: { step: 'wheels' },
        queueName,
      },
    ],
  },
  {
    queuesOptions: {
      [queueName]: {
        defaultJobOptions: {
          removeOnComplete: true,
        },
      },
    },
  },
);

Queue options are defined in the context of their instances. You should provide your configurations in the second parameter in order to avoid unexpected behaviors.

Jobs removal

BullMQ also provides seamless removal functionality for jobs that are part of a flow.

When removing a job that is part of the flow there are several important considerations:

  1. If a parent job is removed, all its children will also be removed.

  2. If a child job is removed, its parent dependency to said child is also removed, and if the child was the last child in the dependencies list, the parent job will be completed.

  3. Since a job can be both a parent and a child in a large flow, both 1 and 2 will occur if removing such a job.

  4. If any of the jobs that would be removed happen to be locked, none of the jobs will be removed, and an exception will be thrown.

Apart from the considerations above, removing a job can simply be done by either using the Job or the Queue class:

await job.remove();
// or
await queue.remove(job.id);

Read more:

📋

💡

💡

💡

💡

💡

FlowProducer
add
Divide large jobs using flows
FlowProducer API Reference
Job API Reference
Get Children Values API Reference
Get Dependencies API Reference
Get Dependencies Count API Reference