BullMQ
Search…
Quick Guide

Basic Usage

1
const Queue = require("bull");
2
3
const videoQueue = new Queue("video transcoding", "redis://127.0.0.1:6379");
4
const audioQueue = new Queue("audio transcoding", {
5
redis: { port: 6379, host: "127.0.0.1", password: "foobared" },
6
}); // Specify Redis connection using object
7
const imageQueue = new Queue("image transcoding");
8
const pdfQueue = new Queue("pdf transcoding");
9
10
videoQueue.process(function (job, done) {
11
// job.data contains the custom data passed when the job was created
12
// job.id contains id of this job.
13
14
// transcode video asynchronously and report progress
15
job.progress(42);
16
17
// call done when finished
18
done();
19
20
// or pass an error if something went wrong
21
done(new Error("error transcoding"));
22
23
// or pass it a result
24
done(null, { framerate: 29.5 /* etc... */ });
25
26
// If the job throws an unhandled exception it is also handled correctly
27
throw new Error("some unexpected error");
28
});
29
30
audioQueue.process(function (job, done) {
31
// transcode audio asynchronously and report progress
32
job.progress(42);
33
34
// call done when finished
35
done();
36
37
// or pass an error if something went wrong
38
done(new Error("error transcoding"));
39
40
// or pass it a result
41
done(null, { samplerate: 48000 /* etc... */ });
42
43
// If the job throws an unhandled exception it is also handled correctly
44
throw new Error("some unexpected error");
45
});
46
47
imageQueue.process(function (job, done) {
48
// transcode image asynchronously and report progress
49
job.progress(42);
50
51
// call done when finished
52
done();
53
54
// or pass an error if something went wrong
55
done(new Error("error transcoding"));
56
57
// or pass it a result
58
done(null, { width: 1280, height: 720 /* etc... */ });
59
60
// If the job throws an unhandled exception it is also handled correctly
61
throw new Error("some unexpected error");
62
});
63
64
pdfQueue.process(function (job) {
65
// Processors can also return promises instead of using the done callback
66
return pdfAsyncProcessor();
67
});
68
69
videoQueue.add({ video: "http://example.com/video1.mov" });
70
audioQueue.add({ audio: "http://example.com/audio1.mp3" });
71
imageQueue.add({ image: "http://example.com/image1.tiff" });
72
Copied!

Using promises

Alternatively, you can return promises instead of using the done callback:
1
videoQueue.process(function (job) {
2
// don't forget to remove the done callback!
3
// Simply return a promise
4
return fetchVideo(job.data.url).then(transcodeVideo);
5
6
// Handles promise rejection
7
return Promise.reject(new Error("error transcoding"));
8
9
// Passes the value the promise is resolved with to the "completed" event
10
return Promise.resolve({ framerate: 29.5 /* etc... */ });
11
12
// If the job throws an unhandled exception it is also handled correctly
13
throw new Error("some unexpected error");
14
// same as
15
return Promise.reject(new Error("some unexpected error"));
16
});
17
Copied!

Sandboxed processes

The process function can also be run in a separate process. This has several advantages:
  • The process is sandboxed so if it crashes it does not affect the worker.
  • You can run blocking code without affecting the queue (jobs will not stall).
  • Better utilization of multi-core CPUs.
  • Fewer connections to Redis.
In order to use this feature just create a separate file with the processor:
1
// processor.js
2
module.exports = function (job) {
3
// Do some heavy work
4
5
return Promise.resolve(result);
6
}
Copied!
And define the processor like this:
1
// Single process:
2
queue.process('/path/to/my/processor.js');
3
4
// You can use concurrency as well:
5
queue.process(5, '/path/to/my/processor.js');
6
7
// and named processors:
8
queue.process('my processor', 5, '/path/to/my/processor.js');
Copied!

Repeated jobs

A job can be added to a queue and processed repeatedly according to a cron specification:
1
paymentsQueue.process(function (job) {
2
// Check payments
3
});
4
5
// Repeat payment job once every day at 3:15 (am)
6
paymentsQueue.add(paymentsData, { repeat: { cron: "15 3 * * *" } });
7
Copied!
As a tip, check your expressions here to verify they are correct: cron expression generator

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):
1
queue.pause().then(function () {
2
// queue is paused now
3
});
4
5
queue.resume().then(function () {
6
// queue is resumed now
7
});
Copied!

Events

A queue emits some useful events, for example...
1
myqueue.on('completed', function (job, result) {
2
// Job completed with output result!
3
})
Copied!
For more information on events, including the full list of events that are fired, check out the Events reference

Queues performance

Queues are relatively cheap, so if you need many of them just create new ones with different names. However, having too many queues can become unmanageable. Up to a dozen is normally ok.
1
const userJohn = new Queue('john');
2
const userLisa = new Queue('lisa');
3
.
4
.
5
.
Copied!
Also keep in mind that every queue instance will require new Redis connections; check how to reuse connections, or use named processors to achieve a similar result.

Cluster support

From version 3.2.0 and above it is recommended to use threaded processors instead.
Queues are robust and can be run in parallel in several threads or processes without any risk of hazards or queue corruption. Check this simple example using cluster to parallelize jobs across processes:
1
const Queue = require("bull");
2
const cluster = require("cluster");
3
4
const numWorkers = 8;
5
const queue = new Queue("test concurrent queue");
6
7
if (cluster.isMaster) {
8
for (let i = 0; i < numWorkers; i++) {
9
cluster.fork();
10
}
11
12
cluster.on("online", function (worker) {
13
// Let's create a few jobs for the queue workers
14
for (let i = 0; i < 500; i++) {
15
queue.add({ foo: "bar" });
16
}
17
});
18
19
cluster.on("exit", function (worker, code, signal) {
20
console.log("worker " + worker.process.pid + " died");
21
});
22
} else {
23
queue.process(function (job, jobDone) {
24
console.log("Job done by worker", cluster.worker.id, job.id);
25
jobDone();
26
});
27
}
28
Copied!