Manually fetching jobs

If you want to manually fetch jobs from the queue instead of letting the automatic processor take care of them, this pattern is for you.

Manually transitioning states for jobs can be done with a few simple methods.

  1. Adding a job to the 'waiting' queue. Grab the queue and call add.

import Queue from 'bull';

// Bull's Queue constructor takes the queue name as its first argument.
const queue = new Queue('my-queue', {
  limiter: {
    max: 5,
    duration: 5000,
    bounceBack: true // important: rate-limited jobs go back to 'wait' instead of 'delayed'
  },
  ...queueOptions
});
queue.add({ random_attr: 'random_value' });
  2. Pulling a job from 'waiting' and moving it to 'active'.

const job: Job = await queue.getNextJob();
  3. Move the job to the 'failed' queue if something goes wrong.

const [nextJobData, nextJobId] = await job.moveToFailed(
  {
    message: 'Call to external service failed!',
  },
  true,
);
  4. Move the job to the 'completed' queue.

const [nextJobData, nextJobId] = await job.moveToCompleted('succeeded', true);
  5. Return the next job if one was returned by the previous call.

if (nextJobData) {
  return Job.fromJSON(queue, nextJobData, nextJobId);
}
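
Putting the steps together, a manual processing loop might look like the sketch below. The queue name 'my-queue' and the doWork function are placeholders, and for simplicity the loop fetches each job with getNextJob (assuming it resolves to undefined once nothing is waiting) instead of rehydrating the job data returned by moveToCompleted/moveToFailed as shown in step 5.

import Queue from 'bull';

const queue = new Queue('my-queue', {
  limiter: { max: 5, duration: 5000, bounceBack: true }
});

// Placeholder for your actual processing logic.
async function doWork(job) {
  // ...
}

async function drainQueue() {
  // Fetch jobs one by one until nothing is left in 'waiting'.
  let job = await queue.getNextJob();

  while (job) {
    try {
      await doWork(job);
      await job.moveToCompleted('succeeded', true);
    } catch (err) {
      await job.moveToFailed({ message: String(err) }, true);
    }

    job = await queue.getNextJob();
  }
}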

Note

By default, the lock duration for a job that has been returned by getNextJob or moveToCompleted is 30 seconds. If processing takes longer than that, the job will automatically be marked as stalled and, depending on the max stalled options, either be moved back to the wait state or marked as failed. To avoid this, call job.extendLock(duration) to give yourself more time before the lock expires. It is recommended to extend the lock when half of the lock time has passed.
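
For long-running work, the lock can be extended periodically from inside your processing code. A minimal sketch, assuming the default 30-second lock duration and a hypothetical doLongRunningWork helper:

const LOCK_DURATION = 30000; // default lock duration in milliseconds

// Renew the lock at half the lock duration so it does not expire mid-work.
const renewal = setInterval(() => {
  job.extendLock(LOCK_DURATION).catch((err) => {
    console.error('Could not extend lock', err);
  });
}, LOCK_DURATION / 2);

try {
  await doLongRunningWork(job); // hypothetical long-running task
} finally {
  clearInterval(renewal);
}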
