Manually processing jobs
When a Worker is instantiated, the most common usage is to specify a process function so that the worker automatically processes the jobs that arrive in the queue.
Sometimes, however, it is useful to be able to fetch the jobs manually. Just instantiate the worker without a processor and call getNextJob to fetch the next job.
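The fetch-and-report cycle described above could look roughly like the following sketch. To keep it runnable without a Redis server, it declares two small structural interfaces (`ManualWorker` and `ManualJob`, both hypothetical names) that mirror only the calls used here; the helper `processOne` is likewise illustrative. It assumes that passing `false` as the third argument to `moveToCompleted`/`moveToFailed` means "do not fetch another job".

```typescript
// Hypothetical structural types covering only the calls used in this sketch.
// With BullMQ installed, the library's own Worker and Job would be used instead.
interface ManualJob<T> {
  data: T;
  moveToCompleted(returnValue: unknown, token: string, fetchNext?: boolean): Promise<unknown>;
  moveToFailed(err: Error, token: string, fetchNext?: boolean): Promise<unknown>;
}

interface ManualWorker<T> {
  getNextJob(token: string): Promise<ManualJob<T> | undefined>;
}

// Fetch one job, run `handler` on its data, and report the outcome.
// Resolves to false when no job was available.
async function processOne<T>(
  worker: ManualWorker<T>,
  token: string,
  handler: (data: T) => Promise<unknown>,
): Promise<boolean> {
  const job = await worker.getNextJob(token);
  if (!job) return false;

  let result: unknown;
  try {
    result = await handler(job.data);
  } catch (err) {
    const error = err instanceof Error ? err : new Error(String(err));
    // Assumption: `false` means "do not return the next job".
    await job.moveToFailed(error, token, false);
    return true;
  }
  await job.moveToCompleted(result, token, false);
  return true;
}
```

With a real worker created without a processor, the call would be along the lines of `await processOne(worker, 'my-token', async (data) => { /* work */ })`.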
There is an important consideration regarding job "locks" when processing manually. Locks prevent workers from fetching a job that is already being processed by another worker. The ownership of the lock is determined by the "token" that is sent when getting the job.
The lock duration setting is called the "visibility window" in other queue systems.
Normally a job gets locked as soon as it is fetched from the queue, with a maximum duration given by the lockDuration worker option. The default is 30 seconds, but it can be changed to any value, for example 60 seconds (60000 milliseconds).
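As a configuration sketch, the 60-second lock could be expressed as an options object like the one below. The connection values are assumptions for a local Redis instance, and the commented-out constructor call reflects the usual way of creating a worker without a processor; check it against the BullMQ version you use.

```typescript
// Options intended for the third argument of the Worker constructor.
// The connection settings are assumptions (local Redis on the default port).
const workerOptions = {
  connection: { host: 'localhost', port: 6379 },
  lockDuration: 60000, // lock duration in milliseconds (60 seconds)
};

// With BullMQ installed, a worker without a processor could then be created as:
//   import { Worker } from 'bullmq';
//   const worker = new Worker('my-queue', null, workerOptions);
```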
When using standard worker processors, the lock is renewed automatically after half the lock duration has passed. However, this mechanism does not exist when processing jobs manually, so to keep the job from being moved back to the waiting list of the queue, you need to either process the job faster than the lockDuration or extend the lock manually before it expires.
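One way to extend the lock manually is to imitate what standard processors do: renew it every half lock duration while the work is running. The sketch below assumes the job exposes `extendLock(token, duration)` with the duration in milliseconds; `LockableJob` and `withLockRenewal` are hypothetical names introduced for illustration.

```typescript
// Hypothetical structural type: only the lock-extension call is needed here.
interface LockableJob {
  extendLock(token: string, duration: number): Promise<unknown>;
}

// Run `work` while renewing the job's lock every lockDuration / 2 milliseconds.
// The timer is always cleared once `work` settles, whether it resolves or throws.
async function withLockRenewal<R>(
  job: LockableJob,
  token: string,
  lockDuration: number,
  work: () => Promise<R>,
): Promise<R> {
  const timer = setInterval(() => {
    job.extendLock(token, lockDuration).catch((err) => {
      // If the lock can no longer be extended, another worker may pick up the job.
      console.error('Could not extend lock', err);
    });
  }, lockDuration / 2);

  try {
    return await work();
  } finally {
    clearInterval(timer);
  }
}
```

For short jobs a single `await job.extendLock(token, 30000)` at a known checkpoint may be enough; the periodic version is mainly useful when the processing time is unpredictable.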
Choosing a token
A token represents ownership of a given job by the worker currently working on it. If the worker dies unexpectedly, the job can be picked up by another worker once the lock expires. A good approach is simply to generate a new UUID as the token for every job, but it all depends on your specific use case.
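A minimal sketch of the UUID approach, assuming a Node.js runtime where `randomUUID` from `node:crypto` is available; the helper name `newJobToken` is hypothetical.

```typescript
import { randomUUID } from 'node:crypto';

// Generate a fresh token for each job that is about to be fetched.
function newJobToken(): string {
  return randomUUID();
}

// Typical use with a manually driven worker (not executed here):
//   const token = newJobToken();
//   const job = await worker.getNextJob(token);
```

The same token must then be passed to every later call that acts on that job, since it is what proves ownership of the lock.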
Checking for stalled jobs
When processing jobs manually you may also want to start the stalled jobs checker. This checker is needed to move stalled jobs (whose lock has expired) back to the wait status (or failed if they have exhausted the maximum number of stalled attempts, which is 1 by default).
The checker will run periodically (based on the stalledInterval option) until the worker is closed.
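The checker's lifecycle can be sketched as follows, assuming the worker exposes `startStalledCheckTimer()` and `close()`. `StalledCheckingWorker` and `withStalledChecker` are hypothetical names, and the structural interface is used only so that the sketch runs without Redis.

```typescript
// Hypothetical structural type mirroring the two worker calls used here.
interface StalledCheckingWorker {
  startStalledCheckTimer(): Promise<void>;
  close(): Promise<void>;
}

// Start the stalled jobs checker, run the manual processing code,
// and close the worker afterwards, which also stops the checker.
async function withStalledChecker(
  worker: StalledCheckingWorker,
  run: () => Promise<void>,
): Promise<void> {
  await worker.startStalledCheckTimer();
  try {
    await run();
  } finally {
    await worker.close();
  }
}
```

In a long-running service the `close()` call would more likely live in a shutdown handler than in a `finally` block.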
Looping through jobs
In many cases, you will have an "infinite" loop that processes jobs one by one. In such a loop, the third parameter of job.moveToCompleted/job.moveToFailed is left unset, signalling that the next job should be returned automatically.
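Such a loop might be sketched as below. It assumes that `moveToCompleted`/`moveToFailed`, when called without a third argument, resolve either to nothing or to an array whose first two entries are the raw data and the id of the next job. `LoopJob`, `LoopWorker`, `runLoop`, `revive` and `shouldStop` are hypothetical names; `revive` stands in for whatever turns the raw data back into a job object (in BullMQ, `Job.fromJSON`).

```typescript
// Hypothetical structural types; the third (fetch-next) parameter is left out on purpose.
interface LoopJob<T> {
  data: T;
  moveToCompleted(returnValue: unknown, token: string): Promise<unknown>;
  moveToFailed(err: Error, token: string): Promise<unknown>;
}

interface LoopWorker<T> {
  getNextJob(token: string): Promise<LoopJob<T> | undefined>;
}

// Process jobs one by one until `shouldStop` returns true (never, by default).
// Resolves to the number of jobs that were processed.
async function runLoop<T>(
  worker: LoopWorker<T>,
  token: string,
  handler: (data: T) => Promise<unknown>,
  revive: (raw: unknown, id: string) => LoopJob<T>,
  shouldStop: () => boolean = () => false,
): Promise<number> {
  let job: LoopJob<T> | undefined;
  let processed = 0;

  while (!shouldStop()) {
    if (!job) {
      // Assumption: with a real worker this call waits for a while when the
      // queue is empty, so the loop does not spin.
      job = await worker.getNextJob(token);
      if (!job) continue;
    }

    let result: unknown;
    let failure: Error | undefined;
    try {
      result = await handler(job.data);
    } catch (err) {
      failure = err instanceof Error ? err : new Error(String(err));
    }

    // Report the outcome; the reply may already carry the next job.
    const next: unknown = failure
      ? await job.moveToFailed(failure, token)
      : await job.moveToCompleted(result, token);
    processed += 1;

    job = Array.isArray(next) && next[0] ? revive(next[0], String(next[1])) : undefined;
  }
  return processed;
}
```

Reusing the job returned by the move call saves one round trip per job compared with calling `getNextJob` on every iteration.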