Files
WealthWise/backend/app/api/deps.py

153 lines
4.2 KiB
Python
Raw Normal View History

"""WealthWise API dependencies.
This module provides reusable dependencies for FastAPI endpoints including:
- Database session injection
- Authentication dependencies with JWT validation
- Common dependency utilities
"""
from typing import Optional
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.config import get_settings
from app.core.db import get_session
from app.models import User
from app.schemas.user import TokenPayload
settings = get_settings()
# OAuth2 scheme for JWT token authentication
# Uses the token endpoint for OAuth2 Password Flow
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/token",
auto_error=False, # Allow optional auth for public endpoints
)
# Database session dependency
# Re-export get_session for cleaner imports in endpoint modules
SessionDep = Depends(get_session)
async def get_current_user(
token: Optional[str] = Depends(oauth2_scheme),
session: AsyncSession = SessionDep,
) -> User:
"""Get current authenticated user from JWT token.
This dependency extracts the Bearer token from the Authorization header,
validates it locally using the application's secret key, queries the
database for the user, and returns the User model.
Args:
token: JWT access token from Authorization header (Bearer token)
session: Database session for user lookup
Returns:
User: Authenticated user model from database
Raises:
HTTPException: 401 if token is missing, invalid, or user not found/inactive
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not token:
raise credentials_exception
try:
# Decode and validate JWT
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
# Extract user ID from subject claim
user_id: Optional[str] = payload.get("sub")
if user_id is None:
raise credentials_exception
# Validate token type
token_type = payload.get("type")
if token_type != "access":
raise credentials_exception
except JWTError:
raise credentials_exception
# Query database for user
statement = select(User).where(User.id == UUID(user_id))
result = await session.exec(statement)
user = result.first()
# Validate user exists and is active
if user is None or not user.is_active:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""Verify user is active (non-superuser check optional).
This dependency extends get_current_user. Currently just returns the user
since get_current_user already checks is_active. Can be extended for
additional checks like email verification.
Args:
current_user: User from get_current_user dependency
Returns:
Active User model
Raises:
HTTPException: 401 if user is inactive
"""
return current_user
async def get_current_superuser(
current_user: User = Depends(get_current_user),
) -> User:
"""Verify user is a superuser/admin.
This dependency requires the user to have superuser privileges.
Args:
current_user: User from get_current_user dependency
Returns:
Superuser User model
Raises:
HTTPException: 403 if user is not a superuser
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
)
return current_user
# Convenience exports for endpoint modules
__all__ = [
"get_session",
"SessionDep",
"oauth2_scheme",
"get_current_user",
"get_current_active_user",
"get_current_superuser",
]