Files
reader-api/app/auth.py
T
virtus f933898c56
Build and Push Reader API Image / docker (push) Successful in 54s
Add access token expiration configuration and new auth session endpoint
2026-04-24 01:53:25 +07:00

138 lines
4.1 KiB
Python

from __future__ import annotations
import datetime as dt
import os
from typing import Any
from fastapi import Depends, HTTPException, Request
from jose import JWTError, jwt
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db_session
SESSION_COOKIE_KEYS = [
"next-auth.session-token",
"__Secure-next-auth.session-token",
"authjs.session-token",
"__Secure-authjs.session-token",
"reader_access_token",
]
ACCESS_TOKEN_TTL_SECONDS = 7 * 24 * 60 * 60
def _jwt_secret() -> str:
return settings.mobile_jwt_secret or settings.nextauth_secret
def create_access_token(user_id: str) -> str:
now = dt.datetime.now(dt.timezone.utc)
payload = {
"sub": user_id,
"iat": int(now.timestamp()),
"exp": int((now + dt.timedelta(seconds=ACCESS_TOKEN_TTL_SECONDS)).timestamp()),
}
secret = _jwt_secret()
if not secret:
raise RuntimeError("Missing MOBILE_JWT_SECRET or NEXTAUTH_SECRET")
return jwt.encode(payload, secret, algorithm="HS256")
async def _get_user_by_id(db: AsyncSession, user_id: str) -> dict[str, Any] | None:
result = await db.execute(
text(
'SELECT id, email, name, image, role FROM "User" WHERE id = :user_id LIMIT 1'
),
{"user_id": user_id},
)
row = result.mappings().first()
return dict(row) if row else None
async def _get_user_from_session_cookie(db: AsyncSession, request: Request) -> dict[str, Any] | None:
token = None
for key in SESSION_COOKIE_KEYS:
value = request.cookies.get(key)
if value:
token = value
break
if not token:
return None
result = await db.execute(
text(
'SELECT u.id, u.email, u.name, u.image, u.role '
'FROM "Session" s '
'JOIN "User" u ON u.id = s."userId" '
'WHERE s."sessionToken" = :token AND s.expires > NOW() '
'LIMIT 1'
),
{"token": token},
)
row = result.mappings().first()
if row:
return dict(row)
# Support NextAuth/Auth.js JWT session cookies when frontend runs in JWT mode.
secret = _jwt_secret()
if not secret:
return None
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
except JWTError:
return None
subject = payload.get("sub") or payload.get("id")
if not isinstance(subject, str) or not subject:
return None
role = payload.get("role")
if isinstance(role, str) and role:
return {
"id": subject,
"email": payload.get("email"),
"name": payload.get("name"),
"image": payload.get("picture") or payload.get("image"),
"role": role,
}
return await _get_user_by_id(db, subject)
async def resolve_current_user(db: AsyncSession, request: Request) -> dict[str, Any] | None:
auth = request.headers.get("authorization", "")
if auth.lower().startswith("bearer "):
token = auth.split(" ", 1)[1].strip()
secret = _jwt_secret()
if not secret:
return None
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
subject = payload.get("sub")
if isinstance(subject, str) and subject:
return await _get_user_by_id(db, subject)
except JWTError:
return None
return await _get_user_from_session_cookie(db, request)
async def require_current_user(db: AsyncSession, request: Request) -> dict[str, Any]:
user = await resolve_current_user(db, request)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
return user
async def require_mod_user(request: Request, db: AsyncSession = Depends(get_db_session)) -> dict[str, Any]:
user = await resolve_current_user(db, request)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
if user.get("role") not in ("MOD", "ADMIN"):
raise HTTPException(status_code=403, detail="Forbidden: MOD or ADMIN role required")
return user