
Copyright (c) Taskforce.sh Inc.


Timeout jobs

BullMQ does not provide a specific mechanism to time out jobs. However, in many cases this can be accomplished with custom timeout code in the worker's process function.

The basic concept is to set up a timeout callback that aborts the job processing, and then to throw an UnrecoverableError to avoid retries. This may not always be the desired behaviour; if you want the job to be retried, throw a normal Error instead. Note that the timeout is specified as a property of the job's data so that different jobs can have different timeouts, but you could also use a fixed constant timeout for all jobs.

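import { Worker, UnrecoverableError } from 'bullmq';

const worker = new Worker('foo', async job => {
  const controller = new AbortController();
  // Abort the operation once the job's own timeout (in milliseconds) elapses.
  const timer = setTimeout(() => controller.abort(), job.data.timeout);

  try {
    await doSomethingAbortable(controller.signal);
  } catch (err) {
    if (err.name === 'AbortError') {
      // UnrecoverableError prevents the job from being retried.
      throw new UnrecoverableError('Timeout');
    } else {
      throw err;
    }
  } finally {
    // Always clear the timer so it cannot fire after the job has settled.
    clearTimeout(timer);
  }
});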

In this simple example we assume that doSomethingAbortable is an asynchronous function that can handle abort signals and abort itself gracefully.
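To make that assumption concrete, here is a minimal sketch of what such a function could look like. The body, the `ms` parameter and the `'done'` return value are hypothetical and stand in for real work; the only part that matters is that the function listens for the abort signal, cleans up, and rejects with an error whose name is `AbortError`, which is what the worker above checks for.

```javascript
// Hypothetical abortable operation: resolves after `ms` milliseconds
// unless the given AbortSignal fires first.
function doSomethingAbortable(signal, ms = 1000) {
  return new Promise((resolve, reject) => {
    // Build an error that the worker's catch block recognises by name.
    const abortError = () => {
      const err = new Error('The operation was aborted');
      err.name = 'AbortError';
      return err;
    };

    // The signal may already be aborted before we start.
    if (signal.aborted) {
      reject(abortError());
      return;
    }

    // Simulated work: a timer standing in for the real operation.
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve('done');
    }, ms);

    // On abort, stop the pending work and reject.
    function onAbort() {
      clearTimeout(timer);
      reject(abortError());
    }

    signal.addEventListener('abort', onAbort, { once: true });
  });
}
```

Real operations would replace the timer with their own cancellation logic, for example closing a stream or passing the signal on to an API that accepts one.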

Now let's look at another case, where we want to time out a fetch call:

import { Worker, UnrecoverableError } from 'bullmq';

const worker = new Worker('foo', async job => {
  const controller = new AbortController();
  // Abort the request once the job's own timeout (in milliseconds) elapses.
  const timer = setTimeout(() => controller.abort(), job.data.timeout);

  try {
    // fetch in Node.js needs an absolute URL.
    const response = await fetch('https://slowserver.com', {
      signal: controller.signal,
    });
    const result = await response.text();
    return result;
  } catch (err) {
    if (err.name === 'AbortError') {
      throw new UnrecoverableError('Timeout');
    } else {
      throw err;
    }
  } finally {
    clearTimeout(timer);
  }
});

In summary, it is possible to implement timeouts in your jobs, but the mechanism may vary depending on the type of asynchronous operations your job performs. In many cases, using AbortController in combination with setTimeout is enough.
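Since the AbortController and setTimeout pairing is the same for every job, it can be factored into a small helper. The sketch below is one possible way to do so; `withTimeout` is a hypothetical name and not part of BullMQ, and it assumes the task accepts an AbortSignal and rejects when that signal fires.

```javascript
// Hypothetical helper (not a BullMQ API): runs `task(signal)` and aborts the
// signal if the task has not settled within `ms` milliseconds.
async function withTimeout(task, ms) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    // The task decides how to react to the abort signal.
    return await task(controller.signal);
  } finally {
    // Clear the timer whether the task succeeded, failed or was aborted.
    clearTimeout(timer);
  }
}
```

Inside a worker's process function this could be called as `await withTimeout((signal) => fetch(url, { signal }), job.data.timeout)`, with the same catch block as in the examples above to turn an AbortError into an UnrecoverableError when retries are not wanted.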

In the fetch example above, the call is aborted using AbortController, which is the default mechanism provided by fetch to abort calls. Note that the abort will also cause a pending call to response.text() to throw an AbortError.

Last updated 8 months ago