English 中文(简体)
Basic Redis Operation and Async FastAPI Pytest - RuntimeError: Event loop is closed

Below is an example of my tests. This test does not fail but others do despite them being very similar. Would anyone be able to advise if I integrating Redis properly? I am using it to store tokens, nothing else. But I m not sure if the connection pool is being properly ran.

Note: I skipped writing the imports in most cases to limit amount of code to view. At the bottom are my tests and the fail ones. I can post the failing endpoints but nothing crazy is happening there - get_redis function is used similarly to the endpoint below (logout).


from fastapi import FastAPI
from backend.app.api.v1.endpoints import products, users, cart, orders
from backend.app.core.database import create_database_connection
from backend.app.core.redis import create_redis_connection

app = FastAPI()

app.include_router(users.router, prefix="/users")

async def startup_event():
    app.state.client, app.state.db = create_database_connection()
    app.state.redis_pool = await create_redis_connection()

async def shutdown_event():
    await app.state.redis_pool.close()


import aioredis
from fastapi import Request
from backend.app.core.config import settings

async def create_redis_connection():
    redis_pool = await aioredis.from_url(settings.redis_url)
    return redis_pool

async def get_redis(r: Request):
    return r.app.state.redis_pool


router = APIRouter()

async def logout(
    access_token: str = Depends(oauth2_scheme),
    refresh_token: str = Body(..., embed=True),
    # Delete access token from Redis pool
    deleted_access_token = await redis_pool.delete(access_token)
    # Delete refresh token from Redis pool
    deleted_refresh_token = await redis_pool.delete(refresh_token)

    # If neither token was found in the Redis pool, raise an exception
    if not deleted_access_token and not deleted_refresh_token:
        raise HTTPException(status_code=401, detail="Invalid access or refresh token")

    return {"detail": "Successfully logged out"}


def anyio_backend() -> str: # disable trio
    return "asyncio"

async def app_with_db():
    await app.router.startup()
    await app.router.shutdown()

async def test_user(app_with_db): # create/manage test user
    async with AsyncClient(app=app, base_url="http://test") as ac:
        # Use the current time to ensure uniqueness, convert it to bytes and hash it using SHA-1
        unique_hash = hashlib.sha1(str(time.time()).encode()).hexdigest() # used for test email

        # Prepare the test user data
        new_user = {
            "email": f"pytestuser-{unique_hash[:20]}@example.com",
            "password": "Test@1234",
            "confirm_password": "Test@1234"

        # Send a POST request to the /users/register endpoint
        response = await ac.post("/users/register", json=new_user)
        assert response.status_code == 200

        # Find user ID by EMAIL
        db = app.state.db
        user_in_db = await db.users.find_one({"email": new_user["email"]})

        # Assert that the user was found and the email matches
        assert user_in_db is not None
        assert user_in_db["email"] == new_user["email"]

        # Define test user login dict
        test_user_credentials = {
            "username" : user_in_db["email"], # OAuth expects "username", not "email"
            "password" : new_user["password"]

        yield test_user_credentials

        # Clean DB from test data
        await db.users.delete_one({"_id": ObjectId(user_in_db["_id"])})

async def test_login_and_logout(app_with_db, test_user):
    async with AsyncClient(app=app, base_url="http://test") as ac:

        # Send a POST request to the /token endpoint
        response = await ac.post("/users/token", data=test_user)

        # Assert that the response status code is 200
        assert response.status_code == 200

        # Assert the returned token data
        token_data = response.json()
        assert "access_token" in token_data
        assert "refresh_token" in token_data
        assert token_data["token_type"] == "bearer"

        # Logout
        response = await ac.post("/users/logout", headers={"Authorization": f"Bearer {token_data[ access_token ]}"}, json={"refresh_token": token_data[ refresh_token ]})
        assert response.status_code == 200
        detail = response.json()
        assert detail["detail"] == "Successfully logged out"


backend/tests/api/v1/test_users.py::test_register PASSED
backend/tests/api/v1/test_users.py::test_login_and_logout PASSED
backend/tests/api/v1/test_users.py::test_refresh_token ERROR
backend/tests/api/v1/test_users.py::test_register_existing_email PASSED
backend/tests/api/v1/test_users.py::test_login_incorrect_password PASSED
backend/tests/api/v1/test_users.py::test_login_nonexistent_email PASSED
backend/tests/api/v1/test_users.py::test_refresh_token_invalid PASSED
backend/tests/api/v1/test_users.py::test_logout_invalid_token FAILED
backend/tests/api/v1/test_users.py::test_register_invalid_data FAILED
backend/tests/api/v1/test_users.py::test_register_mismatch_data PASSED
====================================================== ERRORS ======================================================
_______________________________________ ERROR at setup of test_refresh_token _______________________________________

anyio_backend =  asyncio , args = (), kwargs = { app_with_db : None}, backend_name =  asyncio , backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f17708ded70>

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend
        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
>               yield from runner.run_asyncgen_fixture(func, kwargs)



============================================= short test summary info ==============================================
FAILED backend/tests/api/v1/test_users.py::test_logout_invalid_token - RuntimeError: Event loop is closed
FAILED backend/tests/api/v1/test_users.py::test_register_invalid_data - RuntimeError: Event loop is closed
ERROR backend/tests/api/v1/test_users.py::test_refresh_token - RuntimeError: Event loop is closed
======================================= 2 failed, 7 passed, 1 error in 2.32s =======================================


How do you mix SQL DB vs. Key-Value store (i.e. Redis)

I m reviewing my code and realize I spend a tremendous amount of time taking rows from a database, formatting as XML, AJAX GET to browser, and then converting back into a hashed javascript object ...

Predis sharding (consistent hashing)

Predis claim to have Client-side sharding (support for consistent hashing of keys). http://github.com/nrk/predis I can do sharding using connect to an array of profiles (nodes) but it isn t ...

key value stores for extendable objects

http://www.infoq.com/presentations/newport-evolving-key-value-programming-model is a video about KV stores, and the whole premise is that redis promotes a column-based style for storing the attributes ...

nginx/redis and handling tracking params in url

I am using nginx and redis in my website. For several items on my site, I want to add tracking params to their urls so that when a user clicks on an item, I can collect statistics of user usage apart ...
