Cancelling jobs
The job cancellation feature allows you to gracefully cancel jobs that are currently being processed by a worker. This is implemented using the standard AbortController and AbortSignal APIs.
How It Works
When a worker processes a job, the processor function can receive an optional AbortSignal as its third parameter. This signal can be used to detect that a job has been cancelled and to perform cleanup operations.
import { Worker } from 'bullmq';
const worker = new Worker('myQueue', async (job, token, signal) => {
  // The signal parameter is optional and provides cancellation support
  // Your job processing logic here
});

Cancelling Jobs
The Worker class provides methods to cancel jobs:
// Cancel a specific job by ID
const cancelled = worker.cancelJob('job-id-123');
console.log('Job cancelled:', cancelled); // true if job was active, false otherwise
// Cancel with a reason (useful for debugging)
worker.cancelJob('job-id-456', 'User requested cancellation');
// Cancel all active jobs
worker.cancelAllJobs();
// Cancel all with a reason
worker.cancelAllJobs('System shutdown');
// Get list of active jobs from queue
const activeJobs = await queue.getActive();
console.log(
  'Active jobs:',
  activeJobs.map(j => j.id),
);

Cancellation Reasons
When you provide a cancellation reason, it's passed to the AbortController.abort(reason) method and can be accessed via signal.reason:
const worker = new Worker('myQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // Access the cancellation reason
      const reason = signal.reason || 'No reason provided';
      console.log(`Job ${job.id} cancelled: ${reason}`);
      reject(new Error(`Cancelled: ${reason}`));
    });
    // Your processing logic
  });
});

// Later, cancel with a descriptive reason
worker.cancelJob(job.id, 'Resource limit exceeded');

Handling Cancellation (Recommended Pattern)
The event-based approach is the recommended pattern as it provides immediate response to cancellation:
import { Worker } from 'bullmq';
const worker = new Worker('myQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    // Listen for abort event
    signal?.addEventListener('abort', () => {
      console.log(`Job ${job.id} cancellation requested`);
      // Clean up resources
      clearInterval(interval);
      // Reject with error
      reject(new Error('Job was cancelled'));
    });
    // Your processing logic
    const interval = setInterval(() => {
      // Do work
      processNextItem();
    }, 100);
  });
});

Why Event-Based?
Immediate response - No polling delay
More efficient - No CPU wasted checking in loops
Cleaner code - Separation of concerns
Standard pattern - Matches Web APIs like fetch()
Using with Native APIs
Many Web APIs natively support AbortSignal. The signal is composable - you can pass it to APIs and still listen to it yourself:
const worker = new Worker('fetchQueue', async (job, token, signal) => {
  // Optional: listen for the abort event yourself (logging, metrics, cleanup)
  signal?.addEventListener('abort', () => {
    console.log(`Job ${job.id} cancellation requested`);
  });

  // Pass the SAME signal to fetch - it will abort the network request
  const response = await fetch(job.data.url, {
    signal, // ✅ Cancels the HTTP request at network level
    method: 'GET',
    headers: job.data.headers,
  });

  // If the signal aborts, fetch rejects with an AbortError and the job fails
  return response.json();
});

Why this pattern is better:
Simpler - Passing the signal to fetch() is all that is needed to cancel the request
Composable - The signal is passed to fetch() AND listened to in the job
The HTTP request is truly cancelled at the network level
The job is properly marked as failed when cancelled (fetch rejects with an AbortError)
No complex error checking needed
APIs That Support AbortSignal
Many modern APIs accept signal directly:
fetch(url, { signal }) - HTTP requests
addEventListener(event, handler, { signal }) - Auto-removes the listener on abort
Many database clients (Postgres, MongoDB drivers)
File system operations in newer Node.js APIs (see the sketch below)
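As a minimal sketch (assuming Node 18+ for node:fs/promises signal support; the fileQueue name and parseReport helper are hypothetical), the job's signal can be handed straight to a file read, which then rejects with an AbortError if the job is cancelled:

import { Worker } from 'bullmq';
import { readFile } from 'node:fs/promises';

const worker = new Worker('fileQueue', async (job, token, signal) => {
  // readFile accepts the signal and rejects with an AbortError
  // if the job is cancelled while the file is still being read
  const contents = await readFile(job.data.path, { signal });

  // parseReport is a hypothetical application-level helper
  return parseReport(contents.toString());
});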
Cancelling Custom Operations
For operations that don't natively support AbortSignal, implement proper cleanup:
const worker = new Worker('customQueue', async (job, token, signal) => {
  // Start your operation
  const operation = startLongRunningOperation(job.data);
  // Set up cancellation handler that actually stops the operation
  signal?.addEventListener('abort', () => {
    operation.cancel(); // ✅ Actually stops the work
  });
  try {
    const result = await operation.promise;
    return result;
  } catch (error) {
    if (signal?.aborted) {
      throw new Error('Operation cancelled');
    }
    throw error;
  }
});

Async Cleanup on Cancellation
Perform cleanup operations before rejecting the promise:
const worker = new Worker('dbQueue', async (job, token, signal) => {
  // Acquire resources
  const db = await connectToDatabase();
  const cache = await connectToCache();
  return new Promise(async (resolve, reject) => {
    // Set up cleanup handler
    signal?.addEventListener('abort', async () => {
      try {
        console.log('Cleaning up resources...');
        // Close connections gracefully
        await db.close();
        await cache.disconnect();
        console.log('Cleanup complete');
        reject(new Error('Cancelled after cleanup'));
      } catch (cleanupError) {
        console.error('Cleanup failed:', cleanupError);
        reject(new Error('Cleanup failed during cancellation'));
      }
    });
    try {
      // Do your work
      const result = await processWithDatabase(db, job.data);
      await cache.set(`job:${job.id}`, result);
      resolve(result);
    } catch (error) {
      // Clean up on error too, then reject (throwing inside the
      // executor would not reject the promise)
      await db.close();
      await cache.disconnect();
      reject(error);
    }
  });
});

Alternative: Polling Pattern
You can also check signal.aborted periodically (less efficient but simpler for some use cases):
const worker = new Worker('batchQueue', async (job, token, signal) => {
  const items = job.data.items;
  const results = [];
  for (let i = 0; i < items.length; i++) {
    // Check if job has been cancelled
    if (signal?.aborted) {
      throw new Error(`Cancelled after processing ${i} items`);
    }
    const result = await processItem(items[i]);
    results.push(result);
    // Update progress
    await job.updateProgress(((i + 1) / items.length) * 100);
  }
  return { results, total: results.length };
});

Job State After Cancellation
With Regular Error (Will Retry)
When you throw a regular Error upon cancellation:
Job state: Moves to failed
Retries: Job WILL be retried if attempts remain
Use case: When you want the job to be retried later
const worker = new Worker('retryQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // Regular Error - job will retry if attempts remain
      reject(new Error('Cancelled, will retry'));
    });
    // Your work...
  });
});

// Set attempts when adding jobs
await queue.add('task', data, { attempts: 3 });

With UnrecoverableError (No Retry)
When you throw an UnrecoverableError:
Job state: Moves to failed
Retries: Job will NOT be retried
Use case: When cancellation should be permanent
import { Worker, UnrecoverableError } from 'bullmq';
const worker = new Worker('noRetryQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // UnrecoverableError - no retries
      reject(new UnrecoverableError('Cancelled permanently'));
    });
    // Your work...
  });
});

Handling Lock Renewal Failures
When a worker loses its lock on a job (due to network issues, Redis problems, or long-running operations), you can gracefully handle this situation using the lockRenewalFailed event:
const worker = new Worker(
  'myQueue',
  async (job, token, signal) => {
    return new Promise((resolve, reject) => {
      signal?.addEventListener('abort', async () => {
        console.log('Job cancelled - cleaning up resources');
        await cleanupResources();
        reject(new Error('Job cancelled'));
      });
      // Your work...
    });
  },
  { connection },
);

// Cancel jobs when lock renewal fails
worker.on('lockRenewalFailed', (jobIds: string[]) => {
  console.log('Lock renewal failed for jobs:', jobIds);
  jobIds.forEach(jobId => worker.cancelJob(jobId));
});

Important: When a worker loses the lock on a job, it cannot move that job to the failed state (as it no longer owns the lock). Instead:
The cancelJob() call aborts the signal, allowing the processor to clean up resources
The job remains in the active state temporarily
BullMQ's stalled job checker will detect the job and move it back to waiting
Another worker (or the same worker) will pick it up and retry
This is the correct and intended behavior - trust BullMQ's stalled job mechanism to handle lost locks.
Why This Pattern Works
Immediate cleanup: The processor detects signal.aborted and can release resources
No wasted work: The processor stops processing when it loses the lock
Automatic recovery: The stalled job checker moves the job back to waiting
No data loss: The job will be retried according to its maxStalledCount setting (see the sketch below)
Works with existing infrastructure: Uses BullMQ's built-in stalled job handling
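As a minimal sketch (the option values are illustrative, and processor and connection are assumed to be defined as in the example above), the stalled-job handling can be tuned through the maxStalledCount and stalledInterval worker options:

import { Worker } from 'bullmq';

const worker = new Worker('myQueue', processor, {
  connection,
  stalledInterval: 30000, // how often the stalled-job checker runs, in milliseconds
  maxStalledCount: 2, // how many times a job may stall before being marked as failed
});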
Multi-Phase Work with Cancellation
Check cancellation at strategic points in multi-phase operations:
const worker = new Worker('multiPhaseQueue', async (job, token, signal) => {
  return new Promise(async (resolve, reject) => {
    signal?.addEventListener('abort', () => {
      reject(new Error('Cancelled'));
    });
    try {
      // Phase 1: Download
      if (signal?.aborted) throw new Error('Cancelled before download');
      const data = await downloadData(job.data.url);
      await job.updateProgress(33);
      // Phase 2: Process
      if (signal?.aborted) throw new Error('Cancelled before processing');
      const processed = await processData(data);
      await job.updateProgress(66);
      // Phase 3: Upload
      if (signal?.aborted) throw new Error('Cancelled before upload');
      const result = await uploadResults(processed);
      await job.updateProgress(100);
      resolve(result);
    } catch (error) {
      reject(error);
    }
  });
});

Backward Compatibility
The signal parameter is optional. Existing processors that don't use it will continue to work normally:
// Old processor - still works
const worker = new Worker('myQueue', async job => {
  return await processJob(job);
});

// New processor - with cancellation support
const worker = new Worker('myQueue', async (job, token, signal) => {
  // Can now handle cancellation
});

Best Practices
Use event-based cancellation for immediate response
Clean up resources in the abort handler
Use UnrecoverableError when cancellation should be permanent
Combine with timeouts for better control (see the sketch after this list)
Check signal.aborted at strategic points in long operations
Handle cleanup errors gracefully to avoid leaving resources open
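As a minimal sketch of combining cancellation with a timeout (assumes a runtime that provides AbortSignal.timeout and AbortSignal.any, such as Node 20+; the timedQueue name and the 30-second limit are illustrative):

import { Worker } from 'bullmq';

const worker = new Worker('timedQueue', async (job, token, signal) => {
  // Abort when either the job is cancelled or the time limit is reached
  const timeout = AbortSignal.timeout(30_000);
  const combined = signal ? AbortSignal.any([signal, timeout]) : timeout;

  const response = await fetch(job.data.url, { signal: combined });
  return response.json();
});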