Basic Redis Operation and Async FastAPI Pytest - RuntimeError: Event loop is closed

Below is an example of my tests. This test passes, but others fail even though they are very similar. Can anyone advise whether I am integrating Redis properly? I use it only to store tokens, but I'm not sure the connection pool is being managed correctly.

Note: I omitted most imports to limit the amount of code. My tests and the failing results are at the bottom. I can post the failing endpoints, but nothing unusual happens there: the get_redis function is used the same way as in the logout endpoint below.

main.py:

from fastapi import FastAPI
from backend.app.api.v1.endpoints import products, users, cart, orders
from backend.app.core.database import create_database_connection
from backend.app.core.redis import create_redis_connection

app = FastAPI()

app.include_router(users.router, prefix="/users")

@app.on_event("startup")
async def startup_event():
    app.state.client, app.state.db = create_database_connection()
    app.state.redis_pool = await create_redis_connection()

@app.on_event("shutdown")
async def shutdown_event():
    app.state.client.close()
    await app.state.redis_pool.close()
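
The startup/shutdown pairing above can be pictured as one async context manager that creates the resource and closes it inside the same running event loop. The following is a minimal, stdlib-only sketch of that idea, not the application's actual code: `FakePool`, `lifespan`, and `demo` are hypothetical names, and `FakePool` merely stands in for the Redis client.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


class FakePool:
    """Hypothetical stand-in for the Redis client; it only tracks open/closed state."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def lifespan(state: SimpleNamespace):
    # Startup: create the resource inside the running loop.
    state.redis_pool = FakePool()
    try:
        yield state
    finally:
        # Shutdown: close it before the same loop goes away.
        await state.redis_pool.close()


async def demo() -> tuple[bool, bool]:
    state = SimpleNamespace()
    async with lifespan(state):
        open_during = not state.redis_pool.closed
    # Report whether the pool was open inside the block and closed after it.
    return open_during, state.redis_pool.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is only that creation and cleanup happen within a single `asyncio.run` call, so nothing outlives the loop it was created on.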

redis.py:

import aioredis
from fastapi import Request
from backend.app.core.config import settings

async def create_redis_connection():
    redis_pool = await aioredis.from_url(settings.redis_url)
    return redis_pool

async def get_redis(r: Request):
    return r.app.state.redis_pool

users.py:

router = APIRouter()

@router.post("/logout")
async def logout(
    access_token: str = Depends(oauth2_scheme),
    refresh_token: str = Body(..., embed=True),
    redis_pool=Depends(get_redis)
):
    # Delete access token from Redis pool
    deleted_access_token = await redis_pool.delete(access_token)
    # Delete refresh token from Redis pool
    deleted_refresh_token = await redis_pool.delete(refresh_token)

    # If neither token was found in the Redis pool, raise an exception
    if not deleted_access_token and not deleted_refresh_token:
        raise HTTPException(status_code=401, detail="Invalid access or refresh token")

    return {"detail": "Successfully logged out"}

test_users.py:

@pytest.fixture 
def anyio_backend() -> str: # disable trio
    return "asyncio"

@pytest.fixture
async def app_with_db():
    await app.router.startup()
    yield
    await app.router.shutdown()

@pytest.fixture 
async def test_user(app_with_db): # create/manage test user
    async with AsyncClient(app=app, base_url="http://test") as ac:
        # Use the current time to ensure uniqueness, convert it to bytes and hash it using SHA-1
        unique_hash = hashlib.sha1(str(time.time()).encode()).hexdigest() # used for test email

        # Prepare the test user data
        new_user = {
            "email": f"pytestuser-{unique_hash[:20]}@example.com",
            "password": "Test@1234",
            "confirm_password": "Test@1234"
        }

        # Send a POST request to the /users/register endpoint
        response = await ac.post("/users/register", json=new_user)
        assert response.status_code == 200

        # Find user ID by EMAIL
        db = app.state.db
        user_in_db = await db.users.find_one({"email": new_user["email"]})

        # Assert that the user was found and the email matches
        assert user_in_db is not None
        assert user_in_db["email"] == new_user["email"]

        # Define test user login dict
        test_user_credentials = {
            "username" : user_in_db["email"], # OAuth expects "username", not "email"
            "password" : new_user["password"]
        }

        yield test_user_credentials

        # Clean DB from test data
        await db.users.delete_one({"_id": ObjectId(user_in_db["_id"])})

@pytest.mark.anyio
async def test_login_and_logout(app_with_db, test_user):
    async with AsyncClient(app=app, base_url="http://test") as ac:

        # Send a POST request to the /token endpoint
        response = await ac.post("/users/token", data=test_user)

        # Assert that the response status code is 200
        assert response.status_code == 200

        # Assert the returned token data
        token_data = response.json()
        assert "access_token" in token_data
        assert "refresh_token" in token_data
        assert token_data["token_type"] == "bearer"

        # Logout
        response = await ac.post("/users/logout", headers={"Authorization": f"Bearer {token_data['access_token']}"}, json={"refresh_token": token_data["refresh_token"]})
        assert response.status_code == 200
        detail = response.json()
        assert detail["detail"] == "Successfully logged out"

ERRORS:

backend/tests/api/v1/test_users.py::test_register PASSED
backend/tests/api/v1/test_users.py::test_login_and_logout PASSED
backend/tests/api/v1/test_users.py::test_refresh_token ERROR
backend/tests/api/v1/test_users.py::test_register_existing_email PASSED
backend/tests/api/v1/test_users.py::test_login_incorrect_password PASSED
backend/tests/api/v1/test_users.py::test_login_nonexistent_email PASSED
backend/tests/api/v1/test_users.py::test_refresh_token_invalid PASSED
backend/tests/api/v1/test_users.py::test_logout_invalid_token FAILED
backend/tests/api/v1/test_users.py::test_register_invalid_data FAILED
backend/tests/api/v1/test_users.py::test_register_mismatch_data PASSED
====================================================== ERRORS ======================================================
_______________________________________ ERROR at setup of test_refresh_token _______________________________________

anyio_backend = 'asyncio', args = (), kwargs = {'app_with_db': None}, backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f17708ded70>

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend
    
        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
>               yield from runner.run_asyncgen_fixture(func, kwargs)

../../../.local/share/virtualenvs/NG_Web--8SMFfFz/lib/python3.10/site-packages/anyio/pytest_plugin.py:68:

[...]

============================================= short test summary info ==============================================
FAILED backend/tests/api/v1/test_users.py::test_logout_invalid_token - RuntimeError: Event loop is closed
FAILED backend/tests/api/v1/test_users.py::test_register_invalid_data - RuntimeError: Event loop is closed
ERROR backend/tests/api/v1/test_users.py::test_refresh_token - RuntimeError: Event loop is closed
======================================= 2 failed, 7 passed, 1 error in 2.32s =======================================
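
For context, this message generally appears when an object tied to one event loop is used after that loop has been closed. The sketch below reproduces the same message with the standard library only. It is an illustration under assumptions, not a diagnosis of the tests above: `LoopBoundClient`, `make_client`, and `reuse_across_loops` are hypothetical names, and the class stands in for any client that remembers the loop it was created on.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for a client that remembers its creating loop."""

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()

    def ping(self) -> None:
        # Scheduling work on a closed loop raises RuntimeError.
        self._loop.call_soon(lambda: None)


async def make_client() -> LoopBoundClient:
    return LoopBoundClient()


def reuse_across_loops() -> str:
    # First "test": asyncio.run creates a loop, runs the coroutine, then closes the loop.
    client = asyncio.run(make_client())
    try:
        # Later use: the client still points at the first, now closed, loop.
        client.ping()
    except RuntimeError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(reuse_across_loops())
```

If something similar is happening in the test suite, the object to look for would be one created during an earlier test (or at import time) and reused after that test's loop was torn down.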
                                                       
Answers

No answers yet