Files
reader-api/app/auth.py
T
virtus ac5f5db447 Add Google OAuth configuration and enhance API functionality
- Updated .env.example with WEB_GOOGLE_CLIENT_ID and WEB_GOOGLE_CLIENT_SECRET for Google OAuth.
- Modified README.md to reflect changes in docker-compose for unified web and API deployment.
- Enhanced auth.py to support NextAuth JWT session cookies.
- Improved main.py with lifespan management and additional API endpoints.
- Added mod_overview endpoint in mod.py for MOD panel statistics.
- Updated docker-compose.yml for local API and web service configurations.
2026-03-30 13:55:12 +07:00

135 lines
4.0 KiB
Python

from __future__ import annotations
import datetime as dt
import os
from typing import Any
from fastapi import Depends, HTTPException, Request
from jose import JWTError, jwt
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db_session
SESSION_COOKIE_KEYS = [
"next-auth.session-token",
"__Secure-next-auth.session-token",
"authjs.session-token",
"__Secure-authjs.session-token",
]
def _jwt_secret() -> str:
return settings.mobile_jwt_secret or settings.nextauth_secret
def create_access_token(user_id: str) -> str:
now = dt.datetime.now(dt.timezone.utc)
payload = {
"sub": user_id,
"iat": int(now.timestamp()),
"exp": int((now + dt.timedelta(days=7)).timestamp()),
}
secret = _jwt_secret()
if not secret:
raise RuntimeError("Missing MOBILE_JWT_SECRET or NEXTAUTH_SECRET")
return jwt.encode(payload, secret, algorithm="HS256")
async def _get_user_by_id(db: AsyncSession, user_id: str) -> dict[str, Any] | None:
result = await db.execute(
text(
'SELECT id, email, name, image, role FROM "User" WHERE id = :user_id LIMIT 1'
),
{"user_id": user_id},
)
row = result.mappings().first()
return dict(row) if row else None
async def _get_user_from_session_cookie(db: AsyncSession, request: Request) -> dict[str, Any] | None:
token = None
for key in SESSION_COOKIE_KEYS:
value = request.cookies.get(key)
if value:
token = value
break
if not token:
return None
result = await db.execute(
text(
'SELECT u.id, u.email, u.name, u.image, u.role '
'FROM "Session" s '
'JOIN "User" u ON u.id = s."userId" '
'WHERE s."sessionToken" = :token AND s.expires > NOW() '
'LIMIT 1'
),
{"token": token},
)
row = result.mappings().first()
if row:
return dict(row)
# Support NextAuth/Auth.js JWT session cookies when frontend runs in JWT mode.
secret = _jwt_secret()
if not secret:
return None
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
except JWTError:
return None
subject = payload.get("sub") or payload.get("id")
if not isinstance(subject, str) or not subject:
return None
role = payload.get("role")
if isinstance(role, str) and role:
return {
"id": subject,
"email": payload.get("email"),
"name": payload.get("name"),
"image": payload.get("picture") or payload.get("image"),
"role": role,
}
return await _get_user_by_id(db, subject)
async def resolve_current_user(db: AsyncSession, request: Request) -> dict[str, Any] | None:
auth = request.headers.get("authorization", "")
if auth.lower().startswith("bearer "):
token = auth.split(" ", 1)[1].strip()
secret = _jwt_secret()
if not secret:
return None
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
subject = payload.get("sub")
if isinstance(subject, str) and subject:
return await _get_user_by_id(db, subject)
except JWTError:
return None
return await _get_user_from_session_cookie(db, request)
async def require_current_user(db: AsyncSession, request: Request) -> dict[str, Any]:
user = await resolve_current_user(db, request)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
return user
async def require_mod_user(request: Request, db: AsyncSession = Depends(get_db_session)) -> dict[str, Any]:
user = await resolve_current_user(db, request)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
if user.get("role") not in ("MOD", "ADMIN"):
raise HTTPException(status_code=403, detail="Forbidden: MOD or ADMIN role required")
return user