BullMQ
Search…
Observables
Instead of returning regular promises, your workers can also return an Observable, this allows for some more advanced uses cases:
  • It makes it possible to cleanly cancel a running job.
  • You can define a "Time to live" (TTL) so that jobs that take too long time will be automatically canceled.
  • Since the last value returned by the observable is persisted, you could retry a job and continue where you left of, for example, if the job implements a state machine or similar.
If you are new to Observables you may want to read this introduction. The two biggest advantages that Observables have over Promises are that they can emit more than 1 value and that they are cancelable.
Let's see a silly example of a worker making use of Observables:
1
import { WorkerPro } from "@taskforcesh/bullmq-pro"
2
import { Observable } from "rxjs"
3
4
const processor = async () => {
5
return new Observable<number>(subscriber => {
6
subscriber.next(1);
7
subscriber.next(2);
8
subscriber.next(3);
9
const intervalId = setTimeout(() => {
10
subscriber.next(4);
11
subscriber.complete();
12
}, 500);
13
14
// Provide a way of canceling and disposing the interval resource
15
return function unsubscribe() {
16
clearInterval(intervalId);
17
};
18
});
19
};
20
21
const worker = new WorkerPro(queueName, processor, { connection });
22
Copied!
In the example above, the observable will emit 4 values, the first 3 directly and then a 4th after 500 ms. Also note that the "subscriber" returns a "unsubscribe" function. This is the function that will be called if the Observable is cancelled, so this is where you would do the necessary clean up.
You may be asking whats the use of returning several values for a worker. One case that comes to mind is if you have a larger processor and you want to make sure that if the process crashes you can continue from the latest value. You could do this with a simple switch-case on the return value, something like this:
1
import { WorkerPro } from "@taskforcesh/bullmq-pro"
2
import { Observable } from "rxjs"
3
4
const processor = async (job) => {
5
return new Observable<number>(subscriber => {
6
switch(job.returnvalue){
7
default:
8
subscriber.next(1);
9
case 1:
10
subscriber.next(2);
11
case 2:
12
subscriber.next(3);
13
case 3:
14
subscriber.complete();
15
}
16
});
17
};
18
19
const worker = new WorkerPro(queueName, processor, { connection });
Copied!
Copy link