Copyright (c) Taskforce.sh Inc.

Deduplication

Deduplication in BullMQ is a process where job execution is delayed and deduplicated based on an identifier that you supply. It ensures that within a specified period, or until a specific job is completed or failed, no new job with the same identifier is added to the queue. Instead, each such attempt triggers a deduplicated event.

Simple Mode

The Simple Mode extends the deduplication duration until the job's completion or failure. This means that as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same deduplication ID will be ignored.

import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// Add a job. Until it finishes (completed or failed), further jobs with the
// same deduplication id are ignored.
await myQueue.add(
  'house',
  { color: 'white' },
  { deduplication: { id: 'customValue' } },
);

Until this job moves to the completed or failed state, subsequent jobs added with the same deduplication id are ignored, and a deduplicated event is emitted through the QueueEvents class.

This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress.
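The rule described above can be pictured with a small in-memory model. This is only a sketch of the semantics, not how BullMQ implements them (BullMQ keeps its state in Redis); the class and method names are hypothetical.

```typescript
// Toy in-memory model of Simple Mode. NOT BullMQ's implementation.
// SimpleModeModel, tryAdd and finish are hypothetical names.
class SimpleModeModel {
  // Maps a deduplication id to the id of the unfinished job that holds it.
  private holders: { [deduplicationId: string]: string } = Object.create(null);
  private nextJobId = 1;

  // Returns the new job id, or null when the attempt is deduplicated.
  tryAdd(deduplicationId: string): string | null {
    if (deduplicationId in this.holders) {
      return null; // an unfinished job already holds this id
    }
    const jobId = String(this.nextJobId++);
    this.holders[deduplicationId] = jobId;
    return jobId;
  }

  // Call when the job completes or fails; the id becomes available again.
  finish(deduplicationId: string): void {
    delete this.holders[deduplicationId];
  }
}

const simpleModel = new SimpleModeModel();
const firstAttempt = simpleModel.tryAdd('customValue'); // accepted
const secondAttempt = simpleModel.tryAdd('customValue'); // ignored, first job unfinished
simpleModel.finish('customValue');
const thirdAttempt = simpleModel.tryAdd('customValue'); // accepted again
```

In this model the second attempt returns null because the first job has not finished, and the third attempt succeeds once the id has been released.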

Throttle Mode

In Throttle Mode, deduplication works by assigning a delay (Time to Live, TTL) to a job upon its creation. If another job with the same deduplication ID is added during this period, it is ignored. This keeps the queue from being flooded with multiple instances of the same task, which saves processing time and resources.

import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// Add a job that will be deduplicated for 5 seconds.
await myQueue.add(
  'house',
  { color: 'white' },
  { deduplication: { id: 'customValue', ttl: 5000 } },
);

In this example, after the house painting job is added with the deduplication parameters (id and ttl), any subsequent job with the same deduplication ID customValue added within 5 seconds is ignored. This is useful when rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job.
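The time window can be illustrated with a toy model that takes timestamps as plain numbers. It is a sketch of the behavior described here, not BullMQ's implementation, and it assumes that an ignored attempt does not extend the window. All names are hypothetical.

```typescript
// Toy model of Throttle Mode. NOT BullMQ's implementation.
// ThrottleModel and tryAdd are hypothetical names; time is passed in
// explicitly (in milliseconds) so the behavior is easy to follow.
class ThrottleModel {
  // Maps a deduplication id to the timestamp at which its window ends.
  private expiresAt: { [deduplicationId: string]: number } = Object.create(null);

  // Returns true if the job is accepted, false if it is deduplicated.
  tryAdd(deduplicationId: string, ttlMs: number, nowMs: number): boolean {
    const expiry = this.expiresAt[deduplicationId];
    if (expiry !== undefined && nowMs < expiry) {
      return false; // still inside the window opened by an earlier job
    }
    // Assumption: only an accepted job opens a new window.
    this.expiresAt[deduplicationId] = nowMs + ttlMs;
    return true;
  }
}

const throttleModel = new ThrottleModel();
const atStart = throttleModel.tryAdd('customValue', 5000, 0); // accepted
const afterThreeSeconds = throttleModel.tryAdd('customValue', 5000, 3000); // ignored
const afterSixSeconds = throttleModel.tryAdd('customValue', 5000, 6000); // accepted
```

The attempt at 3 seconds falls inside the 5 second window and is ignored, while the attempt at 6 seconds is accepted and opens a new window.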

Note that you must provide a deduplication id that represents your job. You can, for example, hash the entire job data or a subset of its attributes to create this identifier.
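One possible way to derive such an identifier is sketched below. The helpers are hypothetical and not part of BullMQ. The sketch assumes flat job data and uses a small non-cryptographic hash (FNV-1a) only to stay dependency-free; a cryptographic hash such as SHA-256 may be a better fit when collisions matter.

```typescript
// Hypothetical helpers, not part of BullMQ.
type FlatJobData = { [key: string]: string | number | boolean };

// Serializes flat job data with sorted keys, so that the order in which
// the keys were written does not change the result.
function canonicalize(data: FlatJobData): string {
  const parts = Object.keys(data)
    .sort()
    .map((key) => JSON.stringify(key) + ':' + JSON.stringify(data[key]));
  return '{' + parts.join(',') + '}';
}

// 32-bit FNV-1a over UTF-16 code units, returned as 8 hex digits.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    // Multiply by the FNV prime 16777619 using shifts, modulo 2^32.
    hash =
      (hash + (hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24)) >>> 0;
  }
  return ('0000000' + hash.toString(16)).slice(-8);
}

// Builds a deduplication id from the job name and its data.
function deduplicationIdFor(name: string, data: FlatJobData): string {
  return name + ':' + fnv1a(canonicalize(data));
}

const idA = deduplicationIdFor('house', { color: 'white', coats: 2 });
const idB = deduplicationIdFor('house', { coats: 2, color: 'white' });
// idA and idB are equal because the keys are sorted before hashing.
```

The resulting string could then be passed as the id in the deduplication option when adding the job.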

Manually removing the job disables the deduplication, for example by calling the job.remove method.

The Deduplicated Event

The deduplicated event is emitted whenever a job is ignored due to deduplication in either Simple Mode or Throttle Mode. This event allows you to monitor deduplication activity and take action if needed, such as logging the occurrence or notifying a user that their request was ignored.

Listening for the Deduplicated Event

To listen for the deduplicated event, use the QueueEvents class from BullMQ:

import { QueueEvents } from 'bullmq';

// Use the same queue name that the jobs are added to.
const queueEvents = new QueueEvents('Paint');

queueEvents.on('deduplicated', ({ jobId, deduplicationId, deduplicatedJobId }) => {
  // Build the message on one line so the log has no embedded newline.
  console.log(
    `Job ${deduplicatedJobId} was deduplicated due to existing job ${jobId} ` +
      `with deduplication ID ${deduplicationId}`,
  );
});

In this example:

  • jobId: The Id of the existing job that triggered the deduplication.

  • deduplicationId: The deduplication Id that caused the job to be ignored.

  • deduplicatedJobId: The Id of the job that was ignored.
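As a sketch of how these fields might be used for monitoring, the helper below counts ignored jobs per deduplication id. The interface simply mirrors the three fields listed above and is declared locally; the counter class is hypothetical and not part of BullMQ.

```typescript
// Mirrors the payload fields listed above; declared locally so that the
// sketch is self-contained.
interface DeduplicatedEvent {
  jobId: string;
  deduplicationId: string;
  deduplicatedJobId: string;
}

// Hypothetical helper: counts ignored jobs per deduplication id.
class DeduplicationCounter {
  private counts: { [deduplicationId: string]: number } = Object.create(null);

  record(event: DeduplicatedEvent): void {
    this.counts[event.deduplicationId] = this.count(event.deduplicationId) + 1;
  }

  count(deduplicationId: string): number {
    return this.counts[deduplicationId] || 0;
  }
}

const counter = new DeduplicationCounter();
// With a real QueueEvents instance this could be wired up as:
//   queueEvents.on('deduplicated', (event) => counter.record(event));
counter.record({ jobId: '1', deduplicationId: 'customValue', deduplicatedJobId: '2' });
counter.record({ jobId: '1', deduplicationId: 'customValue', deduplicatedJobId: '3' });
```

After the two recorded events, counter.count('customValue') reports two ignored jobs, and an id that was never seen reports zero.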

Get Deduplication Job Id

If you need to know the id of the job that started the deduplication, you can call the getDeduplicationJobId method.

const jobId = await myQueue.getDeduplicationJobId('customValue');

Remove Deduplication Key

If you need to stop deduplication before the ttl expires or before the job finishes, you can call the removeDeduplicationKey method.

await myQueue.removeDeduplicationKey('customValue');

Read more:

  • Add Job API Reference
  • Remove Deduplication Key API Reference