BullMQ
Search
K

Rate limiting

BullMQ provides queue rate limiting. It is possible to configure workers so that they obey a given rate limiting option:
import { Worker, QueueScheduler } from 'bullmq';
const worker = new Worker('painter', async job => paintCar(job), {
limiter: {
max: 10,
duration: 1000,
},
});
const scheduler = new QueueScheduler('painter');
Jobs that get rate limited will actually stay in the waiting state.
From BullMQ 2.0 and onwards, the QueueScheduler is not needed anymore.
The rate limiter is global, so if you have for example 10 workers for one queue with the above settings, still only 10 jobs will be processed by second.

Group keys

From BullMQ 3.0 and onwards, group keys support is removed to improve global rate limit, so the information below is only valid for older versions.
It is also possible to define a rate limiter based on group keys, for example you may want to have a rate limiter per customer instead of a global rate limiter for all customers:
import { Queue, Worker, QueueScheduler } from 'bullmq';
const queue = new Queue('painter', {
limiter: {
groupKey: 'customerId',
},
});
const worker = new Worker('painter', async job => paintCar(job), {
limiter: {
max: 10,
duration: 1000,
groupKey: 'customerId',
},
});
const scheduler = new QueueScheduler('painter');
// jobs will be rate limited by the value of customerId key:
await queue.add('rate limited paint', { customerId: 'my-customer-id' });

Manual rate-limit

Sometimes is useful to rate-limit a queue manually instead of based on some static options. For example, you may have an API that returns 429 Too Many Requests, and you want to rate-limit the queue based on that response.
For this purpose, you can use the worker method rateLimit like this:
import { Worker } from 'bullmq';
const worker = new Worker(
'myQueue',
async () => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
}
},
{
connection,
limiter: {
max: 1,
duration: 500,
},
},
);
Don't forget to pass limiter options into your worker's options as limiter.max is used to determine if we need to execute the rate limit validation.

Get Queue Rate Limit Ttl

Sometimes is useful to know if our queue is rate limited.
For this purpose, you can use the getRateLimitTtl method like this:
import { Queue } from 'bullmq';
const queue = new Queue('myQueue', { connection });
const ttl = await queue.getRateLimitTtl();
if (ttl > 0) {
console.log('Queue is rate limited');
}

Read more:

Copyright (c) Taskforce.sh Inc.