I ve have a fastapi server and I want to implement background task using Redis and RQ. I ve created FASTAPI_SERVER, REDIS_SERVER and WORKER_SERVER in docker-compose.
这里,我的快拉皮法典想在雷迪斯奎特制定一项任务:
@app.get("/api/greet")
async def greet():
try:
redis_connection = Redis(host=host, port=port, password=password)
queue = Queue(name=report_queue, connection=redis_connection)
logger.debug(f"queue created successfully! {queue.connection}")
job = queue.enqueue("app.tasks.generate_report_task", {"name": "test"})
logger.debug(f"job created successfully! {job.id}")
except Exception as e:
logger.error(f"queue creation failed: {e}")
raise Exception(f"queue creation failed: {e}")
return {"message": "Hello World"}
This is my folder structure of WORKER_SERVER:
──> ls
app Dockerfile requirements.txt scripts supervisord.conf venv
──> cd app && tree
├── config.py
├── database.py
├── __init__.py
├── model.py
├── reports
│ ├── generation.py
│ ├── __init__.py
│ └── utils.py
├── tasks.py
└── templates
└── ppt_template.pptx
快速通道能够在重新处理中产生任务。
fastapi-4 | 2024-01-26 08:57:53.463 | DEBUG | app.main:greet:176 - queue created successfully! Redis<ConnectionPool<Connection<host=redis,port=6379,db=0>>>
fastapi-4 | 2024-01-26 08:58:00.584 | DEBUG | app.main:greet:179 - job created successfully! 71dc6cb5-29b6-4d30-b078-5b315bd8ea44
但是,当RQ找到任务时,它会提出这一点。 直接错误:
worker_server | 2024-01-26 08:58:01,153 DEBG worker-0 stderr output:
worker_server | 08:58:01 [Job 71dc6cb5-29b6-4d30-b078-5b315bd8ea44]: exception raised while executing (app.tasks.generate_report_task)
worker_server | Traceback (most recent call last):
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/utils.py", line 118, in import_attribute
worker_server | attribute_owner = getattr(module, attribute_owner_name)
worker_server | AttributeError: module app has no attribute tasks
worker_server |
worker_server | During handling of the above exception, another exception occurred:
worker_server |
worker_server | Traceback (most recent call last):
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/worker.py", line 1428, in perform_job
worker_server | rv = job.perform()
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 1278, in perform
worker_server | self._result = self._execute()
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 1315, in _execute
worker_server | result = self.func(*self.args, **self.kwargs)
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 425, in func
worker_server | return import_attribute(self.func_name)
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/utils.py", line 120, in import_attribute
worker_server | raise ValueError( Invalid attribute name: %s % attribute_name)
worker_server | ValueError: Invalid attribute name: generate_report_task
我试图在不使用cker子的情况下重新研究这一问题的简单版本。
# enqueue_tasks.py
from rq import Queue
from redis import Redis
if __name__ == "__main__":
redis_conn = Redis()
queue = Queue(name="report-generation", connection=redis_conn)
job = queue.enqueue("worker.tasks.example_task", kwargs={"x": 1, "y": 2})
print(f"Task enqueued with job ID: {job.id}")
import asyncio
async def example_task(**kwargs):
print("started")
await asyncio.sleep(5)
return kwargs.get("x", 1) + kwargs.get("y", 2)
Using rq worker
command to run the worker
rq worker report-generation