BullMQ
  • What is BullMQ
  • Quick Start
  • API Reference
  • Changelogs
    • v4
    • v3
    • v2
    • v1
  • Guide
    • Introduction
    • Connections
    • Queues
      • Auto-removal of jobs
      • Adding jobs in bulk
      • Global Concurrency
      • Removing Jobs
    • Workers
      • Auto-removal of jobs
      • Concurrency
      • Graceful shutdown
      • Stalled Jobs
      • Sandboxed processors
      • Pausing queues
    • Jobs
      • FIFO
      • LIFO
      • Job Ids
      • Job Data
      • Deduplication
      • Delayed
      • Repeatable
      • Prioritized
      • Removing jobs
      • Stalled
      • Getters
    • Job Schedulers
      • Repeat Strategies
      • Repeat options
      • Manage Job Schedulers
    • Flows
      • Adding flows in bulk
      • Get Flow Tree
      • Fail Parent
      • Continue Parent
      • Remove Dependency
      • Ignore Dependency
      • Remove Child Dependency
    • Metrics
      • Prometheus
    • Rate limiting
    • Parallelism and Concurrency
    • Retrying failing jobs
    • Returning job data
    • Events
      • Create Custom Events
    • Telemetry
      • Getting started
      • Running Jaeger
      • Running a simple example
    • QueueScheduler
    • Redis™ Compatibility
      • Dragonfly
    • Redis™ hosting
      • AWS MemoryDB
      • AWS Elasticache
    • Architecture
    • NestJs
      • Producers
      • Queue Events Listeners
    • Going to production
    • Migration to newer versions
    • Troubleshooting
  • Patterns
    • Adding jobs in bulk across different queues
    • Manually processing jobs
    • Named Processor
    • Flows
    • Idempotent jobs
    • Throttle jobs
    • Process Step Jobs
    • Failing fast when Redis is down
    • Stop retrying jobs
    • Timeout jobs
    • Timeout for Sandboxed processors
    • Redis Cluster
  • BullMQ Pro
    • Introduction
    • Install
    • Observables
      • Cancelation
    • Groups
      • Getters
      • Rate limiting
      • Local group rate limit
      • Concurrency
      • Local group concurrency
      • Max group size
      • Pausing groups
      • Prioritized intra-groups
      • Sandboxes for groups
    • Telemetry
    • Batches
    • NestJs
      • Producers
      • Queue Events Listeners
      • API Reference
      • Changelog
    • API Reference
    • Changelog
    • New Releases
    • Support
  • Bull
    • Introduction
    • Install
    • Quick Guide
    • Important Notes
    • Reference
    • Patterns
      • Persistent connections
      • Message queue
      • Returning Job Completions
      • Reusing Redis Connections
      • Redis cluster
      • Custom backoff strategy
      • Debugging
      • Manually fetching jobs
  • Python
    • Introduction
    • Changelog
Powered by GitBook

Copyright (c) Taskforce.sh Inc.

On this page
  • continueParentOnFailure
  • removeUnprocessedChildren
  • getFailedChildrenValues
  • Example
  • Practical use case

Was this helpful?

  1. Guide
  2. Flows

Continue Parent

Process parent if any children fails.

Available since v5.58.0

The continueParentOnFailure option allows a parent job to start processing as soon as a child job fails, while the removeUnprocessedChildren method enables dynamic cleanup of unprocessed child jobs. Additionally, you can use the getFailedChildrenValues() method to determine whether the parent is processing due to a child failure or because all children completed successfully, allowing you to define distinct logic paths.

continueParentOnFailure

When set to true on a child job, the continueParentOnFailure option causes the parent job to begin processing immediately if that child fails. This contrasts with the default behavior, where the parent waits for all children to finish.

  • Key Behavior: The parent moves to the active state as soon as a child with this option fails, even if other children are still running or unprocessed.

  • Use Case: Ideal for scenarios where a child’s failure requires immediate parent intervention, such as aborting the workflow or performing cleanup.

removeUnprocessedChildren

This method, available on a job instance, removes all unprocessed child jobs (those in waiting or delayed states) from the queue. It’s particularly useful when paired with continueParentOnFailure to get rid of remaining children after a failure.

  • Key Behavior: Only affects children that haven’t started processing; active, completed or failed children remain intact.

  • Usage: Call within the parent’s processor to clean up dynamically.

getFailedChildrenValues

The getFailedChildrenValues() method returns an object mapping the IDs of failed child jobs to their failure error messages. This allows the parent job to determine why it’s processing—whether due to a child failure (triggered by continueParentOnFailure) or because all children completed successfully.

  • Return Value: An object where keys are job IDs and values are error messages (e.g., { "job-id-1": "Upload failed" }). If no children failed, the object is empty.

  • Usage: Use this in the parent’s processor to branch logic based on the presence of failed children.

Example

The following example shows how to combine these features, with the parent job reacting differently based on whether a child failed or all children succeeded:

const { FlowProducer } = require('bullmq');
const flow = new FlowProducer({ connection });

// Define the flow
const originalTree = await flow.add({
  name: 'root-job',
  queueName: 'topQueueName',
  data: {},
  children: [
    {
      name: 'child-job-1',
      data: { idx: 0, foo: 'bar' },
      queueName: 'childrenQueueName',
      opts: { continueParentOnFailure: true }, // Parent processes if this child fails
    },
    {
      name: 'child-job-2',
      data: { idx: 1, foo: 'baz' },
      queueName: 'childrenQueueName',
    },
    {
      name: 'child-job-3',
      data: { idx: 2, foo: 'qux' },
      queueName: 'childrenQueueName',
    },
  ],
});

// Processor for the parent job
const processor = async (job) => {
  // Check if any children failed
  const failedChildren = await job.getFailedChildrenValues();
  const hasFailedChildren = Object.keys(failedChildren).length > 0;

  if (hasFailedChildren) {
    // Path 1: A child failed, triggering continueParentOnFailure
    console.log(`Parent job ${job.name} triggered by child failure(s):`, failedChildren);
    
    // Remove unprocessed children
    await job.removeUnprocessedChildren();
    console.log('Unprocessed child jobs have been removed.');
    
    // Additional cleanup or error handling can go here
  } else {
    // Path 2: All children completed successfully
    console.log(`Parent job ${job.name} processing after all children completed successfully.`);
    
    // Proceed with normal parent logic (e.g., aggregating results)
  }
};

Practical use case

Consider a workflow where child jobs upload files to different servers. If one upload fails (e.g., child-job-1), the parent can use continueParentOnFailure to react immediately, check getFailedChildrenValues() to confirm the failure, and call removeUnprocessedChildren() to cancel remaining uploads. If all uploads succeed, the parent might aggregate the results instead.

PreviousFail ParentNextRemove Dependency

Last updated 1 month ago

Was this helpful?