feat(auth, epub): enhance Google token verification and EPUB chapter extraction
Build and Push Reader API Image / docker (push) Successful in 14s

- Added Google token verification logic to improve security and ensure valid tokens are processed.
- Introduced functions for extracting chapters from EPUB files based on HTML tags, including support for chapter markers.
- Updated `.env.example` to include configuration for an OpenAI-compatible router.
- Refactored existing functions for better readability and maintainability.
This commit is contained in:
2026-05-19 00:15:20 +07:00
parent 611213ae5a
commit bddd592146
4 changed files with 754 additions and 68 deletions
+101
View File
@@ -1,10 +1,13 @@
from __future__ import annotations
import datetime as dt
import logging
import os
from typing import Any
from fastapi import Depends, HTTPException, Request
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token as google_id_token
from jose import JWTError, jwt
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
@@ -12,6 +15,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db_session
logger = logging.getLogger(__name__)
SESSION_COOKIE_KEYS = [
"next-auth.session-token",
"__Secure-next-auth.session-token",
@@ -21,6 +26,102 @@ SESSION_COOKIE_KEYS = [
]
ACCESS_TOKEN_TTL_SECONDS = 7 * 24 * 60 * 60
GOOGLE_TOKEN_CLOCK_SKEW_SECONDS = 60
def _google_token_audiences_to_try(token: str) -> list[str | None]:
audiences: list[str | None] = []
seen: set[str] = set()
def add(value: str | None) -> None:
if value is None:
if None not in audiences:
audiences.append(None)
return
cleaned = value.strip()
if not cleaned or cleaned in seen:
return
seen.add(cleaned)
audiences.append(cleaned)
for client_id in settings.google_client_id_list:
add(client_id)
try:
claims = jwt.get_unverified_claims(token)
for key in ("aud", "azp"):
raw = claims.get(key)
if isinstance(raw, str):
add(raw)
elif isinstance(raw, list):
for item in raw:
if isinstance(item, str):
add(item)
except Exception:
pass
if not audiences:
audiences.append(None)
return audiences
def verify_google_id_token(raw_token: str) -> dict[str, Any]:
token = raw_token.strip()
if token.count(".") != 2:
raise HTTPException(status_code=400, detail="googleIdToken must be a JWT")
request = google_requests.Request()
last_exc: Exception | None = None
for audience in _google_token_audiences_to_try(token):
try:
id_info = google_id_token.verify_oauth2_token(
token,
request,
audience,
clock_skew_in_seconds=GOOGLE_TOKEN_CLOCK_SKEW_SECONDS,
)
aud = id_info.get("aud")
allowed = set(settings.google_client_id_list)
if allowed:
aud_values: set[str] = set()
if isinstance(aud, str):
aud_values.add(aud)
elif isinstance(aud, list):
aud_values.update(str(item) for item in aud)
azp = id_info.get("azp")
if isinstance(azp, str):
aud_values.add(azp)
if aud_values.isdisjoint(allowed):
last_exc = ValueError(f"token audience not allowed: {aud_values}")
continue
return id_info
except Exception as exc:
last_exc = exc
continue
try:
claims = jwt.get_unverified_claims(token)
logger.warning(
"google id token rejected len=%s iss=%s aud=%s azp=%s exp=%s err=%s",
len(token),
claims.get("iss"),
claims.get("aud"),
claims.get("azp"),
claims.get("exp"),
last_exc,
)
except Exception:
logger.warning("google id token rejected len=%s err=%s", len(token), last_exc)
err_text = str(last_exc or "").lower()
if any(x in err_text for x in ("certificate", "connection", "timeout", "urlopen", "ssl", "network")):
raise HTTPException(
status_code=503,
detail="Unable to verify Google token (reader-api cannot reach googleapis.com)",
) from last_exc
raise HTTPException(status_code=401, detail="Invalid Google token") from last_exc
def _jwt_secret() -> str: