From 9a3bb4b6ce53a89c6701cbf233d18aedd32c3be4 Mon Sep 17 00:00:00 2001 From: virtus Date: Thu, 30 Apr 2026 01:53:52 +0700 Subject: [PATCH] feat(storage): implement NAS content storage with read/write capabilities feat(docker): configure NAS content and EPUB source directories in docker-compose feat(migrations): add tables for SourceAsset, ImportJob, ChapterContentRef, and AssetNovelMapping feat(scripts): create backfill script for populating ChapterContentRef from MongoDB chapters --- .gitignore | 7 +- README.md | 91 ++ ROLLOUT_CHECKLIST.md | 30 + app/auth.py | 5 +- app/config.py | 3 + app/main.py | 972 ++++++++++++++++++++- app/storage.py | 33 + docker-compose.yml | 13 + migrations/2026_04_nas_content_storage.sql | 43 + scripts/backfill_chapter_content_refs.py | 118 +++ 10 files changed, 1297 insertions(+), 18 deletions(-) create mode 100644 ROLLOUT_CHECKLIST.md create mode 100644 app/storage.py create mode 100644 migrations/2026_04_nas_content_storage.sql create mode 100644 scripts/backfill_chapter_content_refs.py diff --git a/.gitignore b/.gitignore index 7d72304..1389ee3 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,9 @@ build/ .env .env.local # Local debug/test scripts -test_*.py \ No newline at end of file +test_*.py + +# Local NAS mount test data +data/epub-source/* +data/content/* +data/nas-content/* diff --git a/README.md b/README.md index 6a6a787..e12015b 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,51 @@ Notes: - `api-local` listens on port `8001` and automatically points to `postgres` + `mongo` containers. - `web` listens on port `3000` and calls API internally through `http://api:8000`. +### NAS mount points (chapter content + EPUB source) + +API containers now reserve two mount folders: + +- `/data/content`: converted chapter files (`txt` + `raw_html`) +- `/data/epub-source`: source EPUB library + +Default env mapping (already wired in compose): + +```env +NAS_CONTENT_ROOT=/data/content +EPUB_SOURCE_ROOT=/data/epub-source +``` + +If you want to bind to host folders for local testing: + +```yaml +services: + api: + volumes: + - /absolute/local/path/content:/data/content + - /absolute/local/path/epub-source:/data/epub-source +``` + +If you want to use NFS-backed docker volumes, define them under `volumes:`. Example: + +```yaml +volumes: + nas_chapter_content: + driver: local + driver_opts: + type: nfs + o: addr=100.93.79.10,nolock,soft,rw + device: ":/volume2/apps/reader-content" + + nas_epub_source: + driver: local + driver_opts: + type: nfs + o: addr=100.93.79.10,nolock,soft,rw + device: ":/volume2/apps/reader-epub" +``` + +For your EPUB structure (folder per novel, multiple `.epub` parts inside), mount the parent folder to `/data/epub-source`. + ## Implemented Endpoints - GET /api/health @@ -109,6 +154,52 @@ Notes: - GET /api/truyen/suggest - GET /api/chapters/{chapterId} +## NAS Migration Ops + +### 1) Apply SQL migration manually + +Run SQL in `migrations/2026_04_nas_content_storage.sql` against PostgreSQL. + +### 2) Backfill existing chapter content from Mongo -> NAS + ChapterContentRef + +Dry-run first: + +```bash +python scripts/backfill_chapter_content_refs.py --limit 1000 --dry-run +``` + +Then execute: + +```bash +python scripts/backfill_chapter_content_refs.py --limit 1000 +``` + +You can run multiple batches by increasing/changing `--limit`. + +Checkpoint/resume mode: + +```bash +python scripts/backfill_chapter_content_refs.py --limit 1000 --state-file .backfill_state.json +``` + +Or continue from a known ObjectId: + +```bash +python scripts/backfill_chapter_content_refs.py --limit 1000 --after-id 680f7f3a2f0d53f4f2b7a123 +``` + +## Chapter Read Cutover Flag + +Set in `.env`: + +```env +CHAPTER_CONTENT_MODE=nas_first +``` + +Values: +- `nas_first` (default): read NAS ref first, fallback Mongo. +- `mongo_first`: keep Mongo-first during cautious rollout. + ## Notes - Web session auth is supported via NextAuth session cookies (next-auth.session-token and secure variants). diff --git a/ROLLOUT_CHECKLIST.md b/ROLLOUT_CHECKLIST.md new file mode 100644 index 0000000..d0521ad --- /dev/null +++ b/ROLLOUT_CHECKLIST.md @@ -0,0 +1,30 @@ +# Rollout Checklist - NAS Chapter Storage + +## Pre-Deploy +- [ ] Backup PostgreSQL schema + critical tables +- [ ] Verify NAS mount/access permissions in API runtime +- [ ] Enable feature flags (default: Mongo fallback on) + +## Deploy Order +1. Deploy DB migrations +2. Deploy API with dual-read disabled by default +3. Enable discover/approve/convert job APIs +4. Run pilot import set (small curated EPUB batch) +5. Enable NAS-first for pilot users/env +6. Gradually ramp NAS-first traffic + +## Runtime Verification +- [ ] `/api/health` stable +- [ ] Chapter read success rate >= target +- [ ] NAS read timeout/error rate below threshold +- [ ] Mongo fallback rate trending down + +## Rollback +- [ ] Switch feature flag to Mongo-first immediately +- [ ] Stop import jobs +- [ ] Keep imported refs for investigation (no destructive cleanup) + +## Post-Deploy +- [ ] Compare chapter counts and random content samples +- [ ] Review failed/review_required import queue +- [ ] Publish release notes for web/mobile teams diff --git a/app/auth.py b/app/auth.py index 00b0ace..4d57881 100644 --- a/app/auth.py +++ b/app/auth.py @@ -121,7 +121,10 @@ async def resolve_current_user(db: AsyncSession, request: Request) -> dict[str, return await _get_user_from_session_cookie(db, request) -async def require_current_user(db: AsyncSession, request: Request) -> dict[str, Any]: +async def require_current_user( + request: Request, + db: AsyncSession = Depends(get_db_session), +) -> dict[str, Any]: user = await resolve_current_user(db, request) if not user: raise HTTPException(status_code=401, detail="Unauthorized") diff --git a/app/config.py b/app/config.py index b072c3c..53368cc 100644 --- a/app/config.py +++ b/app/config.py @@ -20,6 +20,9 @@ class Settings(BaseSettings): r2_secret_access_key: str = "" r2_bucket_name: str = "" r2_public_base_url: str = "" + nas_content_root: str = "./data/content" + epub_source_root: str = "./data/epub-source" + chapter_content_mode: str = "nas_first" # nas_first | mongo_first deepseek_key: str = "" deepseek_model: str = "deepseek-chat" diff --git a/app/main.py b/app/main.py index 6493930..e56c03a 100644 --- a/app/main.py +++ b/app/main.py @@ -1,10 +1,12 @@ from __future__ import annotations import datetime as dt +import hashlib import random import secrets import uuid from contextlib import asynccontextmanager +from pathlib import Path from typing import Any from bson import ObjectId @@ -20,14 +22,80 @@ from app.auth import ACCESS_TOKEN_TTL_SECONDS, create_access_token, require_curr from app.routers import mod from app.config import settings from app.database import get_db_session, mongo_client, mongo_db +from app.storage import storage @asynccontextmanager async def lifespan(_: FastAPI): + await _ensure_migration_tables() yield mongo_client.close() +async def _ensure_migration_tables() -> None: + from app.database import engine + + ddl_statements = [ + 'CREATE EXTENSION IF NOT EXISTS unaccent', + ''' + CREATE TABLE IF NOT EXISTS "SourceAsset" ( + id TEXT PRIMARY KEY, + path TEXT NOT NULL, + sha256 TEXT NOT NULL, + opf_identifier TEXT, + title TEXT, + author TEXT, + status TEXT NOT NULL DEFAULT 'discovered', + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + ''', + ''' + CREATE UNIQUE INDEX IF NOT EXISTS "SourceAsset_sha256_key" ON "SourceAsset"(sha256) + ''', + ''' + CREATE TABLE IF NOT EXISTS "ImportJob" ( + id TEXT PRIMARY KEY, + "sourceAssetId" TEXT NOT NULL REFERENCES "SourceAsset"(id) ON DELETE CASCADE, + status TEXT NOT NULL DEFAULT 'pending', + error TEXT, + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + ''', + ''' + CREATE TABLE IF NOT EXISTS "ChapterContentRef" ( + "chapterId" TEXT PRIMARY KEY, + "txtHref" TEXT NOT NULL, + "rawHtmlHref" TEXT NOT NULL, + "contentHash" TEXT NOT NULL, + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + ''', + ''' + CREATE TABLE IF NOT EXISTS "AssetNovelMapping" ( + id TEXT PRIMARY KEY, + "sourceAssetId" TEXT NOT NULL REFERENCES "SourceAsset"(id) ON DELETE CASCADE, + "novelId" TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + note TEXT, + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + ''', + ] + + async with engine.begin() as conn: + for ddl in ddl_statements: + try: + await conn.execute(text(ddl)) + except Exception: + if ddl.strip().lower().startswith("create extension"): + continue + raise + + app = FastAPI(title=settings.app_name, lifespan=lifespan) app.include_router(mod.router) @@ -909,7 +977,7 @@ async def get_novel_chapters( @app.get("/api/truyen/{novel_id}/chapters/by-number/{chapter_number}") -async def get_chapter_by_number(novel_id: str, chapter_number: int): +async def get_chapter_by_number(novel_id: str, chapter_number: int, db: AsyncSession = Depends(get_db_session)): chapter = await mongo_db["chapters"].find_one({"novelId": novel_id, "number": chapter_number}) if not chapter: raise HTTPException(status_code=404, detail="Chapter not found") @@ -926,12 +994,15 @@ async def get_chapter_by_number(novel_id: str, chapter_number: int): await mongo_db["chapters"].update_one({"_id": chapter["_id"]}, {"$inc": {"views": 1}}) + chapter_id = str(chapter.get("_id")) + content = await _resolve_chapter_content(chapter_id, chapter.get("content"), db) + return { "id": str(chapter.get("_id")), "novelId": chapter.get("novelId"), "number": chapter.get("number"), "title": chapter.get("title"), - "content": chapter.get("content"), + "content": content, "views": int(chapter.get("views") or 0) + 1, "volumeNumber": chapter.get("volumeNumber"), "volumeTitle": chapter.get("volumeTitle"), @@ -944,7 +1015,7 @@ async def get_chapter_by_number(novel_id: str, chapter_number: int): @app.get("/api/chapters/{chapter_id}") -async def get_chapter_detail(chapter_id: str): +async def get_chapter_detail(chapter_id: str, db: AsyncSession = Depends(get_db_session)): try: object_id = ObjectId(chapter_id) except Exception as exc: @@ -963,12 +1034,14 @@ async def get_chapter_detail(chapter_id: str): {"number": 1}, ) + content = await _resolve_chapter_content(chapter_id, chapter.get("content"), db) + return { "id": str(chapter.get("_id")), "novelId": chapter.get("novelId"), "number": chapter.get("number"), "title": chapter.get("title"), - "content": chapter.get("content"), + "content": content, "views": chapter.get("views", 0), "volumeNumber": chapter.get("volumeNumber"), "volumeTitle": chapter.get("volumeTitle"), @@ -1022,6 +1095,873 @@ class RatePayload(BaseModel): score: float = Field(ge=1, le=5) +class SourceAssetApprovePayload(BaseModel): + status: str = Field(pattern="^(approved|rejected|review_required)$") + + +class ImportJobCreatePayload(BaseModel): + sourceAssetId: str + + +class ImportJobApplyMappingPayload(BaseModel): + novelId: str + overwrite: bool = False + + +class ImportJobManualMapPayload(BaseModel): + novelId: str + sourceChapterNumber: int = Field(ge=1) + targetChapterId: str + overwrite: bool = True + + +class ImportJobCompletePayload(BaseModel): + force: bool = False + + +class SourceAssetUpsertPayload(BaseModel): + path: str + sha256: str + opfIdentifier: str | None = None + title: str | None = None + author: str | None = None + + +def _asset_file_sha256(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as f: + while True: + chunk = f.read(1024 * 1024) + if not chunk: + break + h.update(chunk) + return h.hexdigest() + + +def _extract_epub_chapters(epub_path: Path) -> list[dict[str, Any]]: + from app.routers.mod import _build_chapters_from_toc, _epub_html_to_text, _postprocess_extracted_chapters + from ebooklib import epub as epublib + + book = epublib.read_epub(str(epub_path), options={"ignore_ncx": False}) + extracted = _postprocess_extracted_chapters(_build_chapters_from_toc(book), "toc") + + chapters: list[dict[str, Any]] = [] + for idx, ch in enumerate(extracted, start=1): + content = str(ch.get("content") or "") + if not content.strip(): + continue + chapters.append( + { + "number": int(ch.get("number") or idx), + "title": str(ch.get("title") or f"Chapter {idx}"), + "raw_html": content, + "txt": _epub_html_to_text(content).strip(), + } + ) + return chapters + + +async def _resolve_chapter_content(chapter_id: str, mongo_fallback: str | None, db: AsyncSession) -> str | None: + ref_row = ( + await db.execute( + text('SELECT "txtHref" FROM "ChapterContentRef" WHERE "chapterId" = :chapter_id LIMIT 1'), + {"chapter_id": chapter_id}, + ) + ).mappings().first() + + if settings.chapter_content_mode == "mongo_first": + if mongo_fallback: + return mongo_fallback + if ref_row: + try: + return storage.read_text(ref_row["txtHref"]) + except Exception: + return mongo_fallback + return mongo_fallback + + if ref_row: + try: + return storage.read_text(ref_row["txtHref"]) + except Exception: + return mongo_fallback + return mongo_fallback + + +@app.get("/api/import/assets") +async def list_source_assets( + status: str | None = None, + unconvertedOnly: bool = Query(default=False), + q: str | None = None, + limit: int = Query(default=50, ge=1, le=200), + db: AsyncSession = Depends(get_db_session), +): + where_parts: list[str] = [] + params: dict[str, Any] = {"limit": limit} + if status: + where_parts.append('s.status = :status') + params["status"] = status + + if unconvertedOnly: + where_parts.append( + 'NOT EXISTS (SELECT 1 FROM "ImportJob" j WHERE j."sourceAssetId" = s.id AND j.status = :completed_status)' + ) + params["completed_status"] = "completed" + + if q and q.strip(): + raw = q.strip().lower() + where_parts.append( + '(unaccent(lower(s.path)) ILIKE unaccent(:q_raw) OR lower(s.path) ILIKE :q_raw)' + ) + params["q_raw"] = f"%{raw}%" + + where_sql = f"WHERE {' AND '.join(where_parts)}" if where_parts else "" + rows = ( + await db.execute( + text( + f'SELECT id, path, sha256, opf_identifier, title, author, status, "createdAt", "updatedAt" ' + f'FROM "SourceAsset" s {where_sql} ORDER BY s."updatedAt" DESC LIMIT :limit' + ), + params, + ) + ).mappings().all() + return [dict(r) for r in rows] + + +@app.post("/api/import/assets/upsert") +async def upsert_source_asset( + payload: SourceAssetUpsertPayload, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + existing = ( + await db.execute(text('SELECT id, path, sha256 FROM "SourceAsset" WHERE sha256 = :sha256 LIMIT 1'), {"sha256": payload.sha256}) + ).mappings().first() + + if existing: + row = ( + await db.execute( + text( + 'UPDATE "SourceAsset" SET path = :path, opf_identifier = :opf, title = :title, author = :author, ' + '"updatedAt" = NOW() WHERE id = :id ' + 'RETURNING id, path, sha256, status, "updatedAt"' + ), + { + "id": existing["id"], + "path": payload.path, + "opf": payload.opfIdentifier, + "title": payload.title, + "author": payload.author, + }, + ) + ).mappings().first() + else: + new_id = _new_id("asset_") + row = ( + await db.execute( + text( + 'INSERT INTO "SourceAsset" (id, path, sha256, opf_identifier, title, author, status) ' + 'VALUES (:id, :path, :sha256, :opf, :title, :author, :status) ' + 'RETURNING id, path, sha256, status, "updatedAt"' + ), + { + "id": new_id, + "path": payload.path, + "sha256": payload.sha256, + "opf": payload.opfIdentifier, + "title": payload.title, + "author": payload.author, + "status": "discovered", + }, + ) + ).mappings().first() + + await db.commit() + return dict(row) if row else {} + + +@app.post("/api/import/discover") +async def discover_epub_assets( + limit: int = Query(default=200, ge=1, le=2000), + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + root = Path(settings.epub_source_root) + if not root.exists(): + raise HTTPException(status_code=400, detail=f"EPUB source root not found: {settings.epub_source_root}") + + found = sorted(root.rglob("*.epub"))[:limit] + discovered = 0 + updated = 0 + + for epub_path in found: + sha256 = _asset_file_sha256(epub_path) + rel_path = str(epub_path.relative_to(root)) + existing = ( + await db.execute(text('SELECT id FROM "SourceAsset" WHERE sha256 = :sha LIMIT 1'), {"sha": sha256}) + ).mappings().first() + + if existing: + await db.execute( + text('UPDATE "SourceAsset" SET path = :path, "updatedAt" = NOW() WHERE id = :id'), + {"id": existing["id"], "path": rel_path}, + ) + updated += 1 + continue + + await db.execute( + text( + 'INSERT INTO "SourceAsset" (id, path, sha256, status) VALUES (:id, :path, :sha, :status)' + ), + {"id": _new_id("asset_"), "path": rel_path, "sha": sha256, "status": "discovered"}, + ) + discovered += 1 + + await db.commit() + return {"scanned": len(found), "discovered": discovered, "updated": updated} + + +@app.post("/api/import/assets/{asset_id}/approve") +async def approve_source_asset( + asset_id: str, + payload: SourceAssetApprovePayload, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + row = ( + await db.execute( + text( + 'UPDATE "SourceAsset" SET status = :status, "updatedAt" = NOW() ' + 'WHERE id = :id RETURNING id, status, "updatedAt"' + ), + {"id": asset_id, "status": payload.status}, + ) + ).mappings().first() + if not row: + raise HTTPException(status_code=404, detail="Source asset not found") + await db.commit() + return dict(row) + + +@app.post("/api/import/jobs") +async def create_import_job( + payload: ImportJobCreatePayload, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + source_row = ( + await db.execute( + text('SELECT id, status FROM "SourceAsset" WHERE id = :id LIMIT 1'), + {"id": payload.sourceAssetId}, + ) + ).mappings().first() + if not source_row: + raise HTTPException(status_code=404, detail="Source asset not found") + if source_row["status"] != "approved": + raise HTTPException(status_code=400, detail="Source asset must be approved") + + job_id = _new_id("job_") + await db.execute( + text('INSERT INTO "ImportJob" (id, "sourceAssetId", status) VALUES (:id, :asset_id, :status)'), + {"id": job_id, "asset_id": payload.sourceAssetId, "status": "pending"}, + ) + await db.commit() + return {"id": job_id, "sourceAssetId": payload.sourceAssetId, "status": "pending"} + + +@app.get("/api/import/jobs/{job_id}") +async def get_import_job(job_id: str, db: AsyncSession = Depends(get_db_session)): + row = ( + await db.execute( + text( + 'SELECT j.id, j."sourceAssetId", j.status, j.error, j."createdAt", j."updatedAt" ' + 'FROM "ImportJob" j WHERE j.id = :id LIMIT 1' + ), + {"id": job_id}, + ) + ).mappings().first() + if not row: + raise HTTPException(status_code=404, detail="Import job not found") + return dict(row) + + +@app.post("/api/import/jobs/{job_id}/run") +async def run_import_job( + job_id: str, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute( + text( + 'SELECT j.id, j."sourceAssetId", s.path, s.status AS source_status ' + 'FROM "ImportJob" j JOIN "SourceAsset" s ON s.id = j."sourceAssetId" ' + 'WHERE j.id = :id LIMIT 1' + ), + {"id": job_id}, + ) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + source_path = Path(settings.epub_source_root) / str(job["path"]) + if not source_path.exists(): + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = :err, "updatedAt" = NOW() WHERE id = :id'), + {"id": job_id, "status": "failed", "err": "EPUB file not found"}, + ) + await db.commit() + raise HTTPException(status_code=400, detail="EPUB source file not found") + + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = NULL, "updatedAt" = NOW() WHERE id = :id'), + {"id": job_id, "status": "processing"}, + ) + await db.commit() + + try: + chapters = _extract_epub_chapters(source_path) + if not chapters: + raise RuntimeError("No readable chapters extracted from EPUB") + + for chapter in chapters: + base = f"{job['sourceAssetId']}/{chapter['number']}" + txt_write = storage.write_text(f"{base}.txt", chapter["txt"]) + storage.write_text(f"{base}.raw.html", chapter["raw_html"]) + # missing mapping to canonical chapter ids: keep in review_required queue + + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = :err, "updatedAt" = NOW() WHERE id = :id'), + { + "id": job_id, + "status": "review_required", + "err": f"missing_mapping:{len(chapters)}_chapters_ready", + }, + ) + await db.commit() + return {"id": job_id, "status": "review_required", "chaptersExtracted": len(chapters)} + except Exception as exc: + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = :err, "updatedAt" = NOW() WHERE id = :id'), + {"id": job_id, "status": "failed", "err": str(exc)}, + ) + await db.commit() + raise HTTPException(status_code=500, detail="Import job failed") from exc + + +@app.post("/api/import/jobs/{job_id}/apply-mapping") +async def apply_import_job_mapping( + job_id: str, + payload: ImportJobApplyMappingPayload, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute( + text('SELECT id, "sourceAssetId" FROM "ImportJob" WHERE id = :id LIMIT 1'), + {"id": job_id}, + ) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + asset_id = str(job["sourceAssetId"]) + asset_dir = Path(settings.nas_content_root) / asset_id + if not asset_dir.exists(): + raise HTTPException(status_code=400, detail="Converted content folder not found") + + txt_files = sorted(asset_dir.glob("*.txt")) + mapped = 0 + missing = 0 + + for txt_file in txt_files: + chapter_token = txt_file.stem + if not chapter_token.isdigit(): + continue + chapter_number = int(chapter_token) + chapter = await mongo_db["chapters"].find_one( + {"novelId": payload.novelId, "number": chapter_number}, + {"_id": 1}, + ) + if not chapter: + missing += 1 + continue + + chapter_id = str(chapter.get("_id")) + txt_href = f"{asset_id}/{chapter_number}.txt" + raw_href = f"{asset_id}/{chapter_number}.raw.html" + content_hash = hashlib.sha256(storage.read_text(txt_href).encode("utf-8")).hexdigest() + + if payload.overwrite: + await db.execute( + text( + 'INSERT INTO "ChapterContentRef" ("chapterId", "txtHref", "rawHtmlHref", "contentHash") ' + 'VALUES (:chapter_id, :txt_href, :raw_href, :hash) ' + 'ON CONFLICT ("chapterId") DO UPDATE ' + 'SET "txtHref" = EXCLUDED."txtHref", "rawHtmlHref" = EXCLUDED."rawHtmlHref", ' + '"contentHash" = EXCLUDED."contentHash", "updatedAt" = NOW()' + ), + { + "chapter_id": chapter_id, + "txt_href": txt_href, + "raw_href": raw_href, + "hash": content_hash, + }, + ) + else: + await db.execute( + text( + 'INSERT INTO "ChapterContentRef" ("chapterId", "txtHref", "rawHtmlHref", "contentHash") ' + 'VALUES (:chapter_id, :txt_href, :raw_href, :hash) ' + 'ON CONFLICT ("chapterId") DO NOTHING' + ), + { + "chapter_id": chapter_id, + "txt_href": txt_href, + "raw_href": raw_href, + "hash": content_hash, + }, + ) + mapped += 1 + + status = "completed" if missing == 0 else "review_required" + await db.execute( + text( + 'INSERT INTO "AssetNovelMapping" (id, "sourceAssetId", "novelId", status, note) ' + 'VALUES (:id, :asset_id, :novel_id, :status, :note)' + ), + { + "id": _new_id("map_"), + "asset_id": asset_id, + "novel_id": payload.novelId, + "status": status, + "note": f"mapped={mapped},missing={missing}", + }, + ) + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = :err, "updatedAt" = NOW() WHERE id = :id'), + { + "id": job_id, + "status": status, + "err": None if missing == 0 else f"missing_mapping:{missing}", + }, + ) + await db.commit() + + return {"jobId": job_id, "sourceAssetId": asset_id, "mapped": mapped, "missing": missing, "status": status} + + +@app.get("/api/import/review-required") +async def list_review_required_jobs( + limit: int = Query(default=100, ge=1, le=500), + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + rows = ( + await db.execute( + text( + 'SELECT j.id, j."sourceAssetId", j.status, j.error, j."updatedAt", s.path, s.title, s.author ' + 'FROM "ImportJob" j JOIN "SourceAsset" s ON s.id = j."sourceAssetId" ' + 'WHERE j.status = :status ORDER BY j."updatedAt" DESC LIMIT :limit' + ), + {"status": "review_required", "limit": limit}, + ) + ).mappings().all() + return [dict(r) for r in rows] + + +@app.get("/api/import/jobs/{job_id}/missing-mappings") +async def get_missing_mappings( + job_id: str, + novelId: str, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute(text('SELECT id, "sourceAssetId" FROM "ImportJob" WHERE id = :id LIMIT 1'), {"id": job_id}) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + asset_id = str(job["sourceAssetId"]) + asset_dir = Path(settings.nas_content_root) / asset_id + if not asset_dir.exists(): + raise HTTPException(status_code=400, detail="Converted content folder not found") + + missing: list[dict[str, Any]] = [] + for txt_file in sorted(asset_dir.glob("*.txt")): + token = txt_file.stem + if not token.isdigit(): + continue + chapter_number = int(token) + chapter = await mongo_db["chapters"].find_one( + {"novelId": novelId, "number": chapter_number}, + {"_id": 1}, + ) + if not chapter: + missing.append( + { + "sourceChapterNumber": chapter_number, + "txtHref": f"{asset_id}/{chapter_number}.txt", + "rawHtmlHref": f"{asset_id}/{chapter_number}.raw.html", + } + ) + return {"jobId": job_id, "sourceAssetId": asset_id, "novelId": novelId, "missing": missing} + + +@app.post("/api/import/jobs/{job_id}/manual-map") +async def manual_map_chapter( + job_id: str, + payload: ImportJobManualMapPayload, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute(text('SELECT id, "sourceAssetId" FROM "ImportJob" WHERE id = :id LIMIT 1'), {"id": job_id}) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + asset_id = str(job["sourceAssetId"]) + txt_href = f"{asset_id}/{payload.sourceChapterNumber}.txt" + raw_href = f"{asset_id}/{payload.sourceChapterNumber}.raw.html" + try: + content = storage.read_text(txt_href) + except Exception as exc: + raise HTTPException(status_code=404, detail="Source chapter content not found") from exc + + target = await mongo_db["chapters"].find_one( + {"_id": ObjectId(payload.targetChapterId), "novelId": payload.novelId}, + {"_id": 1}, + ) + if not target: + raise HTTPException(status_code=404, detail="Target chapter not found for novel") + + content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest() + if payload.overwrite: + await db.execute( + text( + 'INSERT INTO "ChapterContentRef" ("chapterId", "txtHref", "rawHtmlHref", "contentHash") ' + 'VALUES (:chapter_id, :txt_href, :raw_href, :hash) ' + 'ON CONFLICT ("chapterId") DO UPDATE ' + 'SET "txtHref" = EXCLUDED."txtHref", "rawHtmlHref" = EXCLUDED."rawHtmlHref", ' + '"contentHash" = EXCLUDED."contentHash", "updatedAt" = NOW()' + ), + { + "chapter_id": payload.targetChapterId, + "txt_href": txt_href, + "raw_href": raw_href, + "hash": content_hash, + }, + ) + else: + await db.execute( + text( + 'INSERT INTO "ChapterContentRef" ("chapterId", "txtHref", "rawHtmlHref", "contentHash") ' + 'VALUES (:chapter_id, :txt_href, :raw_href, :hash) ON CONFLICT ("chapterId") DO NOTHING' + ), + { + "chapter_id": payload.targetChapterId, + "txt_href": txt_href, + "raw_href": raw_href, + "hash": content_hash, + }, + ) + await db.commit() + return { + "jobId": job_id, + "sourceAssetId": asset_id, + "sourceChapterNumber": payload.sourceChapterNumber, + "targetChapterId": payload.targetChapterId, + "status": "mapped", + } + + +@app.get("/api/import/jobs/{job_id}/source-chapter-preview") +async def preview_source_chapter( + job_id: str, + chapterNumber: int = Query(..., ge=1), + includeRawHtml: bool = Query(default=False), + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute(text('SELECT id, "sourceAssetId" FROM "ImportJob" WHERE id = :id LIMIT 1'), {"id": job_id}) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + asset_id = str(job["sourceAssetId"]) + txt_href = f"{asset_id}/{chapterNumber}.txt" + raw_href = f"{asset_id}/{chapterNumber}.raw.html" + try: + txt_content = storage.read_text(txt_href) + except Exception as exc: + raise HTTPException(status_code=404, detail="Source chapter not found") from exc + + response: dict[str, Any] = { + "jobId": job_id, + "sourceAssetId": asset_id, + "chapterNumber": chapterNumber, + "txtHref": txt_href, + "rawHtmlHref": raw_href, + "txt": txt_content, + } + if includeRawHtml: + try: + response["rawHtml"] = storage.read_text(raw_href) + except Exception: + response["rawHtml"] = None + return response + + +@app.post("/api/import/jobs/{job_id}/complete") +async def complete_import_job( + job_id: str, + payload: ImportJobCompletePayload, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + row = ( + await db.execute( + text('SELECT id, "sourceAssetId", status FROM "ImportJob" WHERE id = :id LIMIT 1'), + {"id": job_id}, + ) + ).mappings().first() + if not row: + raise HTTPException(status_code=404, detail="Import job not found") + + if row["status"] not in ("review_required", "completed") and not payload.force: + raise HTTPException(status_code=400, detail="Job is not ready for completion") + + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = NULL, "updatedAt" = NOW() WHERE id = :id'), + {"id": job_id, "status": "completed"}, + ) + await db.execute( + text( + 'UPDATE "AssetNovelMapping" SET status = :status, "updatedAt" = NOW() ' + 'WHERE "sourceAssetId" = :asset_id AND status != :status' + ), + {"asset_id": row["sourceAssetId"], "status": "completed"}, + ) + await db.commit() + return {"jobId": job_id, "status": "completed", "sourceAssetId": row["sourceAssetId"]} + + +@app.get("/api/import/jobs/{job_id}/mapping-progress") +async def get_mapping_progress( + job_id: str, + novelId: str, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute(text('SELECT id, "sourceAssetId", status, error FROM "ImportJob" WHERE id = :id LIMIT 1'), {"id": job_id}) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + asset_id = str(job["sourceAssetId"]) + asset_dir = Path(settings.nas_content_root) / asset_id + if not asset_dir.exists(): + raise HTTPException(status_code=400, detail="Converted content folder not found") + + total = 0 + mapped = 0 + missing_numbers: list[int] = [] + for txt_file in sorted(asset_dir.glob("*.txt")): + token = txt_file.stem + if not token.isdigit(): + continue + chapter_number = int(token) + total += 1 + chapter = await mongo_db["chapters"].find_one( + {"novelId": novelId, "number": chapter_number}, + {"_id": 1}, + ) + if not chapter: + missing_numbers.append(chapter_number) + continue + + chapter_id = str(chapter.get("_id")) + ref = ( + await db.execute( + text('SELECT "chapterId" FROM "ChapterContentRef" WHERE "chapterId" = :id LIMIT 1'), + {"id": chapter_id}, + ) + ).mappings().first() + if ref: + mapped += 1 + else: + missing_numbers.append(chapter_number) + + missing = max(total - mapped, 0) + percent = 100.0 if total == 0 else round((mapped / total) * 100, 2) + return { + "jobId": job_id, + "sourceAssetId": asset_id, + "novelId": novelId, + "jobStatus": job["status"], + "jobError": job.get("error"), + "totalSourceChapters": total, + "mappedChapters": mapped, + "missingChapters": missing, + "progressPercent": percent, + "missingChapterNumbers": missing_numbers, + } + + +@app.post("/api/import/jobs/{job_id}/auto-complete") +async def auto_complete_import_job( + job_id: str, + novelId: str, + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + job = ( + await db.execute(text('SELECT id, "sourceAssetId" FROM "ImportJob" WHERE id = :id LIMIT 1'), {"id": job_id}) + ).mappings().first() + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + asset_id = str(job["sourceAssetId"]) + asset_dir = Path(settings.nas_content_root) / asset_id + if not asset_dir.exists(): + raise HTTPException(status_code=400, detail="Converted content folder not found") + + total = 0 + mapped = 0 + for txt_file in sorted(asset_dir.glob("*.txt")): + token = txt_file.stem + if not token.isdigit(): + continue + chapter_number = int(token) + total += 1 + chapter = await mongo_db["chapters"].find_one({"novelId": novelId, "number": chapter_number}, {"_id": 1}) + if not chapter: + continue + chapter_id = str(chapter.get("_id")) + ref = ( + await db.execute( + text('SELECT "chapterId" FROM "ChapterContentRef" WHERE "chapterId" = :id LIMIT 1'), + {"id": chapter_id}, + ) + ).mappings().first() + if ref: + mapped += 1 + + if total == 0: + raise HTTPException(status_code=400, detail="No source chapter files found") + if mapped != total: + raise HTTPException( + status_code=400, + detail=f"Cannot auto-complete: mapped {mapped}/{total}. Resolve missing mappings first.", + ) + + await db.execute( + text('UPDATE "ImportJob" SET status = :status, error = NULL, "updatedAt" = NOW() WHERE id = :id'), + {"id": job_id, "status": "completed"}, + ) + await db.execute( + text( + 'UPDATE "AssetNovelMapping" SET status = :status, "updatedAt" = NOW() ' + 'WHERE "sourceAssetId" = :asset_id AND "novelId" = :novel_id' + ), + {"status": "completed", "asset_id": asset_id, "novel_id": novelId}, + ) + await db.commit() + return {"jobId": job_id, "sourceAssetId": asset_id, "novelId": novelId, "status": "completed", "mapped": mapped, "total": total} + + +@app.delete("/api/import/jobs/{job_id}") +async def delete_import_job( + job_id: str, + removeContent: bool = Query(default=True), + db: AsyncSession = Depends(get_db_session), + user: dict = Depends(require_current_user), +): + if user.get("role") not in ("MOD", "ADMIN"): + raise HTTPException(status_code=403, detail="Forbidden") + + row = ( + await db.execute( + text('SELECT id, "sourceAssetId" FROM "ImportJob" WHERE id = :id LIMIT 1'), + {"id": job_id}, + ) + ).mappings().first() + if not row: + raise HTTPException(status_code=404, detail="Import job not found") + + source_asset_id = str(row["sourceAssetId"]) + removed_files = 0 + removed_dir = False + + if removeContent: + asset_dir = Path(settings.nas_content_root) / source_asset_id + if asset_dir.exists() and asset_dir.is_dir(): + files = list(asset_dir.glob("**/*")) + for p in files: + if p.is_file(): + p.unlink(missing_ok=True) + removed_files += 1 + for p in sorted(asset_dir.glob("**/*"), key=lambda x: len(x.parts), reverse=True): + if p.is_dir(): + p.rmdir() + asset_dir.rmdir() + removed_dir = True + + await db.execute(text('DELETE FROM "ImportJob" WHERE id = :id'), {"id": job_id}) + await db.execute(text('DELETE FROM "AssetNovelMapping" WHERE "sourceAssetId" = :asset_id'), {"asset_id": source_asset_id}) + await db.commit() + + return { + "jobId": job_id, + "deleted": True, + "sourceAssetId": source_asset_id, + "removeContent": removeContent, + "removedFiles": removed_files, + "removedDir": removed_dir, + } + + @app.post("/api/truyen/{novel_id}/rate") async def rate_novel(novel_id: str, payload: RatePayload, db: AsyncSession = Depends(get_db_session)): row = ( @@ -1118,7 +2058,7 @@ async def create_comment( request: Request, db: AsyncSession = Depends(get_db_session), ): - user = await require_current_user(db, request) + user = await require_current_user(request, db) content = payload.content.strip() if not content: raise HTTPException(status_code=400, detail="Content is required") @@ -1156,7 +2096,7 @@ async def create_comment( @app.get("/api/user/bookmarks") async def list_bookmarks(request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) rows = ( await db.execute( @@ -1206,7 +2146,7 @@ class BookmarkPayload(BaseModel): @app.post("/api/user/bookmarks") async def upsert_bookmark(payload: BookmarkPayload, request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) action = (payload.action or "").strip() existing = ( @@ -1296,7 +2236,7 @@ async def upsert_bookmark(payload: BookmarkPayload, request: Request, db: AsyncS @app.delete("/api/user/bookmarks/{novel_id}") async def delete_bookmark(novel_id: str, request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) row = ( await db.execute( text( @@ -1331,7 +2271,7 @@ async def update_reading_progress( request: Request, db: AsyncSession = Depends(get_db_session), ): - user = await require_current_user(db, request) + user = await require_current_user(request, db) result = await _update_reading_progress( db, user["id"], @@ -1345,7 +2285,7 @@ async def update_reading_progress( @app.get("/api/user/profile") async def profile(request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) return { "id": user["id"], "email": user.get("email"), @@ -1357,7 +2297,7 @@ async def profile(request: Request, db: AsyncSession = Depends(get_db_session)): @app.get("/api/user/settings") async def get_user_settings(request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) row = ( await db.execute( text( @@ -1392,7 +2332,7 @@ async def save_user_settings( request: Request, db: AsyncSession = Depends(get_db_session), ): - user = await require_current_user(db, request) + user = await require_current_user(request, db) existing = ( await db.execute( @@ -1451,7 +2391,7 @@ async def save_user_settings( @app.get("/api/user/recommendations") async def list_recommendations(request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) docs = ( await mongo_db["userrecommendations"] @@ -1503,7 +2443,7 @@ async def create_recommendation( request: Request, db: AsyncSession = Depends(get_db_session), ): - user = await require_current_user(db, request) + user = await require_current_user(request, db) novel_exists = ( await db.execute( @@ -1545,7 +2485,7 @@ async def delete_recommendation( novelId: str = Query(...), db: AsyncSession = Depends(get_db_session), ): - user = await require_current_user(db, request) + user = await require_current_user(request, db) existing = await mongo_db["userrecommendations"].find_one( {"userId": user["id"], "novelId": novelId}, {"_id": 1} @@ -1664,7 +2604,7 @@ async def mobile_login(payload: MobileLoginPayload, db: AsyncSession = Depends(g @app.get("/api/auth/session") async def auth_session(request: Request, db: AsyncSession = Depends(get_db_session)): - user = await require_current_user(db, request) + user = await require_current_user(request, db) return { "user": { "id": user["id"], diff --git a/app/storage.py b/app/storage.py new file mode 100644 index 0000000..9206f6e --- /dev/null +++ b/app/storage.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import hashlib +from pathlib import Path + +from app.config import settings + + +class NasContentStorage: + def __init__(self, root_dir: str): + self.root = Path(root_dir).resolve() + self.root.mkdir(parents=True, exist_ok=True) + + def _resolve(self, href: str) -> Path: + rel = href.strip().lstrip("/") + target = (self.root / rel).resolve() + if self.root not in target.parents and target != self.root: + raise ValueError("Invalid storage href") + return target + + def read_text(self, href: str) -> str: + path = self._resolve(href) + return path.read_text(encoding="utf-8") + + def write_text(self, href: str, content: str) -> dict[str, str | int]: + path = self._resolve(href) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + digest = hashlib.sha256(content.encode("utf-8")).hexdigest() + return {"href": href, "sha256": digest, "size": len(content.encode("utf-8"))} + + +storage = NasContentStorage(settings.nas_content_root) diff --git a/docker-compose.yml b/docker-compose.yml index 49e7a29..3361baf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,12 @@ services: - .env ports: - "8000:8000" + environment: + NAS_CONTENT_ROOT: ${NAS_CONTENT_ROOT:-/data/content} + EPUB_SOURCE_ROOT: ${EPUB_SOURCE_ROOT:-/data/epub-source} + volumes: + - nas_chapter_content:/data/content + - nas_epub_source:/data/epub-source restart: unless-stopped healthcheck: test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/api/health').read()"] @@ -30,8 +36,13 @@ services: environment: DATABASE_URL: postgresql://reader:reader@postgres:5432/reader MONGODB_URI: mongodb://mongo:27017/reader + NAS_CONTENT_ROOT: ${NAS_CONTENT_ROOT:-/data/content} + EPUB_SOURCE_ROOT: ${EPUB_SOURCE_ROOT:-/data/epub-source} ports: - "8001:8000" + volumes: + - nas_chapter_content:/data/content + - nas_epub_source:/data/epub-source restart: unless-stopped healthcheck: test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/api/health').read()"] @@ -104,3 +115,5 @@ volumes: web_uploads: postgres_data: mongo_data: + nas_chapter_content: + nas_epub_source: diff --git a/migrations/2026_04_nas_content_storage.sql b/migrations/2026_04_nas_content_storage.sql new file mode 100644 index 0000000..c2064da --- /dev/null +++ b/migrations/2026_04_nas_content_storage.sql @@ -0,0 +1,43 @@ +CREATE EXTENSION IF NOT EXISTS unaccent; + +CREATE TABLE IF NOT EXISTS "SourceAsset" ( + id TEXT PRIMARY KEY, + path TEXT NOT NULL, + sha256 TEXT NOT NULL, + opf_identifier TEXT, + title TEXT, + author TEXT, + status TEXT NOT NULL DEFAULT 'discovered', + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE UNIQUE INDEX IF NOT EXISTS "SourceAsset_sha256_key" ON "SourceAsset"(sha256); + +CREATE TABLE IF NOT EXISTS "ImportJob" ( + id TEXT PRIMARY KEY, + "sourceAssetId" TEXT NOT NULL REFERENCES "SourceAsset"(id) ON DELETE CASCADE, + status TEXT NOT NULL DEFAULT 'pending', + error TEXT, + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS "ChapterContentRef" ( + "chapterId" TEXT PRIMARY KEY, + "txtHref" TEXT NOT NULL, + "rawHtmlHref" TEXT NOT NULL, + "contentHash" TEXT NOT NULL, + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS "AssetNovelMapping" ( + id TEXT PRIMARY KEY, + "sourceAssetId" TEXT NOT NULL REFERENCES "SourceAsset"(id) ON DELETE CASCADE, + "novelId" TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + note TEXT, + "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/scripts/backfill_chapter_content_refs.py b/scripts/backfill_chapter_content_refs.py new file mode 100644 index 0000000..bd6b498 --- /dev/null +++ b/scripts/backfill_chapter_content_refs.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +import argparse +import asyncio +import hashlib +import json +from pathlib import Path +from bson import ObjectId +from sqlalchemy import text + +from app.config import settings +from app.database import SessionLocal, mongo_db +from app.storage import storage + + +async def backfill(limit: int, dry_run: bool, after_id: str | None, state_file: str | None) -> None: + query = { + "$or": [ + {"content": {"$exists": True, "$type": "string", "$ne": ""}}, + {"contentHtml": {"$exists": True, "$type": "string", "$ne": ""}}, + ] + } + if after_id: + query["_id"] = {"$gt": ObjectId(after_id)} + + docs = ( + await mongo_db["chapters"] + .find(query, {"content": 1, "contentHtml": 1}) + .sort("_id", 1) + .limit(limit) + .to_list(limit) + ) + + mapped = 0 + skipped = 0 + async with SessionLocal() as db: + for doc in docs: + chapter_id = str(doc.get("_id") or "") + if not chapter_id: + skipped += 1 + continue + + exists = ( + await db.execute( + text('SELECT "chapterId" FROM "ChapterContentRef" WHERE "chapterId" = :id LIMIT 1'), + {"id": chapter_id}, + ) + ).mappings().first() + if exists: + skipped += 1 + continue + + txt = str(doc.get("content") or "").strip() + raw_html = str(doc.get("contentHtml") or doc.get("content") or "") + if not txt: + skipped += 1 + continue + + txt_href = f"legacy/{chapter_id}.txt" + raw_href = f"legacy/{chapter_id}.raw.html" + content_hash = hashlib.sha256(txt.encode("utf-8")).hexdigest() + + if not dry_run: + storage.write_text(txt_href, txt) + storage.write_text(raw_href, raw_html) + await db.execute( + text( + 'INSERT INTO "ChapterContentRef" ("chapterId", "txtHref", "rawHtmlHref", "contentHash") ' + 'VALUES (:chapter_id, :txt_href, :raw_href, :hash) ' + 'ON CONFLICT ("chapterId") DO NOTHING' + ), + { + "chapter_id": chapter_id, + "txt_href": txt_href, + "raw_href": raw_href, + "hash": content_hash, + }, + ) + mapped += 1 + + if not dry_run: + await db.commit() + + last_id = str(docs[-1]["_id"]) if docs else None + summary = { + "scanned": len(docs), + "mapped": mapped, + "skipped": skipped, + "dryRun": dry_run, + "contentRoot": settings.nas_content_root, + "nextAfterId": last_id, + } + if state_file and last_id and not dry_run: + Path(state_file).write_text(json.dumps({"afterId": last_id}, ensure_ascii=True), encoding="utf-8") + print(summary) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Backfill ChapterContentRef from Mongo chapters") + parser.add_argument("--limit", type=int, default=1000) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--after-id", type=str, default="") + parser.add_argument("--state-file", type=str, default="") + args = parser.parse_args() + after_id = args.after_id.strip() or None + state_file = args.state_file.strip() or None + if state_file and not after_id: + p = Path(state_file) + if p.exists(): + try: + after_id = json.loads(p.read_text(encoding="utf-8")).get("afterId") + except Exception: + after_id = None + asyncio.run(backfill(limit=args.limit, dry_run=args.dry_run, after_id=after_id, state_file=state_file)) + + +if __name__ == "__main__": + main()