From baad2b3e96706341ce9d737cc6e152f2882d1b11 Mon Sep 17 00:00:00 2001 From: wisecolt Date: Thu, 12 Mar 2026 22:30:43 +0300 Subject: [PATCH] feat: watcher akislarini ve wscraper servis entegrasyonunu ekle --- .env.example | 11 + .gitignore | 3 +- Dockerfile | 7 +- README.md | 68 +- apps/server/Dockerfile | 7 +- apps/server/src/config.ts | 9 + apps/server/src/index.ts | 4 + apps/server/src/loop/loop.scheduler.ts | 4 +- apps/server/src/qbit/qbit.client.ts | 8 + apps/server/src/qbit/qbit.types.ts | 7 + apps/server/src/realtime/emitter.ts | 13 + apps/server/src/realtime/events.ts | 3 + apps/server/src/realtime/socket.ts | 4 + apps/server/src/storage/jsondb.ts | 20 + apps/server/src/storage/paths.ts | 2 + apps/server/src/types.ts | 85 ++ apps/server/src/watcher/watcher.crypto.ts | 42 + apps/server/src/watcher/watcher.registry.ts | 14 + apps/server/src/watcher/watcher.routes.ts | 116 +++ apps/server/src/watcher/watcher.scraper.ts | 102 +++ apps/server/src/watcher/watcher.service.ts | 742 ++++++++++++++++++ apps/server/src/watcher/watcher.types.ts | 35 + apps/server/src/watcher/watcher.worker.ts | 34 + apps/web/src/App.tsx | 2 + apps/web/src/components/layout/AppLayout.tsx | 21 + apps/web/src/components/ui/Textarea.tsx | 22 + apps/web/src/pages/WatcherPage.tsx | 782 +++++++++++++++++++ apps/web/src/socket/socket.ts | 12 + apps/web/src/store/useAppStore.ts | 95 +++ bin/wscraper-service/server.py | 168 ++++ docker-compose.dev.yml | 18 +- docker-compose.yml | 13 +- package.json | 4 + scripts/bootstrap.sh | 197 +++++ 34 files changed, 2663 insertions(+), 11 deletions(-) create mode 100644 apps/server/src/watcher/watcher.crypto.ts create mode 100644 apps/server/src/watcher/watcher.registry.ts create mode 100644 apps/server/src/watcher/watcher.routes.ts create mode 100644 apps/server/src/watcher/watcher.scraper.ts create mode 100644 apps/server/src/watcher/watcher.service.ts create mode 100644 apps/server/src/watcher/watcher.types.ts create mode 100644 apps/server/src/watcher/watcher.worker.ts create mode 100644 apps/web/src/components/ui/Textarea.tsx create mode 100644 apps/web/src/pages/WatcherPage.tsx create mode 100644 bin/wscraper-service/server.py create mode 100755 scripts/bootstrap.sh diff --git a/.env.example b/.env.example index a9c2af3..7e68fea 100644 --- a/.env.example +++ b/.env.example @@ -9,10 +9,21 @@ SERVER_PORT=3001 WEB_PORT=5173 WEB_ORIGIN=http://localhost:5173 WEB_ALLOWED_ORIGINS=http://192.168.1.205:5175,http://qbuffer.bee +WEB_ALLOWED_HOSTS=localhost,192.168.1.205,qbuffer.bee POLL_INTERVAL_MS=3000 ENFORCE_INTERVAL_MS=2000 DEFAULT_DELAY_MS=3000 MAX_LOOP_LIMIT=1000 STALLED_RECOVERY_MS=300000 TIMER_POLL_MS=60000 +WATCHER_SECRET_KEY=replace_me_watcher +WATCHER_ENABLED=true +WATCHER_TICK_MS=30000 +WATCHER_TIMEOUT_MS=180000 +WATCHER_RUNTIME_DIR=/tmp/qbuffer-watchers +WSCRAPER_SERVICE_BASE_URL=http://host.docker.internal:8787 +WSCRAPER_SERVICE_TOKEN= +WSCRAPER_SERVICE_HOST=0.0.0.0 +WSCRAPER_SERVICE_PORT=8787 +WSCRAPER_SERVICE_PYTHON_BIN=python3.12 NODE_ENV=development diff --git a/.gitignore b/.gitignore index d47242b..755aee5 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ lcov-report/ .vite/ .tmp/ .cache/ +.runtime/ # Docker files / volumes docker/*-volume/ @@ -77,4 +78,4 @@ movie/movieData/**/backdrop.jpg *.log # Client -client/src/assets/avatar_.png \ No newline at end of file +client/src/assets/avatar_.png diff --git a/Dockerfile b/Dockerfile index 6975b4c..d774075 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,9 @@ -FROM node:20-alpine AS base +FROM node:20-bookworm-slim AS base WORKDIR /app -RUN corepack enable +RUN corepack enable \ + && apt-get update \ + && apt-get install -y --no-install-recommends build-essential \ + && rm -rf /var/lib/apt/lists/* FROM base AS deps COPY package.json pnpm-workspace.yaml ./ diff --git a/README.md b/README.md index 7ef7da2..067f7e5 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ q-buffer, qBittorrent üzerinde torrentleri kontrollü şekilde döngüye almay 2) Geliştirme ortamını başlatın: ```bash -docker-compose -f docker-compose.dev.yml up --build +./scripts/bootstrap.sh --dev-mode ``` 3) Açın: @@ -24,6 +24,69 @@ docker-compose -f docker-compose.dev.yml up --build - Web: http://localhost:5173 - API/Socket: http://localhost:3001 +## Watcher Notu + +`Watcher` akışı `wscraper -> scrapling -> Playwright` zincirini kullanır. Playwright DNS ve browser bağımlılıklarını Docker içine taşımak yerine `wscraper-service` host makinede çalışır; `web` ve `server` ise Docker içinde kalır. `server`, host servisle `http://host.docker.internal:8787` üzerinden konuşur. + +`bootstrap.sh` şu işleri tek komutta yapar: + +- Docker servislerini `up --build` ile kaldırır +- host `wscraper-service` için Python venv hazırlar +- eksik Python paketlerini ve Playwright bağımlılıklarını kurar +- `wscraper-service`i başlatır + +`wscraper-service` kurulumu her çalıştırmada sıfırdan yapılmaz. Kurulum daha önce tamamlandıysa script sadece kontrol eder ve eksik yoksa yeniden kurmaz. + +Host makinede Python 3.10+ gerekir. Script sırasıyla `python3.12`, `python3.11`, `python3.10`, `python3` ikililerini dener ve uygun ilk sürümü seçer. Gerekirse `.env` içine `WSCRAPER_SERVICE_PYTHON_BIN=python3.12` benzeri açık bir değer verebilirsiniz. + +Kullanılabilir bayraklar: + +```bash +./scripts/bootstrap.sh --dev-mode +./scripts/bootstrap.sh --prod-mode +./scripts/bootstrap.sh --dev-mode --skip-wscraper-install +./scripts/bootstrap.sh --dev-mode --restart-wscraper +``` + +Docker tarafında normal ağ erişimi hâlâ gereklidir. DNS problemi yaşarsanız Docker Desktop içinde sabit resolver (`8.8.8.8`, `1.1.1.1`) tanımlayın. Docker DNS doğru ayarlanmamışsa: + +- `pnpm install` +- image pull işlemleri +- container içi paket kurulumları + +kurulum sırasında kırılabilir. + +Önerilen Docker Engine ayarı: + +```json +{ + "builder": { + "gc": { + "defaultKeepStorage": "20GB", + "enabled": true + } + }, + "experimental": false, + "dns": ["8.8.8.8", "1.1.1.1"] +} +``` + +Docker Desktop yeniden başladıktan sonra şu testler başarılı olmalıdır: + +```bash +docker run --rm alpine nslookup registry-1.docker.io +docker run --rm alpine nslookup files.pythonhosted.org +docker run --rm alpine nslookup cdn.playwright.dev +``` + +Host servis için kullanılacak ortam değişkenleri: + +- `WSCRAPER_SERVICE_BASE_URL` varsayılan: `http://host.docker.internal:8787` +- `WSCRAPER_SERVICE_TOKEN` varsayılan: boş +- `WSCRAPER_SERVICE_HOST` varsayılan: `0.0.0.0` +- `WSCRAPER_SERVICE_PORT` varsayılan: `8787` +- `WSCRAPER_SERVICE_PYTHON_BIN` örnek: `python3.12` + ## Kullanım (Buffer) 1) qBittorrent’te torrentleri ekleyin (UI listeye düşer). @@ -41,7 +104,7 @@ docker-compose -f docker-compose.dev.yml up --build ## Production ```bash -docker-compose up --build +./scripts/bootstrap.sh --prod-mode ``` Ardından http://localhost:3001 @@ -52,6 +115,7 @@ Ardından http://localhost:3001 - `APP_USERNAME`, `APP_PASSWORD`, `JWT_SECRET` - `POLL_INTERVAL_MS`, `ENFORCE_INTERVAL_MS`, `DEFAULT_DELAY_MS`, `MAX_LOOP_LIMIT` - `WEB_ALLOWED_HOSTS` (ör: `localhost,qbuffer.bee,qbuffer.panda`) +- `WSCRAPER_SERVICE_BASE_URL`, `WSCRAPER_SERVICE_TOKEN` ## Klasör Yapısı diff --git a/apps/server/Dockerfile b/apps/server/Dockerfile index d2af8f6..5cedb8f 100644 --- a/apps/server/Dockerfile +++ b/apps/server/Dockerfile @@ -1,6 +1,9 @@ -FROM node:20-alpine +FROM node:20-bookworm-slim WORKDIR /app -RUN corepack enable +RUN corepack enable \ + && apt-get update \ + && apt-get install -y --no-install-recommends build-essential \ + && rm -rf /var/lib/apt/lists/* COPY package.json /app/apps/server/package.json COPY package.json /app/package.json COPY pnpm-workspace.yaml /app/pnpm-workspace.yaml diff --git a/apps/server/src/config.ts b/apps/server/src/config.ts index 51b82bd..702dea4 100644 --- a/apps/server/src/config.ts +++ b/apps/server/src/config.ts @@ -23,12 +23,21 @@ export const config = { webPort: envNumber(process.env.WEB_PORT, 5173), webOrigin: process.env.WEB_ORIGIN ?? "", webAllowedOrigins: process.env.WEB_ALLOWED_ORIGINS ?? "", + watcherSecretKey: process.env.WATCHER_SECRET_KEY ?? "", + watcherEnabled: (process.env.WATCHER_ENABLED ?? "true").toLowerCase() !== "false", + watcherTickMs: envNumber(process.env.WATCHER_TICK_MS, 30_000), + watcherTimeoutMs: envNumber(process.env.WATCHER_TIMEOUT_MS, 180_000), + watcherRuntimeDir: process.env.WATCHER_RUNTIME_DIR ?? "/tmp/qbuffer-watchers", + wscraperServiceBaseUrl: + process.env.WSCRAPER_SERVICE_BASE_URL ?? "http://host.docker.internal:8787", + wscraperServiceToken: process.env.WSCRAPER_SERVICE_TOKEN ?? "", dataDir: "/app/data", dbPath: "/app/data/db.json", logsPath: "/app/data/logs.json", loopLogsDir: "/app/data/loop-logs", loopLogsArchiveDir: "/app/data/loop-logs-archive", torrentArchiveDir: "/app/data/torrents", + watcherDownloadsDir: "/app/data/watcher-downloads", webPublicDir: path.resolve("/app/apps/server/public"), }; diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index a609db2..9913b9e 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -15,6 +15,7 @@ import loopRoutes from "./loop/loop.routes" import profilesRoutes from "./loop/profiles.routes" import statusRoutes from "./status/status.routes" import timerRoutes from "./timer/timer.routes" +import watcherRoutes from "./watcher/watcher.routes" import { QbitClient } from "./qbit/qbit.client" import { detectCapabilities } from "./qbit/qbit.capabilities" import { setQbitClient, setQbitCapabilities } from "./qbit/qbit.context" @@ -24,6 +25,7 @@ import { initEmitter, emitQbitHealth } from "./realtime/emitter" import { startLoopScheduler } from "./loop/loop.scheduler" import { startEnforcementWorker } from "./enforcement/enforcement.worker" import { startTimerWorker } from "./timer/timer.worker" +import { startWatcherWorker } from "./watcher/watcher.worker" import { logger } from "./utils/logger" const crashLogPath = "/app/data/crash.log"; @@ -94,6 +96,7 @@ const bootstrap = async () => { app.use("/api/profiles", requireAuth, profilesRoutes); app.use("/api/status", requireAuth, statusRoutes); app.use("/api/timer", requireAuth, timerRoutes); + app.use("/api/watchers", requireAuth, watcherRoutes); if (!isDev) { app.use(express.static(config.webPublicDir)); @@ -126,6 +129,7 @@ const bootstrap = async () => { startLoopScheduler(qbit, config.pollIntervalMs); startEnforcementWorker(config.enforceIntervalMs); startTimerWorker(qbit, config.timerPollMs); + startWatcherWorker(config.watcherTickMs); server.listen(config.port, () => { serverStarted = true; diff --git a/apps/server/src/loop/loop.scheduler.ts b/apps/server/src/loop/loop.scheduler.ts index 3bd7b5d..749a7bf 100644 --- a/apps/server/src/loop/loop.scheduler.ts +++ b/apps/server/src/loop/loop.scheduler.ts @@ -1,8 +1,9 @@ import { QbitClient } from "../qbit/qbit.client" import { tickLoopJobs } from "./loop.engine" import { getStatusSnapshot, refreshJobsStatus, setQbitStatus, setTorrentsStatus } from "../status/status.service" -import { emitQbitHealth, emitStatusUpdate } from "../realtime/emitter" +import { emitQbitHealth, emitStatusUpdate, emitWatcherItems } from "../realtime/emitter" import { logger } from "../utils/logger" +import { getWatcherItems } from "../watcher/watcher.service" export const startLoopScheduler = (qbit: QbitClient, intervalMs: number) => { setInterval(async () => { @@ -19,6 +20,7 @@ export const startLoopScheduler = (qbit: QbitClient, intervalMs: number) => { transfer, jobs, }); + emitWatcherItems(await getWatcherItems()); } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; logger.error({ error }, "Loop scheduler tick failed"); diff --git a/apps/server/src/qbit/qbit.client.ts b/apps/server/src/qbit/qbit.client.ts index 47cccd6..166c936 100644 --- a/apps/server/src/qbit/qbit.client.ts +++ b/apps/server/src/qbit/qbit.client.ts @@ -6,6 +6,7 @@ import fs from "node:fs"; import { config } from "../config" import { logger } from "../utils/logger" import { + QbitCategory, QbitPeerList, QbitTorrentInfo, QbitTorrentProperties, @@ -96,6 +97,13 @@ export class QbitClient { return response.data; } + async getCategories(): Promise> { + const response = await this.request(() => + this.client.get>("/api/v2/torrents/categories") + ); + return response.data; + } + async getTorrentProperties(hash: string): Promise { const response = await this.request(() => this.client.get("/api/v2/torrents/properties", { diff --git a/apps/server/src/qbit/qbit.types.ts b/apps/server/src/qbit/qbit.types.ts index c857cb0..891b82d 100644 --- a/apps/server/src/qbit/qbit.types.ts +++ b/apps/server/src/qbit/qbit.types.ts @@ -11,6 +11,8 @@ export interface QbitTorrentInfo { tags?: string; category?: string; tracker?: string; + num_seeds?: number; + num_leechs?: number; added_on?: number; seeding_time?: number; uploaded?: number; @@ -51,3 +53,8 @@ export interface QbitCapabilities { hasPeersEndpoint: boolean; hasBanEndpoint: boolean; } + +export interface QbitCategory { + name?: string; + savePath?: string; +} diff --git a/apps/server/src/realtime/emitter.ts b/apps/server/src/realtime/emitter.ts index 01bc0f5..e9e41bc 100644 --- a/apps/server/src/realtime/emitter.ts +++ b/apps/server/src/realtime/emitter.ts @@ -2,6 +2,7 @@ import { Server } from "socket.io"; import { EVENTS } from "./events"; import { StatusSnapshot } from "../status/status.service"; import { LoopJob, TimerLog, TimerSummary } from "../types"; +import { EnrichedWatcherItem, WatcherListItem, WatcherSummaryResponse } from "../watcher/watcher.types"; let io: Server | null = null; @@ -41,3 +42,15 @@ export const emitTimerLog = (payload: TimerLog) => { export const emitTimerSummary = (payload: TimerSummary) => { io?.emit(EVENTS.TIMER_SUMMARY, payload); }; + +export const emitWatchersList = (payload: WatcherListItem[]) => { + io?.emit(EVENTS.WATCHERS_LIST, payload); +}; + +export const emitWatcherItems = (payload: EnrichedWatcherItem[]) => { + io?.emit(EVENTS.WATCHER_ITEMS, payload); +}; + +export const emitWatcherSummary = (payload: WatcherSummaryResponse) => { + io?.emit(EVENTS.WATCHER_SUMMARY, payload); +}; diff --git a/apps/server/src/realtime/events.ts b/apps/server/src/realtime/events.ts index 3589dca..21de749 100644 --- a/apps/server/src/realtime/events.ts +++ b/apps/server/src/realtime/events.ts @@ -6,4 +6,7 @@ export const EVENTS = { QBIT_HEALTH: "qbit:health", TIMER_LOG: "timer:log", TIMER_SUMMARY: "timer:summary", + WATCHERS_LIST: "watchers:list", + WATCHER_ITEMS: "watcher:items", + WATCHER_SUMMARY: "watcher:summary", }; diff --git a/apps/server/src/realtime/socket.ts b/apps/server/src/realtime/socket.ts index 6126f3b..bf2206a 100644 --- a/apps/server/src/realtime/socket.ts +++ b/apps/server/src/realtime/socket.ts @@ -4,6 +4,7 @@ import { verifyToken } from "../auth/auth.service"; import { getStatusSnapshot } from "../status/status.service"; import { EVENTS } from "./events"; import { config, isDev } from "../config"; +import { getWatcherItems, getWatcherSummary, listWatchers } from "../watcher/watcher.service"; const parseCookies = (cookieHeader?: string) => { const cookies: Record = {}; @@ -53,6 +54,9 @@ export const createSocketServer = (server: http.Server) => { io.on("connection", async (socket) => { const snapshot = await getStatusSnapshot(); socket.emit(EVENTS.STATUS_SNAPSHOT, snapshot); + socket.emit(EVENTS.WATCHERS_LIST, await listWatchers()); + socket.emit(EVENTS.WATCHER_SUMMARY, await getWatcherSummary()); + socket.emit(EVENTS.WATCHER_ITEMS, await getWatcherItems()); }); return io; diff --git a/apps/server/src/storage/jsondb.ts b/apps/server/src/storage/jsondb.ts index a5cab65..ede86ad 100644 --- a/apps/server/src/storage/jsondb.ts +++ b/apps/server/src/storage/jsondb.ts @@ -26,6 +26,16 @@ const defaultDb = (): DbSchema => ({ totalUploadedBytes: 0, updatedAt: new Date().toISOString(), }, + watchers: [], + watcherItems: [], + watcherRuns: [], + watcherSummary: { + activeWatchers: 0, + totalImported: 0, + totalSeen: 0, + trackedLabels: [], + updatedAt: new Date().toISOString(), + }, }); const rotateBackups = async (dbPath: string) => { @@ -71,6 +81,16 @@ export const readDb = async (): Promise => { totalUploadedBytes: 0, updatedAt: new Date().toISOString(), }; + parsed.watchers ??= []; + parsed.watcherItems ??= []; + parsed.watcherRuns ??= []; + parsed.watcherSummary ??= { + activeWatchers: 0, + totalImported: 0, + totalSeen: 0, + trackedLabels: [], + updatedAt: new Date().toISOString(), + }; return parsed; } catch (error) { if ((error as NodeJS.ErrnoException).code !== "ENOENT") { diff --git a/apps/server/src/storage/paths.ts b/apps/server/src/storage/paths.ts index 04f93de..177439c 100644 --- a/apps/server/src/storage/paths.ts +++ b/apps/server/src/storage/paths.ts @@ -6,4 +6,6 @@ export const ensureDataPaths = async () => { await fs.mkdir(config.torrentArchiveDir, { recursive: true }); await fs.mkdir(config.loopLogsDir, { recursive: true }); await fs.mkdir(config.loopLogsArchiveDir, { recursive: true }); + await fs.mkdir(config.watcherDownloadsDir, { recursive: true }); + await fs.mkdir(config.watcherRuntimeDir, { recursive: true }); }; diff --git a/apps/server/src/types.ts b/apps/server/src/types.ts index cf2395a..5bb780e 100644 --- a/apps/server/src/types.ts +++ b/apps/server/src/types.ts @@ -98,6 +98,87 @@ export interface TimerSummary { updatedAt: string; } +export type WatcherTracker = "happyfappy"; + +export type WatcherItemStatus = + | "bookmarked" + | "downloading_torrent" + | "sending_to_qbit" + | "sent_to_qbit" + | "downloading" + | "completed" + | "failed"; + +export interface Watcher { + id: string; + tracker: WatcherTracker; + trackerLabel: string; + category?: string; + cookieEncrypted: string; + cookieHint: string; + intervalMinutes: number; + enabled: boolean; + lastRunAt?: string; + lastSuccessAt?: string; + lastError?: string; + nextRunAt?: string; + totalImported: number; + totalSeen: number; + createdAt: string; + updatedAt: string; +} + +export interface WatcherItem { + id: string; + watcherId: string; + tracker: WatcherTracker; + trackerLabel: string; + sourceKey: string; + pageUrl: string; + title: string; + imageUrl?: string; + status: WatcherItemStatus; + statusLabel: string; + qbitHash?: string; + qbitName?: string; + qbitProgress?: number; + qbitState?: string; + qbitCategory?: string; + sizeBytes?: number; + seeders?: number; + leechers?: number; + trackerTorrentId?: string; + seenAt: string; + downloadedAt?: string; + importedAt?: string; + lastSyncAt: string; + errorMessage?: string; +} + +export interface WatcherRun { + id: string; + watcherId: string; + startedAt: string; + finishedAt?: string; + status: "RUNNING" | "SUCCESS" | "FAILED"; + newBookmarks: number; + importedCount: number; + failedCount: number; + message?: string; +} + +export interface WatcherSummary { + activeWatchers: number; + totalImported: number; + totalSeen: number; + trackedLabels: string[]; + lastRunAt?: string; + lastSuccessAt?: string; + nextRunAt?: string; + lastError?: string; + updatedAt: string; +} + export interface AuditLog { id: string; level: "INFO" | "WARN" | "ERROR"; @@ -133,4 +214,8 @@ export interface DbSchema { timerRules?: TimerRule[]; timerLogs?: TimerLog[]; timerSummary?: TimerSummary; + watchers?: Watcher[]; + watcherItems?: WatcherItem[]; + watcherRuns?: WatcherRun[]; + watcherSummary?: WatcherSummary; } diff --git a/apps/server/src/watcher/watcher.crypto.ts b/apps/server/src/watcher/watcher.crypto.ts new file mode 100644 index 0000000..f76e9be --- /dev/null +++ b/apps/server/src/watcher/watcher.crypto.ts @@ -0,0 +1,42 @@ +import crypto from "node:crypto"; +import { config } from "../config"; + +const ensureSecret = () => { + if (!config.watcherSecretKey) { + throw new Error("WATCHER_SECRET_KEY missing"); + } + return crypto.createHash("sha256").update(config.watcherSecretKey).digest(); +}; + +export const encryptWatcherCookie = (value: string) => { + const key = ensureSecret(); + const iv = crypto.randomBytes(12); + const cipher = crypto.createCipheriv("aes-256-gcm", key, iv); + const encrypted = Buffer.concat([cipher.update(value, "utf8"), cipher.final()]); + const tag = cipher.getAuthTag(); + return Buffer.concat([iv, tag, encrypted]).toString("base64"); +}; + +export const decryptWatcherCookie = (payload: string) => { + const key = ensureSecret(); + const source = Buffer.from(payload, "base64"); + const iv = source.subarray(0, 12); + const tag = source.subarray(12, 28); + const encrypted = source.subarray(28); + const decipher = crypto.createDecipheriv("aes-256-gcm", key, iv); + decipher.setAuthTag(tag); + return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString("utf8"); +}; + +export const buildCookieHint = (value: string) => { + const segments = value + .split(/[;\n]/) + .map((entry) => entry.trim()) + .filter(Boolean) + .slice(0, 2) + .map((entry) => { + const [name] = entry.split("=", 1); + return `${name}=...`; + }); + return segments.length > 0 ? segments.join("; ") : "Cookie kayitli"; +}; diff --git a/apps/server/src/watcher/watcher.registry.ts b/apps/server/src/watcher/watcher.registry.ts new file mode 100644 index 0000000..5698765 --- /dev/null +++ b/apps/server/src/watcher/watcher.registry.ts @@ -0,0 +1,14 @@ +import { TrackerDefinition } from "./watcher.types"; + +export const trackerRegistry: TrackerDefinition[] = [ + { + key: "happyfappy", + label: "HappyFappy", + cliSiteKey: "happyfappy", + supportsRemoveBookmark: true, + }, +]; + +export const getTrackerDefinition = (key: string) => { + return trackerRegistry.find((tracker) => tracker.key === key) ?? null; +}; diff --git a/apps/server/src/watcher/watcher.routes.ts b/apps/server/src/watcher/watcher.routes.ts new file mode 100644 index 0000000..b72f616 --- /dev/null +++ b/apps/server/src/watcher/watcher.routes.ts @@ -0,0 +1,116 @@ +import { Router } from "express"; +import { z } from "zod"; +import { + createWatcher, + deleteWatcher, + fetchWatcherImage, + getQbitCategories, + getWatcherItems, + getWatcherSummary, + listTrackers, + listWatchers, + runWatcherById, + updateWatcher, +} from "./watcher.service"; + +const router = Router(); + +const createWatcherSchema = z.object({ + tracker: z.string().min(1), + cookie: z.string().min(1), + intervalMinutes: z.number().int().min(1).max(24 * 60), + category: z.string().optional(), + enabled: z.boolean().optional(), +}); + +const updateWatcherSchema = z.object({ + cookie: z.string().min(1).optional(), + intervalMinutes: z.number().int().min(1).max(24 * 60).optional(), + category: z.string().optional(), + enabled: z.boolean().optional(), +}); + +router.get("/trackers", (_req, res) => { + return res.json(listTrackers()); +}); + +router.get("/categories", async (_req, res) => { + try { + return res.json(await getQbitCategories()); + } catch (error) { + return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to load qBittorrent categories" }); + } +}); + +router.get("/", async (_req, res) => { + return res.json(await listWatchers()); +}); + +router.post("/", async (req, res) => { + const parsed = createWatcherSchema.safeParse(req.body ?? {}); + if (!parsed.success) { + return res.status(400).json({ error: "Invalid watcher payload" }); + } + try { + const watcher = await createWatcher(parsed.data); + return res.status(201).json(watcher); + } catch (error) { + return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to create watcher" }); + } +}); + +router.patch("/:id", async (req, res) => { + const parsed = updateWatcherSchema.safeParse(req.body ?? {}); + if (!parsed.success) { + return res.status(400).json({ error: "Invalid watcher payload" }); + } + try { + const watcher = await updateWatcher(req.params.id, parsed.data); + return res.json(watcher); + } catch (error) { + return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to update watcher" }); + } +}); + +router.delete("/:id", async (req, res) => { + try { + return res.json(await deleteWatcher(req.params.id)); + } catch (error) { + return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to delete watcher" }); + } +}); + +router.post("/:id/run", async (req, res) => { + try { + await runWatcherById(req.params.id); + return res.json({ ok: true }); + } catch (error) { + return res.status(400).json({ error: error instanceof Error ? error.message : "Watcher run failed" }); + } +}); + +router.get("/summary", async (_req, res) => { + return res.json(await getWatcherSummary()); +}); + +router.get("/items", async (_req, res) => { + return res.json(await getWatcherItems()); +}); + +router.get("/image", async (req, res) => { + const watcherId = String(req.query.watcherId ?? ""); + const imageUrl = String(req.query.url ?? ""); + if (!watcherId || !imageUrl) { + return res.status(400).json({ error: "Missing watcherId or url" }); + } + try { + const image = await fetchWatcherImage(watcherId, imageUrl); + res.setHeader("Content-Type", image.contentType); + res.setHeader("Cache-Control", "private, max-age=300"); + return res.send(image.data); + } catch (error) { + return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to fetch image" }); + } +}); + +export default router; diff --git a/apps/server/src/watcher/watcher.scraper.ts b/apps/server/src/watcher/watcher.scraper.ts new file mode 100644 index 0000000..8dbb084 --- /dev/null +++ b/apps/server/src/watcher/watcher.scraper.ts @@ -0,0 +1,102 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import axios from "axios"; +import { config } from "../config"; +import { logger } from "../utils/logger"; +import { BookmarkRecord, ScraperRunPaths } from "./watcher.types"; + +const ensureDir = async (target: string) => { + await fs.mkdir(target, { recursive: true }); +}; + +export const createScraperRunPaths = async (watcherId: string): Promise => { + const runDir = path.join(config.watcherRuntimeDir, watcherId, randomUUID()); + const torrentDir = path.join(runDir, "torrent"); + await ensureDir(torrentDir); + return { + runDir, + cookiesPath: path.join(runDir, "cookies.txt"), + bookmarksPath: path.join(runDir, "bookmarks.json"), + torrentDir, + }; +}; + +const scraperClient = axios.create({ + baseURL: config.wscraperServiceBaseUrl, + timeout: config.watcherTimeoutMs, +}); + +const buildHeaders = () => { + if (!config.wscraperServiceToken) { + return undefined; + } + return { + Authorization: `Bearer ${config.wscraperServiceToken}`, + }; +}; + +const request = async (method: "GET" | "POST", url: string, body?: unknown) => { + logger.info( + { + method, + url, + baseUrl: config.wscraperServiceBaseUrl, + }, + "Calling wscraper service" + ); + try { + const response = await scraperClient.request({ + method, + url, + data: body, + headers: buildHeaders(), + }); + return response.data; + } catch (error) { + if (axios.isAxiosError(error)) { + const message = + typeof error.response?.data === "object" && error.response?.data && "error" in error.response.data + ? String((error.response.data as { error: string }).error) + : error.message; + throw new Error(`wscraper-service request failed: ${message}`); + } + throw error; + } +}; + +export const runBookmarkFetch = async ( + trackerSiteKey: string, + cookie: string +) => { + const response = await request<{ items: BookmarkRecord[] }>("POST", "/bookmarks", { + tracker: trackerSiteKey, + cookie, + }); + return response.items; +}; + +export const runTorrentDownload = async ( + trackerSiteKey: string, + cookie: string, + detailUrl: string, + outputDir: string +) => { + const response = await request<{ filename: string; contentBase64: string }>("POST", "/download", { + tracker: trackerSiteKey, + cookie, + url: detailUrl, + removeBookmark: true, + }); + const targetPath = path.join(outputDir, response.filename); + await fs.writeFile(targetPath, Buffer.from(response.contentBase64, "base64")); + return targetPath; +}; + +export const cleanupRunPaths = async (paths: ScraperRunPaths) => { + try { + await fs.rm(paths.runDir, { recursive: true, force: true }); + } catch (error) { + logger.warn({ error, runDir: paths.runDir }, "Failed to cleanup watcher runtime directory"); + } +}; diff --git a/apps/server/src/watcher/watcher.service.ts b/apps/server/src/watcher/watcher.service.ts new file mode 100644 index 0000000..213541b --- /dev/null +++ b/apps/server/src/watcher/watcher.service.ts @@ -0,0 +1,742 @@ +import { randomUUID } from "node:crypto"; +import path from "node:path"; +import axios from "axios"; +import { getQbitClient } from "../qbit/qbit.context"; +import { QbitTorrentInfo } from "../qbit/qbit.types"; +import { readDb, writeDb } from "../storage/jsondb"; +import { Watcher, WatcherItem, WatcherRun, WatcherSummary } from "../types"; +import { nowIso } from "../utils/time"; +import { appendAuditLog, logger } from "../utils/logger"; +import { emitWatcherItems, emitWatchersList, emitWatcherSummary } from "../realtime/emitter"; +import { buildCookieHint, decryptWatcherCookie, encryptWatcherCookie } from "./watcher.crypto"; +import { getTrackerDefinition, trackerRegistry } from "./watcher.registry"; +import { cleanupRunPaths, createScraperRunPaths, runBookmarkFetch, runTorrentDownload } from "./watcher.scraper"; +import { BookmarkRecord, EnrichedWatcherItem, WatcherListItem, WatcherSummaryResponse } from "./watcher.types"; + +const MAX_WATCHER_ITEMS = 500; +const MAX_WATCHER_RUNS = 200; +const QBIT_VERIFY_ATTEMPTS = 5; +const QBIT_VERIFY_DELAY_MS = 2500; +const activeWatcherRuns = new Set(); + +const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const toListItem = (watcher: Watcher): WatcherListItem => ({ + ...watcher, + hasCookie: Boolean(watcher.cookieEncrypted), +}); + +const statusLabel = (status: WatcherItem["status"]) => { + switch (status) { + case "bookmarked": + return "Bookmark bulundu"; + case "downloading_torrent": + return "Torrent indiriliyor"; + case "sending_to_qbit": + return "qBittorrent'a gonderiliyor"; + case "sent_to_qbit": + return "qBittorrent'a gonderildi"; + case "downloading": + return "Indiriliyor"; + case "completed": + return "Tamamlandi"; + case "failed": + default: + return "Hata"; + } +}; + +const deriveTrackerTorrentId = (pageUrl: string) => { + try { + const url = new URL(pageUrl); + return url.searchParams.get("id") ?? undefined; + } catch { + return undefined; + } +}; + +const normalizeImageUrl = (pageUrl: string, imageUrl?: string | null) => { + if (!imageUrl?.trim()) { + return undefined; + } + try { + return new URL(imageUrl, pageUrl).toString(); + } catch { + return imageUrl; + } +}; + +const domainMatches = (targetHost: string, cookieDomain: string) => { + const normalizedCookieDomain = cookieDomain.replace(/^\./, "").toLowerCase(); + const normalizedTargetHost = targetHost.toLowerCase(); + return ( + normalizedTargetHost === normalizedCookieDomain || + normalizedTargetHost.endsWith(`.${normalizedCookieDomain}`) + ); +}; + +const toCookieHeader = (cookieValue: string, targetUrl: string) => { + let targetHost = ""; + try { + targetHost = new URL(targetUrl).hostname; + } catch { + return ""; + } + + const lines = cookieValue.split(/\r?\n/); + const looksLikeNetscape = + lines.length > 1 && lines.some((line) => line.includes("\t")); + + if (looksLikeNetscape) { + const pairs: string[] = []; + for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line || line.startsWith("#")) { + continue; + } + const parts = line.split("\t"); + if (parts.length < 7) { + continue; + } + const [domain, , , , , name, value] = parts; + if (!name || !domainMatches(targetHost, domain)) { + continue; + } + pairs.push(`${name}=${value}`); + } + return pairs.join("; "); + } + + return cookieValue + .split(";") + .map((entry) => entry.trim()) + .filter(Boolean) + .join("; "); +}; + +const computeNextRunAt = (intervalMinutes: number) => { + return new Date(Date.now() + intervalMinutes * 60_000).toISOString(); +}; + +const deriveLiveStatus = ( + itemStatus: WatcherItem["status"], + torrent?: QbitTorrentInfo +): WatcherItem["status"] => { + if (!torrent) { + return itemStatus; + } + if ((torrent.progress ?? 0) >= 1) { + return "completed"; + } + if (itemStatus === "failed") { + return "failed"; + } + if ( + (torrent.progress ?? 0) > 0 || + (torrent.dlspeed ?? 0) > 0 || + (torrent.state ?? "").toLowerCase().includes("download") + ) { + return "downloading"; + } + return "sent_to_qbit"; +}; + +const refreshWatcherSummary = async () => { + const db = await readDb(); + const watchers = db.watchers ?? []; + const updatedAt = nowIso(); + const summary: WatcherSummary = { + activeWatchers: watchers.filter((watcher) => watcher.enabled).length, + totalImported: watchers.reduce((sum, watcher) => sum + watcher.totalImported, 0), + totalSeen: watchers.reduce((sum, watcher) => sum + watcher.totalSeen, 0), + trackedLabels: Array.from(new Set(watchers.filter((watcher) => watcher.enabled).map((watcher) => watcher.trackerLabel))), + lastRunAt: watchers + .map((watcher) => watcher.lastRunAt) + .filter(Boolean) + .sort() + .at(-1), + lastSuccessAt: watchers + .map((watcher) => watcher.lastSuccessAt) + .filter(Boolean) + .sort() + .at(-1), + nextRunAt: watchers + .map((watcher) => watcher.nextRunAt) + .filter(Boolean) + .sort()[0], + lastError: watchers.map((watcher) => watcher.lastError).filter(Boolean).at(-1), + updatedAt, + }; + db.watcherSummary = summary; + await writeDb(db); + emitWatcherSummary({ + ...summary, + watchers: watchers.map((watcher) => ({ + id: watcher.id, + tracker: watcher.tracker, + trackerLabel: watcher.trackerLabel, + enabled: watcher.enabled, + lastRunAt: watcher.lastRunAt, + lastSuccessAt: watcher.lastSuccessAt, + nextRunAt: watcher.nextRunAt, + totalImported: watcher.totalImported, + })), + }); + return summary; +}; + +const upsertWatcherItem = (items: WatcherItem[], nextItem: WatcherItem) => { + const index = items.findIndex((item) => item.id === nextItem.id); + if (index >= 0) { + items[index] = nextItem; + } else { + items.unshift(nextItem); + } + return items.slice(0, MAX_WATCHER_ITEMS); +}; + +const mergeQbitState = (item: WatcherItem, torrents: QbitTorrentInfo[]): EnrichedWatcherItem => { + const matched = + torrents.find((torrent) => item.qbitHash && torrent.hash === item.qbitHash) ?? + torrents.find((torrent) => torrent.name === (item.qbitName || item.title)); + const progress = matched?.progress ?? item.qbitProgress ?? 0; + const status = deriveLiveStatus(item.status, matched); + return { + ...item, + status, + statusLabel: statusLabel(status), + progress, + state: matched?.state ?? item.qbitState, + qbitHash: matched?.hash ?? item.qbitHash, + qbitName: matched?.name ?? item.qbitName, + qbitProgress: progress, + qbitState: matched?.state ?? item.qbitState, + qbitCategory: matched?.category ?? item.qbitCategory, + sizeBytes: matched?.size ?? item.sizeBytes, + seeders: matched?.num_seeds ?? item.seeders, + leechers: matched?.num_leechs ?? item.leechers, + }; +}; + +const persistWatcher = async (watcherId: string, update: (watcher: Watcher) => Watcher) => { + const db = await readDb(); + const watchers = db.watchers ?? []; + const index = watchers.findIndex((watcher) => watcher.id === watcherId); + if (index < 0) { + throw new Error("Watcher not found"); + } + watchers[index] = update(watchers[index]); + db.watchers = watchers; + await writeDb(db); + emitWatchersList(watchers.map(toListItem)); + return watchers[index]; +}; + +const persistWatcherProgress = async (payload: { + watcherId: string; + item: WatcherItem; + importedDelta?: number; + seenDelta?: number; + lastError?: string; +}) => { + const db = await readDb(); + db.watcherItems = upsertWatcherItem(db.watcherItems ?? [], payload.item); + db.watchers = (db.watchers ?? []).map((watcher) => + watcher.id === payload.watcherId + ? { + ...watcher, + totalImported: watcher.totalImported + (payload.importedDelta ?? 0), + totalSeen: watcher.totalSeen + (payload.seenDelta ?? 0), + lastError: payload.lastError, + updatedAt: nowIso(), + } + : watcher + ); + await writeDb(db); + emitWatchersList((db.watchers ?? []).map(toListItem)); + emitWatcherItems(await getWatcherItems()); + await refreshWatcherSummary(); +}; + +export const listTrackers = () => trackerRegistry; + +export const listWatchers = async () => { + const db = await readDb(); + return (db.watchers ?? []).map(toListItem); +}; + +export const deleteWatcher = async (watcherId: string) => { + const db = await readDb(); + const watcher = (db.watchers ?? []).find((entry) => entry.id === watcherId); + if (!watcher) { + throw new Error("Watcher not found"); + } + db.watchers = (db.watchers ?? []).filter((entry) => entry.id !== watcherId); + db.watcherItems = (db.watcherItems ?? []).filter((item) => item.watcherId !== watcherId); + db.watcherRuns = (db.watcherRuns ?? []).filter((run) => run.watcherId !== watcherId); + await writeDb(db); + emitWatchersList((db.watchers ?? []).map(toListItem)); + emitWatcherItems(await getWatcherItems()); + await refreshWatcherSummary(); + return { ok: true }; +}; + +export const createWatcher = async (payload: { + tracker: string; + cookie: string; + intervalMinutes: number; + category?: string; + enabled?: boolean; +}) => { + const tracker = getTrackerDefinition(payload.tracker); + if (!tracker) { + throw new Error("Unsupported tracker"); + } + if (!payload.cookie.trim()) { + throw new Error("Cookie is required"); + } + const db = await readDb(); + const watcher: Watcher = { + id: randomUUID(), + tracker: tracker.key, + trackerLabel: tracker.label, + category: payload.category?.trim() || undefined, + cookieEncrypted: encryptWatcherCookie(payload.cookie.trim()), + cookieHint: buildCookieHint(payload.cookie), + intervalMinutes: payload.intervalMinutes, + enabled: payload.enabled ?? true, + nextRunAt: computeNextRunAt(payload.intervalMinutes), + totalImported: 0, + totalSeen: 0, + createdAt: nowIso(), + updatedAt: nowIso(), + }; + db.watchers = [watcher, ...(db.watchers ?? [])]; + await writeDb(db); + emitWatchersList((db.watchers ?? []).map(toListItem)); + await refreshWatcherSummary(); + return toListItem(watcher); +}; + +export const updateWatcher = async ( + watcherId: string, + payload: { cookie?: string; intervalMinutes?: number; category?: string; enabled?: boolean } +) => { + const updated = await persistWatcher(watcherId, (watcher) => { + const next: Watcher = { + ...watcher, + updatedAt: nowIso(), + }; + if (typeof payload.intervalMinutes === "number") { + next.intervalMinutes = payload.intervalMinutes; + next.nextRunAt = computeNextRunAt(payload.intervalMinutes); + } + if (typeof payload.enabled === "boolean") { + next.enabled = payload.enabled; + } + if (typeof payload.category === "string") { + next.category = payload.category.trim() || undefined; + } + if (payload.cookie && payload.cookie.trim()) { + next.cookieEncrypted = encryptWatcherCookie(payload.cookie.trim()); + next.cookieHint = buildCookieHint(payload.cookie); + } + return next; + }); + emitWatchersList((await readDb()).watchers?.map(toListItem) ?? []); + await refreshWatcherSummary(); + return toListItem(updated); +}; + +export const getWatcherSummary = async (): Promise => { + const summary = await refreshWatcherSummary(); + const watchers = await listWatchers(); + return { + ...summary, + watchers: watchers.map((watcher) => ({ + id: watcher.id, + tracker: watcher.tracker, + trackerLabel: watcher.trackerLabel, + enabled: watcher.enabled, + lastRunAt: watcher.lastRunAt, + lastSuccessAt: watcher.lastSuccessAt, + nextRunAt: watcher.nextRunAt, + totalImported: watcher.totalImported, + })), + }; +}; + +export const getWatcherItems = async () => { + const db = await readDb(); + const torrents = await getQbitClient().getTorrentsInfo().catch(() => []); + return (db.watcherItems ?? []).map((item) => mergeQbitState(item, torrents)); +}; + +export const getQbitCategories = async () => { + const qbit = getQbitClient(); + const categories = await qbit.getCategories().catch(async () => { + const torrents = await qbit.getTorrentsInfo(); + return torrents.reduce>((acc, torrent) => { + if (torrent.category?.trim()) { + acc[torrent.category] = { name: torrent.category }; + } + return acc; + }, {}); + }); + return Object.keys(categories).sort((a, b) => a.localeCompare(b, "tr")); +}; + +export const fetchWatcherImage = async (watcherId: string, imageUrl: string) => { + const db = await readDb(); + const watcher = (db.watchers ?? []).find((entry) => entry.id === watcherId); + if (!watcher) { + throw new Error("Watcher not found"); + } + const cookie = toCookieHeader( + decryptWatcherCookie(watcher.cookieEncrypted), + imageUrl + ); + const response = await axios.get(imageUrl, { + responseType: "arraybuffer", + headers: { + ...(cookie ? { Cookie: cookie } : {}), + Referer: "https://www.happyfappy.net/", + "User-Agent": + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36", + }, + }); + return { + contentType: response.headers["content-type"] || "image/jpeg", + data: Buffer.from(response.data), + }; +}; + +const recordRun = async (run: WatcherRun) => { + const db = await readDb(); + db.watcherRuns = [run, ...(db.watcherRuns ?? [])].slice(0, MAX_WATCHER_RUNS); + await writeDb(db); +}; + +const updateRun = async (runId: string, patch: Partial) => { + const db = await readDb(); + const runs = db.watcherRuns ?? []; + const index = runs.findIndex((run) => run.id === runId); + if (index >= 0) { + runs[index] = { ...runs[index], ...patch }; + db.watcherRuns = runs; + await writeDb(db); + } +}; + +const findExistingItem = (items: WatcherItem[], watcherId: string, sourceKey: string) => { + return items.find((item) => item.watcherId === watcherId && item.sourceKey === sourceKey); +}; + +const verifyQbitImport = async ( + torrentPath: string, + bookmark: BookmarkRecord, + options: { category?: string } = {} +) => { + const qbit = getQbitClient(); + const fs = await import("node:fs/promises"); + const { default: parseTorrent } = await import("parse-torrent"); + const parsed = parseTorrent(await fs.readFile(torrentPath)); + const expectedHash = String(parsed.infoHash ?? "").toLowerCase(); + await qbit.addTorrentByFile( + torrentPath, + options.category ? { category: options.category } : {} + ); + let lastSeen: QbitTorrentInfo | undefined; + for (let attempt = 0; attempt < QBIT_VERIFY_ATTEMPTS; attempt += 1) { + const torrents = await qbit.getTorrentsInfo(); + lastSeen = torrents.find((torrent) => { + const sameHash = Boolean(expectedHash) && torrent.hash.toLowerCase() === expectedHash; + const sameName = + torrent.name === path.basename(torrentPath, ".torrent") || torrent.name === bookmark.title; + return sameHash || sameName; + }); + if (lastSeen) { + return { + ok: true, + duplicate: false, + torrent: lastSeen, + }; + } + await delay(QBIT_VERIFY_DELAY_MS); + } + const torrents = await qbit.getTorrentsInfo(); + const duplicate = torrents.find( + (torrent) => + (Boolean(expectedHash) && torrent.hash.toLowerCase() === expectedHash) || + torrent.name === bookmark.title + ); + if (duplicate) { + return { + ok: true, + duplicate: true, + torrent: duplicate, + }; + } + return { + ok: false, + duplicate: false, + torrent: lastSeen, + }; +}; + +const processBookmark = async (watcher: Watcher, bookmark: BookmarkRecord) => { + const tracker = getTrackerDefinition(watcher.tracker); + if (!tracker) { + throw new Error("Unsupported tracker"); + } + const runPaths = await createScraperRunPaths(watcher.id); + const itemId = randomUUID(); + const seenAt = nowIso(); + let item: WatcherItem = { + id: itemId, + watcherId: watcher.id, + tracker: watcher.tracker, + trackerLabel: watcher.trackerLabel, + sourceKey: bookmark.pageURL, + pageUrl: bookmark.pageURL, + title: bookmark.title, + imageUrl: normalizeImageUrl(bookmark.pageURL, bookmark.backgroundImage), + status: "downloading_torrent", + statusLabel: statusLabel("downloading_torrent"), + trackerTorrentId: deriveTrackerTorrentId(bookmark.pageURL), + seenAt, + lastSyncAt: seenAt, + }; + try { + const cookie = decryptWatcherCookie(watcher.cookieEncrypted); + const torrentPath = await runTorrentDownload( + tracker.cliSiteKey, + cookie, + bookmark.pageURL, + runPaths.torrentDir + ); + item = { + ...item, + status: "sending_to_qbit", + statusLabel: statusLabel("sending_to_qbit"), + downloadedAt: nowIso(), + lastSyncAt: nowIso(), + }; + const qbitResult = await verifyQbitImport(torrentPath, bookmark, { + category: watcher.category, + }); + if (!qbitResult.ok || !qbitResult.torrent) { + throw new Error("Torrent qBittorrent listesinde dogrulanamadi"); + } + item = { + ...item, + status: deriveLiveStatus("sent_to_qbit", qbitResult.torrent), + statusLabel: statusLabel(deriveLiveStatus("sent_to_qbit", qbitResult.torrent)), + importedAt: nowIso(), + qbitHash: qbitResult.torrent.hash, + qbitName: qbitResult.torrent.name, + qbitProgress: qbitResult.torrent.progress, + qbitState: qbitResult.torrent.state, + qbitCategory: qbitResult.torrent.category, + sizeBytes: qbitResult.torrent.size, + seeders: qbitResult.torrent.num_seeds, + leechers: qbitResult.torrent.num_leechs, + lastSyncAt: nowIso(), + }; + return item; + } finally { + await cleanupRunPaths(runPaths); + } +}; + +export const runWatcherById = async (watcherId: string) => { + if (activeWatcherRuns.has(watcherId)) { + logger.info({ watcherId }, "Watcher run skipped because another run is already active"); + return; + } + activeWatcherRuns.add(watcherId); + const db = await readDb(); + const watcher = (db.watchers ?? []).find((entry) => entry.id === watcherId); + if (!watcher || !watcher.enabled) { + logger.info({ watcherId }, "Watcher skipped because it is missing or disabled"); + activeWatcherRuns.delete(watcherId); + return; + } + const tracker = getTrackerDefinition(watcher.tracker); + if (!tracker) { + throw new Error("Unsupported tracker"); + } + logger.info({ watcherId, tracker: watcher.tracker, nextRunAt: watcher.nextRunAt }, "Watcher run started"); + const runId = randomUUID(); + await recordRun({ + id: runId, + watcherId: watcher.id, + startedAt: nowIso(), + status: "RUNNING", + newBookmarks: 0, + importedCount: 0, + failedCount: 0, + }); + const runPaths = await createScraperRunPaths(watcher.id); + let newBookmarks = 0; + let importedCount = 0; + let failedCount = 0; + try { + await persistWatcher(watcher.id, (current) => ({ + ...current, + lastRunAt: nowIso(), + lastError: undefined, + nextRunAt: computeNextRunAt(current.intervalMinutes), + updatedAt: nowIso(), + })); + const cookie = decryptWatcherCookie(watcher.cookieEncrypted); + const records = await runBookmarkFetch( + tracker.cliSiteKey, + cookie + ); + logger.info({ watcherId, count: records.length }, "Watcher bookmarks fetched"); + newBookmarks = records.length; + const freshDb = await readDb(); + const items = freshDb.watcherItems ?? []; + const processedSourceKeys = new Set(items.map((item) => item.sourceKey)); + for (const bookmark of records) { + if (processedSourceKeys.has(bookmark.pageURL)) { + logger.info({ watcherId, sourceKey: bookmark.pageURL }, "Watcher bookmark skipped due to dedupe set"); + continue; + } + const existing = findExistingItem(items, watcher.id, bookmark.pageURL); + if (existing) { + existing.lastSyncAt = nowIso(); + await persistWatcherProgress({ + watcherId: watcher.id, + item: existing, + }); + processedSourceKeys.add(bookmark.pageURL); + logger.info({ watcherId, sourceKey: bookmark.pageURL }, "Watcher bookmark already processed"); + continue; + } + try { + const nextItem = await processBookmark(watcher, bookmark); + importedCount += 1; + await persistWatcherProgress({ + watcherId: watcher.id, + item: nextItem, + importedDelta: 1, + seenDelta: 1, + }); + processedSourceKeys.add(bookmark.pageURL); + logger.info( + { + watcherId, + sourceKey: bookmark.pageURL, + qbitHash: nextItem.qbitHash, + status: nextItem.status, + }, + "Watcher bookmark imported" + ); + } catch (error) { + failedCount += 1; + logger.error( + { + watcherId, + sourceKey: bookmark.pageURL, + error: error instanceof Error ? error.message : error, + }, + "Watcher bookmark import failed" + ); + const failedItem: WatcherItem = { + id: randomUUID(), + watcherId: watcher.id, + tracker: watcher.tracker, + trackerLabel: watcher.trackerLabel, + sourceKey: bookmark.pageURL, + pageUrl: bookmark.pageURL, + title: bookmark.title, + imageUrl: normalizeImageUrl(bookmark.pageURL, bookmark.backgroundImage), + status: "failed", + statusLabel: statusLabel("failed"), + trackerTorrentId: deriveTrackerTorrentId(bookmark.pageURL), + seenAt: nowIso(), + lastSyncAt: nowIso(), + errorMessage: error instanceof Error ? error.message : "Unknown watcher error", + }; + await persistWatcherProgress({ + watcherId: watcher.id, + item: failedItem, + seenDelta: 1, + lastError: failedItem.errorMessage, + }); + processedSourceKeys.add(bookmark.pageURL); + } + } + const finalDb = await readDb(); + finalDb.watchers = (finalDb.watchers ?? []).map((entry) => + entry.id === watcher.id + ? { + ...entry, + lastSuccessAt: nowIso(), + lastError: failedCount > 0 ? `${failedCount} item hata verdi` : undefined, + updatedAt: nowIso(), + } + : entry + ); + await writeDb(finalDb); + await refreshWatcherSummary(); + logger.info({ watcherId, importedCount, failedCount }, "Watcher run completed"); + await updateRun(runId, { + finishedAt: nowIso(), + status: failedCount > 0 ? "FAILED" : "SUCCESS", + newBookmarks, + importedCount, + failedCount, + message: failedCount > 0 ? `${failedCount} item hata verdi` : `${importedCount} item qBittorrent'a gonderildi`, + }); + await appendAuditLog({ + level: failedCount > 0 ? "WARN" : "INFO", + event: "ARCHIVE_SUCCESS", + message: `Watcher ${watcher.trackerLabel}: ${importedCount} imported, ${failedCount} failed`, + }); + } catch (error) { + const message = error instanceof Error ? error.message : "Watcher run failed"; + await persistWatcher(watcher.id, (current) => ({ + ...current, + lastError: message, + updatedAt: nowIso(), + })); + await updateRun(runId, { + finishedAt: nowIso(), + status: "FAILED", + newBookmarks, + importedCount, + failedCount: failedCount + 1, + message, + }); + logger.error( + { + watcherId, + message, + stack: error instanceof Error ? error.stack : undefined, + error: error instanceof Error ? { name: error.name, message: error.message } : error, + }, + "Watcher run failed" + ); + } finally { + await cleanupRunPaths(runPaths); + activeWatcherRuns.delete(watcherId); + } +}; + +export const runDueWatchers = async () => { + const db = await readDb(); + const dueWatchers = (db.watchers ?? []).filter((watcher) => { + if (!watcher.enabled) { + return false; + } + if (!watcher.nextRunAt) { + return true; + } + return new Date(watcher.nextRunAt).getTime() <= Date.now(); + }); + logger.info({ watcherIds: dueWatchers.map((watcher) => watcher.id) }, "Watcher due check completed"); + return dueWatchers.map((watcher) => watcher.id); +}; diff --git a/apps/server/src/watcher/watcher.types.ts b/apps/server/src/watcher/watcher.types.ts new file mode 100644 index 0000000..821c036 --- /dev/null +++ b/apps/server/src/watcher/watcher.types.ts @@ -0,0 +1,35 @@ +import { Watcher, WatcherItem, WatcherSummary, WatcherTracker } from "../types"; + +export interface TrackerDefinition { + key: WatcherTracker; + label: string; + cliSiteKey: string; + supportsRemoveBookmark: boolean; +} + +export interface BookmarkRecord { + pageURL: string; + isVR?: boolean; + title: string; + backgroundImage?: string | null; +} + +export interface ScraperRunPaths { + runDir: string; + cookiesPath: string; + bookmarksPath: string; + torrentDir: string; +} + +export interface WatcherListItem extends Omit { + hasCookie: boolean; +} + +export interface EnrichedWatcherItem extends WatcherItem { + progress: number; + state?: string; +} + +export interface WatcherSummaryResponse extends WatcherSummary { + watchers: Array>; +} diff --git a/apps/server/src/watcher/watcher.worker.ts b/apps/server/src/watcher/watcher.worker.ts new file mode 100644 index 0000000..25c8716 --- /dev/null +++ b/apps/server/src/watcher/watcher.worker.ts @@ -0,0 +1,34 @@ +import { config } from "../config"; +import { logger } from "../utils/logger"; +import { runDueWatchers, runWatcherById } from "./watcher.service"; + +const activeRuns = new Set(); + +export const startWatcherWorker = (intervalMs: number) => { + if (!config.watcherEnabled) { + logger.info("Watcher worker disabled"); + return; + } + setInterval(async () => { + try { + const dueWatcherIds = await runDueWatchers(); + logger.info({ count: dueWatcherIds.length, watcherIds: dueWatcherIds }, "Watcher worker tick"); + for (const watcherId of dueWatcherIds) { + if (activeRuns.has(watcherId)) { + logger.info({ watcherId }, "Watcher run skipped because another run is active"); + continue; + } + activeRuns.add(watcherId); + runWatcherById(watcherId) + .catch((error) => { + logger.error({ error, watcherId }, "Unhandled watcher run failure"); + }) + .finally(() => { + activeRuns.delete(watcherId); + }); + } + } catch (error) { + logger.error({ error }, "Watcher worker tick failed"); + } + }, intervalMs); +}; diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index ad0efe2..a6f9575 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -3,6 +3,7 @@ import { BrowserRouter, Navigate, Route, Routes } from "react-router-dom"; import { LoginPage } from "./pages/LoginPage"; import { DashboardPage } from "./pages/DashboardPage"; import { TimerPage } from "./pages/TimerPage"; +import { WatcherPage } from "./pages/WatcherPage"; import { AppLayout } from "./components/layout/AppLayout"; import { useAuthStore } from "./store/useAuthStore"; @@ -25,6 +26,7 @@ export const App = () => { } /> } /> } /> + } /> } /> diff --git a/apps/web/src/components/layout/AppLayout.tsx b/apps/web/src/components/layout/AppLayout.tsx index 7d3e487..1270954 100644 --- a/apps/web/src/components/layout/AppLayout.tsx +++ b/apps/web/src/components/layout/AppLayout.tsx @@ -112,6 +112,16 @@ export const AppLayout = ({ children }: { children: React.ReactNode }) => { > Timer + + `rounded-full px-3 py-1 ${ + isActive ? "bg-slate-900 text-white" : "hover:bg-white" + }` + } + > + Watcher +