Introduction

BullMQ is now also available as an experimental Python package.

The Python package is still in early development and is not yet recommended for production deployments. In the coming months we will be hardening the code and adding more tests, and we expect to reach a production-ready package, although not all the features of the NodeJS version will be available at first.

Installation

BullMQ is delivered as a pip package and can be installed using pip:

$ pip install bullmq
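
As a quick sanity check, you can try importing the package; if this command exits without errors, the installation succeeded:

$ python -c "import bullmq"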

Get started

BullMQ uses asyncio to implement concurrency and provide efficient processing of jobs.

You can add jobs to a queue like this, assuming you have a Redis host running locally:

from bullmq import Queue
import asyncio

async def main():
    queue = Queue("myQueue")

    # Add a job with data { "foo": "bar" } to the queue
    await queue.add("myJob", {"foo": "bar"})

    ...

    # Close the queue when done adding jobs
    await queue.close()

if __name__ == "__main__":
    asyncio.run(main())
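
If your Redis host is not local, you can pass a connection option when instantiating the queue, in the same way as for the Worker shown below (the connection string here is a placeholder):

from bullmq import Queue

# Placeholder connection string; replace with your own Redis URL
queue = Queue("myQueue", {"connection": "rediss://<user>:<password>@<host>:<port>"})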

To consume jobs from the queue, you need to use the Worker class, providing a "processor" function that will handle each job. As soon as the worker is instantiated, it starts consuming jobs:

from bullmq import Worker
import asyncio
import signal

async def process(job, job_token):
    # job.data will include the data added to the queue
    return await doSomethingAsync(job)

async def main():

    # Create an event that will be triggered for shutdown
    shutdown_event = asyncio.Event()

    def signal_handler(sig, frame):
        print("Signal received, shutting down.")
        shutdown_event.set()

    # Assign signal handlers to SIGTERM and SIGINT
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Feel free to remove the connection parameter if your Redis runs on localhost
    worker = Worker("myQueue", process, {"connection": "rediss://<user>:<password>@<host>:<port>"})

    # Wait until the shutdown event is set
    await shutdown_event.wait()

    # Close the worker
    print("Cleaning up worker...")
    await worker.close()
    print("Worker shut down successfully.")

if __name__ == "__main__":
    asyncio.run(main())
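
In the example above, doSomethingAsync is a placeholder for your own coroutine. A minimal sketch of what it could look like (the sleep just simulates asynchronous work):

import asyncio

async def doSomethingAsync(job):
    # Simulate asynchronous work based on the job's data
    await asyncio.sleep(1)
    # Whatever is returned here becomes the job's return value
    return {"processed": job.data}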

If Redis responses are in binary format, you should pass the decode_responses option as True; see the redis-py documentation: https://redis-py.readthedocs.io/en/latest/examples/connection_examples.html#By-default-Redis-return-binary-responses,-to-decode-them-use-decode_responses=True
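
A minimal sketch of how this could look, assuming the connection options dictionary is forwarded to the underlying redis-py client (check the API Reference for the exact supported fields):

from bullmq import Worker

# Assumption: these keys are passed through to the underlying redis-py client
worker = Worker("myQueue", process, {
    "connection": {
        "host": "localhost",
        "port": 6379,
        "decode_responses": True,  # decode binary responses into strings
    }
})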
