feat: watcher akislarini ve wscraper servis entegrasyonunu ekle
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
FROM node:20-alpine
|
||||
FROM node:20-bookworm-slim
|
||||
WORKDIR /app
|
||||
RUN corepack enable
|
||||
RUN corepack enable \
|
||||
&& apt-get update \
|
||||
&& apt-get install -y --no-install-recommends build-essential \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
COPY package.json /app/apps/server/package.json
|
||||
COPY package.json /app/package.json
|
||||
COPY pnpm-workspace.yaml /app/pnpm-workspace.yaml
|
||||
|
||||
@@ -23,12 +23,21 @@ export const config = {
|
||||
webPort: envNumber(process.env.WEB_PORT, 5173),
|
||||
webOrigin: process.env.WEB_ORIGIN ?? "",
|
||||
webAllowedOrigins: process.env.WEB_ALLOWED_ORIGINS ?? "",
|
||||
watcherSecretKey: process.env.WATCHER_SECRET_KEY ?? "",
|
||||
watcherEnabled: (process.env.WATCHER_ENABLED ?? "true").toLowerCase() !== "false",
|
||||
watcherTickMs: envNumber(process.env.WATCHER_TICK_MS, 30_000),
|
||||
watcherTimeoutMs: envNumber(process.env.WATCHER_TIMEOUT_MS, 180_000),
|
||||
watcherRuntimeDir: process.env.WATCHER_RUNTIME_DIR ?? "/tmp/qbuffer-watchers",
|
||||
wscraperServiceBaseUrl:
|
||||
process.env.WSCRAPER_SERVICE_BASE_URL ?? "http://host.docker.internal:8787",
|
||||
wscraperServiceToken: process.env.WSCRAPER_SERVICE_TOKEN ?? "",
|
||||
dataDir: "/app/data",
|
||||
dbPath: "/app/data/db.json",
|
||||
logsPath: "/app/data/logs.json",
|
||||
loopLogsDir: "/app/data/loop-logs",
|
||||
loopLogsArchiveDir: "/app/data/loop-logs-archive",
|
||||
torrentArchiveDir: "/app/data/torrents",
|
||||
watcherDownloadsDir: "/app/data/watcher-downloads",
|
||||
webPublicDir: path.resolve("/app/apps/server/public"),
|
||||
};
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import loopRoutes from "./loop/loop.routes"
|
||||
import profilesRoutes from "./loop/profiles.routes"
|
||||
import statusRoutes from "./status/status.routes"
|
||||
import timerRoutes from "./timer/timer.routes"
|
||||
import watcherRoutes from "./watcher/watcher.routes"
|
||||
import { QbitClient } from "./qbit/qbit.client"
|
||||
import { detectCapabilities } from "./qbit/qbit.capabilities"
|
||||
import { setQbitClient, setQbitCapabilities } from "./qbit/qbit.context"
|
||||
@@ -24,6 +25,7 @@ import { initEmitter, emitQbitHealth } from "./realtime/emitter"
|
||||
import { startLoopScheduler } from "./loop/loop.scheduler"
|
||||
import { startEnforcementWorker } from "./enforcement/enforcement.worker"
|
||||
import { startTimerWorker } from "./timer/timer.worker"
|
||||
import { startWatcherWorker } from "./watcher/watcher.worker"
|
||||
import { logger } from "./utils/logger"
|
||||
|
||||
const crashLogPath = "/app/data/crash.log";
|
||||
@@ -94,6 +96,7 @@ const bootstrap = async () => {
|
||||
app.use("/api/profiles", requireAuth, profilesRoutes);
|
||||
app.use("/api/status", requireAuth, statusRoutes);
|
||||
app.use("/api/timer", requireAuth, timerRoutes);
|
||||
app.use("/api/watchers", requireAuth, watcherRoutes);
|
||||
|
||||
if (!isDev) {
|
||||
app.use(express.static(config.webPublicDir));
|
||||
@@ -126,6 +129,7 @@ const bootstrap = async () => {
|
||||
startLoopScheduler(qbit, config.pollIntervalMs);
|
||||
startEnforcementWorker(config.enforceIntervalMs);
|
||||
startTimerWorker(qbit, config.timerPollMs);
|
||||
startWatcherWorker(config.watcherTickMs);
|
||||
|
||||
server.listen(config.port, () => {
|
||||
serverStarted = true;
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { QbitClient } from "../qbit/qbit.client"
|
||||
import { tickLoopJobs } from "./loop.engine"
|
||||
import { getStatusSnapshot, refreshJobsStatus, setQbitStatus, setTorrentsStatus } from "../status/status.service"
|
||||
import { emitQbitHealth, emitStatusUpdate } from "../realtime/emitter"
|
||||
import { emitQbitHealth, emitStatusUpdate, emitWatcherItems } from "../realtime/emitter"
|
||||
import { logger } from "../utils/logger"
|
||||
import { getWatcherItems } from "../watcher/watcher.service"
|
||||
|
||||
export const startLoopScheduler = (qbit: QbitClient, intervalMs: number) => {
|
||||
setInterval(async () => {
|
||||
@@ -19,6 +20,7 @@ export const startLoopScheduler = (qbit: QbitClient, intervalMs: number) => {
|
||||
transfer,
|
||||
jobs,
|
||||
});
|
||||
emitWatcherItems(await getWatcherItems());
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "Unknown error";
|
||||
logger.error({ error }, "Loop scheduler tick failed");
|
||||
|
||||
@@ -6,6 +6,7 @@ import fs from "node:fs";
|
||||
import { config } from "../config"
|
||||
import { logger } from "../utils/logger"
|
||||
import {
|
||||
QbitCategory,
|
||||
QbitPeerList,
|
||||
QbitTorrentInfo,
|
||||
QbitTorrentProperties,
|
||||
@@ -96,6 +97,13 @@ export class QbitClient {
|
||||
return response.data;
|
||||
}
|
||||
|
||||
// Fetches all qBittorrent categories, keyed by category name.
// Goes through this.request so shared retry/error handling applies.
async getCategories(): Promise<Record<string, QbitCategory>> {
  const response = await this.request(() =>
    this.client.get<Record<string, QbitCategory>>("/api/v2/torrents/categories")
  );
  return response.data;
}
|
||||
|
||||
async getTorrentProperties(hash: string): Promise<QbitTorrentProperties> {
|
||||
const response = await this.request(() =>
|
||||
this.client.get<QbitTorrentProperties>("/api/v2/torrents/properties", {
|
||||
|
||||
@@ -11,6 +11,8 @@ export interface QbitTorrentInfo {
|
||||
tags?: string;
|
||||
category?: string;
|
||||
tracker?: string;
|
||||
num_seeds?: number;
|
||||
num_leechs?: number;
|
||||
added_on?: number;
|
||||
seeding_time?: number;
|
||||
uploaded?: number;
|
||||
@@ -51,3 +53,8 @@ export interface QbitCapabilities {
|
||||
hasPeersEndpoint: boolean;
|
||||
hasBanEndpoint: boolean;
|
||||
}
|
||||
|
||||
export interface QbitCategory {
|
||||
name?: string;
|
||||
savePath?: string;
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import { Server } from "socket.io";
|
||||
import { EVENTS } from "./events";
|
||||
import { StatusSnapshot } from "../status/status.service";
|
||||
import { LoopJob, TimerLog, TimerSummary } from "../types";
|
||||
import { EnrichedWatcherItem, WatcherListItem, WatcherSummaryResponse } from "../watcher/watcher.types";
|
||||
|
||||
let io: Server | null = null;
|
||||
|
||||
@@ -41,3 +42,15 @@ export const emitTimerLog = (payload: TimerLog) => {
|
||||
export const emitTimerSummary = (payload: TimerSummary) => {
|
||||
io?.emit(EVENTS.TIMER_SUMMARY, payload);
|
||||
};
|
||||
|
||||
export const emitWatchersList = (payload: WatcherListItem[]) => {
|
||||
io?.emit(EVENTS.WATCHERS_LIST, payload);
|
||||
};
|
||||
|
||||
export const emitWatcherItems = (payload: EnrichedWatcherItem[]) => {
|
||||
io?.emit(EVENTS.WATCHER_ITEMS, payload);
|
||||
};
|
||||
|
||||
export const emitWatcherSummary = (payload: WatcherSummaryResponse) => {
|
||||
io?.emit(EVENTS.WATCHER_SUMMARY, payload);
|
||||
};
|
||||
|
||||
@@ -6,4 +6,7 @@ export const EVENTS = {
|
||||
QBIT_HEALTH: "qbit:health",
|
||||
TIMER_LOG: "timer:log",
|
||||
TIMER_SUMMARY: "timer:summary",
|
||||
WATCHERS_LIST: "watchers:list",
|
||||
WATCHER_ITEMS: "watcher:items",
|
||||
WATCHER_SUMMARY: "watcher:summary",
|
||||
};
|
||||
|
||||
@@ -4,6 +4,7 @@ import { verifyToken } from "../auth/auth.service";
|
||||
import { getStatusSnapshot } from "../status/status.service";
|
||||
import { EVENTS } from "./events";
|
||||
import { config, isDev } from "../config";
|
||||
import { getWatcherItems, getWatcherSummary, listWatchers } from "../watcher/watcher.service";
|
||||
|
||||
const parseCookies = (cookieHeader?: string) => {
|
||||
const cookies: Record<string, string> = {};
|
||||
@@ -53,6 +54,9 @@ export const createSocketServer = (server: http.Server) => {
|
||||
io.on("connection", async (socket) => {
|
||||
const snapshot = await getStatusSnapshot();
|
||||
socket.emit(EVENTS.STATUS_SNAPSHOT, snapshot);
|
||||
socket.emit(EVENTS.WATCHERS_LIST, await listWatchers());
|
||||
socket.emit(EVENTS.WATCHER_SUMMARY, await getWatcherSummary());
|
||||
socket.emit(EVENTS.WATCHER_ITEMS, await getWatcherItems());
|
||||
});
|
||||
|
||||
return io;
|
||||
|
||||
@@ -26,6 +26,16 @@ const defaultDb = (): DbSchema => ({
|
||||
totalUploadedBytes: 0,
|
||||
updatedAt: new Date().toISOString(),
|
||||
},
|
||||
watchers: [],
|
||||
watcherItems: [],
|
||||
watcherRuns: [],
|
||||
watcherSummary: {
|
||||
activeWatchers: 0,
|
||||
totalImported: 0,
|
||||
totalSeen: 0,
|
||||
trackedLabels: [],
|
||||
updatedAt: new Date().toISOString(),
|
||||
},
|
||||
});
|
||||
|
||||
const rotateBackups = async (dbPath: string) => {
|
||||
@@ -71,6 +81,16 @@ export const readDb = async (): Promise<DbSchema> => {
|
||||
totalUploadedBytes: 0,
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
parsed.watchers ??= [];
|
||||
parsed.watcherItems ??= [];
|
||||
parsed.watcherRuns ??= [];
|
||||
parsed.watcherSummary ??= {
|
||||
activeWatchers: 0,
|
||||
totalImported: 0,
|
||||
totalSeen: 0,
|
||||
trackedLabels: [],
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
return parsed;
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
|
||||
|
||||
@@ -6,4 +6,6 @@ export const ensureDataPaths = async () => {
|
||||
await fs.mkdir(config.torrentArchiveDir, { recursive: true });
|
||||
await fs.mkdir(config.loopLogsDir, { recursive: true });
|
||||
await fs.mkdir(config.loopLogsArchiveDir, { recursive: true });
|
||||
await fs.mkdir(config.watcherDownloadsDir, { recursive: true });
|
||||
await fs.mkdir(config.watcherRuntimeDir, { recursive: true });
|
||||
};
|
||||
|
||||
@@ -98,6 +98,87 @@ export interface TimerSummary {
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
/** Registry keys of trackers the watcher feature supports. */
export type WatcherTracker = "happyfappy";

/** Lifecycle of a single bookmarked torrent, from discovery to completion. */
export type WatcherItemStatus =
  | "bookmarked"
  | "downloading_torrent"
  | "sending_to_qbit"
  | "sent_to_qbit"
  | "downloading"
  | "completed"
  | "failed";

/** A configured tracker watcher (one account/cookie on one tracker). */
export interface Watcher {
  id: string;
  tracker: WatcherTracker;
  trackerLabel: string;
  // Optional qBittorrent category to assign to imported torrents.
  category?: string;
  // Cookie stored encrypted at rest; only a redacted hint is kept in clear.
  cookieEncrypted: string;
  cookieHint: string;
  intervalMinutes: number;
  enabled: boolean;
  lastRunAt?: string;
  lastSuccessAt?: string;
  lastError?: string;
  nextRunAt?: string;
  // Lifetime counters across all runs of this watcher.
  totalImported: number;
  totalSeen: number;
  createdAt: string;
  updatedAt: string;
}

/** One discovered bookmark and its import/download state. */
export interface WatcherItem {
  id: string;
  watcherId: string;
  tracker: WatcherTracker;
  trackerLabel: string;
  // Stable identity of the bookmark on the tracker side (dedupe key).
  sourceKey: string;
  pageUrl: string;
  title: string;
  imageUrl?: string;
  status: WatcherItemStatus;
  statusLabel: string;
  // qbit* fields mirror the matched qBittorrent torrent, when one exists.
  qbitHash?: string;
  qbitName?: string;
  qbitProgress?: number;
  qbitState?: string;
  qbitCategory?: string;
  sizeBytes?: number;
  seeders?: number;
  leechers?: number;
  trackerTorrentId?: string;
  seenAt: string;
  downloadedAt?: string;
  importedAt?: string;
  lastSyncAt: string;
  errorMessage?: string;
}

/** Audit record of a single watcher execution. */
export interface WatcherRun {
  id: string;
  watcherId: string;
  startedAt: string;
  finishedAt?: string;
  status: "RUNNING" | "SUCCESS" | "FAILED";
  newBookmarks: number;
  importedCount: number;
  failedCount: number;
  message?: string;
}

/** Aggregated view over all watchers, persisted and pushed to clients. */
export interface WatcherSummary {
  activeWatchers: number;
  totalImported: number;
  totalSeen: number;
  trackedLabels: string[];
  lastRunAt?: string;
  lastSuccessAt?: string;
  nextRunAt?: string;
  lastError?: string;
  updatedAt: string;
}
|
||||
|
||||
export interface AuditLog {
|
||||
id: string;
|
||||
level: "INFO" | "WARN" | "ERROR";
|
||||
@@ -133,4 +214,8 @@ export interface DbSchema {
|
||||
timerRules?: TimerRule[];
|
||||
timerLogs?: TimerLog[];
|
||||
timerSummary?: TimerSummary;
|
||||
watchers?: Watcher[];
|
||||
watcherItems?: WatcherItem[];
|
||||
watcherRuns?: WatcherRun[];
|
||||
watcherSummary?: WatcherSummary;
|
||||
}
|
||||
|
||||
42
apps/server/src/watcher/watcher.crypto.ts
Normal file
42
apps/server/src/watcher/watcher.crypto.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import crypto from "node:crypto";
|
||||
import { config } from "../config";
|
||||
|
||||
const ensureSecret = () => {
|
||||
if (!config.watcherSecretKey) {
|
||||
throw new Error("WATCHER_SECRET_KEY missing");
|
||||
}
|
||||
return crypto.createHash("sha256").update(config.watcherSecretKey).digest();
|
||||
};
|
||||
|
||||
export const encryptWatcherCookie = (value: string) => {
|
||||
const key = ensureSecret();
|
||||
const iv = crypto.randomBytes(12);
|
||||
const cipher = crypto.createCipheriv("aes-256-gcm", key, iv);
|
||||
const encrypted = Buffer.concat([cipher.update(value, "utf8"), cipher.final()]);
|
||||
const tag = cipher.getAuthTag();
|
||||
return Buffer.concat([iv, tag, encrypted]).toString("base64");
|
||||
};
|
||||
|
||||
export const decryptWatcherCookie = (payload: string) => {
|
||||
const key = ensureSecret();
|
||||
const source = Buffer.from(payload, "base64");
|
||||
const iv = source.subarray(0, 12);
|
||||
const tag = source.subarray(12, 28);
|
||||
const encrypted = source.subarray(28);
|
||||
const decipher = crypto.createDecipheriv("aes-256-gcm", key, iv);
|
||||
decipher.setAuthTag(tag);
|
||||
return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString("utf8");
|
||||
};
|
||||
|
||||
export const buildCookieHint = (value: string) => {
|
||||
const segments = value
|
||||
.split(/[;\n]/)
|
||||
.map((entry) => entry.trim())
|
||||
.filter(Boolean)
|
||||
.slice(0, 2)
|
||||
.map((entry) => {
|
||||
const [name] = entry.split("=", 1);
|
||||
return `${name}=...`;
|
||||
});
|
||||
return segments.length > 0 ? segments.join("; ") : "Cookie kayitli";
|
||||
};
|
||||
14
apps/server/src/watcher/watcher.registry.ts
Normal file
14
apps/server/src/watcher/watcher.registry.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { TrackerDefinition } from "./watcher.types";
|
||||
|
||||
export const trackerRegistry: TrackerDefinition[] = [
|
||||
{
|
||||
key: "happyfappy",
|
||||
label: "HappyFappy",
|
||||
cliSiteKey: "happyfappy",
|
||||
supportsRemoveBookmark: true,
|
||||
},
|
||||
];
|
||||
|
||||
export const getTrackerDefinition = (key: string) => {
|
||||
return trackerRegistry.find((tracker) => tracker.key === key) ?? null;
|
||||
};
|
||||
116
apps/server/src/watcher/watcher.routes.ts
Normal file
116
apps/server/src/watcher/watcher.routes.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { Router } from "express";
|
||||
import { z } from "zod";
|
||||
import {
|
||||
createWatcher,
|
||||
deleteWatcher,
|
||||
fetchWatcherImage,
|
||||
getQbitCategories,
|
||||
getWatcherItems,
|
||||
getWatcherSummary,
|
||||
listTrackers,
|
||||
listWatchers,
|
||||
runWatcherById,
|
||||
updateWatcher,
|
||||
} from "./watcher.service";
|
||||
|
||||
// REST API for managing tracker watchers. All handlers map service-layer
// failures to HTTP 400 with the thrown Error's message.
const router = Router();

// Creation payload; interval is bounded to [1 minute, 24 hours].
const createWatcherSchema = z.object({
  tracker: z.string().min(1),
  cookie: z.string().min(1),
  intervalMinutes: z.number().int().min(1).max(24 * 60),
  category: z.string().optional(),
  enabled: z.boolean().optional(),
});

// Partial-update payload: same fields as creation, all optional.
const updateWatcherSchema = z.object({
  cookie: z.string().min(1).optional(),
  intervalMinutes: z.number().int().min(1).max(24 * 60).optional(),
  category: z.string().optional(),
  enabled: z.boolean().optional(),
});

// Static list of supported trackers (synchronous, from the registry).
router.get("/trackers", (_req, res) => {
  return res.json(listTrackers());
});

// Available qBittorrent categories for the watcher's category dropdown.
router.get("/categories", async (_req, res) => {
  try {
    return res.json(await getQbitCategories());
  } catch (error) {
    return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to load qBittorrent categories" });
  }
});

// All configured watchers (cookie redacted to a hasCookie flag).
router.get("/", async (_req, res) => {
  return res.json(await listWatchers());
});

// Creates a watcher; 201 on success.
router.post("/", async (req, res) => {
  const parsed = createWatcherSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    return res.status(400).json({ error: "Invalid watcher payload" });
  }
  try {
    const watcher = await createWatcher(parsed.data);
    return res.status(201).json(watcher);
  } catch (error) {
    return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to create watcher" });
  }
});

// Partially updates an existing watcher.
router.patch("/:id", async (req, res) => {
  const parsed = updateWatcherSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    return res.status(400).json({ error: "Invalid watcher payload" });
  }
  try {
    const watcher = await updateWatcher(req.params.id, parsed.data);
    return res.json(watcher);
  } catch (error) {
    return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to update watcher" });
  }
});

// Deletes a watcher and its derived state.
router.delete("/:id", async (req, res) => {
  try {
    return res.json(await deleteWatcher(req.params.id));
  } catch (error) {
    return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to delete watcher" });
  }
});

// Triggers an immediate run of one watcher (outside its schedule).
router.post("/:id/run", async (req, res) => {
  try {
    await runWatcherById(req.params.id);
    return res.json({ ok: true });
  } catch (error) {
    return res.status(400).json({ error: error instanceof Error ? error.message : "Watcher run failed" });
  }
});

// NOTE: the literal routes below are registered after the "/:id" routes above.
// That is safe here because no GET/"/:id" route exists to shadow them.

// Aggregated summary across all watchers.
router.get("/summary", async (_req, res) => {
  return res.json(await getWatcherSummary());
});

// All tracked watcher items enriched with live qBittorrent state.
router.get("/items", async (_req, res) => {
  return res.json(await getWatcherItems());
});

// Proxies a tracker-hosted image using the watcher's stored cookie.
// NOTE(review): `url` is caller-supplied — presumably fetchWatcherImage
// restricts targets to the watcher's tracker; verify to rule out SSRF.
router.get("/image", async (req, res) => {
  const watcherId = String(req.query.watcherId ?? "");
  const imageUrl = String(req.query.url ?? "");
  if (!watcherId || !imageUrl) {
    return res.status(400).json({ error: "Missing watcherId or url" });
  }
  try {
    const image = await fetchWatcherImage(watcherId, imageUrl);
    res.setHeader("Content-Type", image.contentType);
    res.setHeader("Cache-Control", "private, max-age=300");
    return res.send(image.data);
  } catch (error) {
    return res.status(400).json({ error: error instanceof Error ? error.message : "Failed to fetch image" });
  }
});

export default router;
|
||||
102
apps/server/src/watcher/watcher.scraper.ts
Normal file
102
apps/server/src/watcher/watcher.scraper.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import axios from "axios";
|
||||
import { config } from "../config";
|
||||
import { logger } from "../utils/logger";
|
||||
import { BookmarkRecord, ScraperRunPaths } from "./watcher.types";
|
||||
|
||||
// mkdir -p equivalent; recursive:true makes it a no-op when the path exists.
const ensureDir = async (target: string) => {
  await fs.mkdir(target, { recursive: true });
};
|
||||
|
||||
export const createScraperRunPaths = async (watcherId: string): Promise<ScraperRunPaths> => {
|
||||
const runDir = path.join(config.watcherRuntimeDir, watcherId, randomUUID());
|
||||
const torrentDir = path.join(runDir, "torrent");
|
||||
await ensureDir(torrentDir);
|
||||
return {
|
||||
runDir,
|
||||
cookiesPath: path.join(runDir, "cookies.txt"),
|
||||
bookmarksPath: path.join(runDir, "bookmarks.json"),
|
||||
torrentDir,
|
||||
};
|
||||
};
|
||||
|
||||
// Shared HTTP client for the external wscraper microservice.
// watcherTimeoutMs bounds long-running scrape/download calls.
const scraperClient = axios.create({
  baseURL: config.wscraperServiceBaseUrl,
  timeout: config.watcherTimeoutMs,
});
|
||||
|
||||
const buildHeaders = () => {
|
||||
if (!config.wscraperServiceToken) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
Authorization: `Bearer ${config.wscraperServiceToken}`,
|
||||
};
|
||||
};
|
||||
|
||||
const request = async <T>(method: "GET" | "POST", url: string, body?: unknown) => {
|
||||
logger.info(
|
||||
{
|
||||
method,
|
||||
url,
|
||||
baseUrl: config.wscraperServiceBaseUrl,
|
||||
},
|
||||
"Calling wscraper service"
|
||||
);
|
||||
try {
|
||||
const response = await scraperClient.request<T>({
|
||||
method,
|
||||
url,
|
||||
data: body,
|
||||
headers: buildHeaders(),
|
||||
});
|
||||
return response.data;
|
||||
} catch (error) {
|
||||
if (axios.isAxiosError(error)) {
|
||||
const message =
|
||||
typeof error.response?.data === "object" && error.response?.data && "error" in error.response.data
|
||||
? String((error.response.data as { error: string }).error)
|
||||
: error.message;
|
||||
throw new Error(`wscraper-service request failed: ${message}`);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
export const runBookmarkFetch = async (
|
||||
trackerSiteKey: string,
|
||||
cookie: string
|
||||
) => {
|
||||
const response = await request<{ items: BookmarkRecord[] }>("POST", "/bookmarks", {
|
||||
tracker: trackerSiteKey,
|
||||
cookie,
|
||||
});
|
||||
return response.items;
|
||||
};
|
||||
|
||||
export const runTorrentDownload = async (
|
||||
trackerSiteKey: string,
|
||||
cookie: string,
|
||||
detailUrl: string,
|
||||
outputDir: string
|
||||
) => {
|
||||
const response = await request<{ filename: string; contentBase64: string }>("POST", "/download", {
|
||||
tracker: trackerSiteKey,
|
||||
cookie,
|
||||
url: detailUrl,
|
||||
removeBookmark: true,
|
||||
});
|
||||
const targetPath = path.join(outputDir, response.filename);
|
||||
await fs.writeFile(targetPath, Buffer.from(response.contentBase64, "base64"));
|
||||
return targetPath;
|
||||
};
|
||||
|
||||
export const cleanupRunPaths = async (paths: ScraperRunPaths) => {
|
||||
try {
|
||||
await fs.rm(paths.runDir, { recursive: true, force: true });
|
||||
} catch (error) {
|
||||
logger.warn({ error, runDir: paths.runDir }, "Failed to cleanup watcher runtime directory");
|
||||
}
|
||||
};
|
||||
742
apps/server/src/watcher/watcher.service.ts
Normal file
742
apps/server/src/watcher/watcher.service.ts
Normal file
@@ -0,0 +1,742 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import axios from "axios";
|
||||
import { getQbitClient } from "../qbit/qbit.context";
|
||||
import { QbitTorrentInfo } from "../qbit/qbit.types";
|
||||
import { readDb, writeDb } from "../storage/jsondb";
|
||||
import { Watcher, WatcherItem, WatcherRun, WatcherSummary } from "../types";
|
||||
import { nowIso } from "../utils/time";
|
||||
import { appendAuditLog, logger } from "../utils/logger";
|
||||
import { emitWatcherItems, emitWatchersList, emitWatcherSummary } from "../realtime/emitter";
|
||||
import { buildCookieHint, decryptWatcherCookie, encryptWatcherCookie } from "./watcher.crypto";
|
||||
import { getTrackerDefinition, trackerRegistry } from "./watcher.registry";
|
||||
import { cleanupRunPaths, createScraperRunPaths, runBookmarkFetch, runTorrentDownload } from "./watcher.scraper";
|
||||
import { BookmarkRecord, EnrichedWatcherItem, WatcherListItem, WatcherSummaryResponse } from "./watcher.types";
|
||||
|
||||
// Retention caps for persisted watcher state (oldest entries are dropped).
const MAX_WATCHER_ITEMS = 500;
const MAX_WATCHER_RUNS = 200;
// Polling budget when verifying a sent torrent appears in qBittorrent.
const QBIT_VERIFY_ATTEMPTS = 5;
const QBIT_VERIFY_DELAY_MS = 2500;
// Watcher ids with a run currently in flight — presumably used to prevent
// overlapping runs (consumers are outside this view; confirm).
const activeWatcherRuns = new Set<string>();
|
||||
|
||||
// Promise-based sleep used to pace retries/polling.
const delay = (ms: number) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
|
||||
|
||||
const toListItem = (watcher: Watcher): WatcherListItem => ({
|
||||
...watcher,
|
||||
hasCookie: Boolean(watcher.cookieEncrypted),
|
||||
});
|
||||
|
||||
const statusLabel = (status: WatcherItem["status"]) => {
|
||||
switch (status) {
|
||||
case "bookmarked":
|
||||
return "Bookmark bulundu";
|
||||
case "downloading_torrent":
|
||||
return "Torrent indiriliyor";
|
||||
case "sending_to_qbit":
|
||||
return "qBittorrent'a gonderiliyor";
|
||||
case "sent_to_qbit":
|
||||
return "qBittorrent'a gonderildi";
|
||||
case "downloading":
|
||||
return "Indiriliyor";
|
||||
case "completed":
|
||||
return "Tamamlandi";
|
||||
case "failed":
|
||||
default:
|
||||
return "Hata";
|
||||
}
|
||||
};
|
||||
|
||||
const deriveTrackerTorrentId = (pageUrl: string) => {
|
||||
try {
|
||||
const url = new URL(pageUrl);
|
||||
return url.searchParams.get("id") ?? undefined;
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
|
||||
const normalizeImageUrl = (pageUrl: string, imageUrl?: string | null) => {
|
||||
if (!imageUrl?.trim()) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
return new URL(imageUrl, pageUrl).toString();
|
||||
} catch {
|
||||
return imageUrl;
|
||||
}
|
||||
};
|
||||
|
||||
const domainMatches = (targetHost: string, cookieDomain: string) => {
|
||||
const normalizedCookieDomain = cookieDomain.replace(/^\./, "").toLowerCase();
|
||||
const normalizedTargetHost = targetHost.toLowerCase();
|
||||
return (
|
||||
normalizedTargetHost === normalizedCookieDomain ||
|
||||
normalizedTargetHost.endsWith(`.${normalizedCookieDomain}`)
|
||||
);
|
||||
};
|
||||
|
||||
const toCookieHeader = (cookieValue: string, targetUrl: string) => {
|
||||
let targetHost = "";
|
||||
try {
|
||||
targetHost = new URL(targetUrl).hostname;
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
|
||||
const lines = cookieValue.split(/\r?\n/);
|
||||
const looksLikeNetscape =
|
||||
lines.length > 1 && lines.some((line) => line.includes("\t"));
|
||||
|
||||
if (looksLikeNetscape) {
|
||||
const pairs: string[] = [];
|
||||
for (const rawLine of lines) {
|
||||
const line = rawLine.trim();
|
||||
if (!line || line.startsWith("#")) {
|
||||
continue;
|
||||
}
|
||||
const parts = line.split("\t");
|
||||
if (parts.length < 7) {
|
||||
continue;
|
||||
}
|
||||
const [domain, , , , , name, value] = parts;
|
||||
if (!name || !domainMatches(targetHost, domain)) {
|
||||
continue;
|
||||
}
|
||||
pairs.push(`${name}=${value}`);
|
||||
}
|
||||
return pairs.join("; ");
|
||||
}
|
||||
|
||||
return cookieValue
|
||||
.split(";")
|
||||
.map((entry) => entry.trim())
|
||||
.filter(Boolean)
|
||||
.join("; ");
|
||||
};
|
||||
|
||||
const computeNextRunAt = (intervalMinutes: number) => {
|
||||
return new Date(Date.now() + intervalMinutes * 60_000).toISOString();
|
||||
};
|
||||
|
||||
const deriveLiveStatus = (
|
||||
itemStatus: WatcherItem["status"],
|
||||
torrent?: QbitTorrentInfo
|
||||
): WatcherItem["status"] => {
|
||||
if (!torrent) {
|
||||
return itemStatus;
|
||||
}
|
||||
if ((torrent.progress ?? 0) >= 1) {
|
||||
return "completed";
|
||||
}
|
||||
if (itemStatus === "failed") {
|
||||
return "failed";
|
||||
}
|
||||
if (
|
||||
(torrent.progress ?? 0) > 0 ||
|
||||
(torrent.dlspeed ?? 0) > 0 ||
|
||||
(torrent.state ?? "").toLowerCase().includes("download")
|
||||
) {
|
||||
return "downloading";
|
||||
}
|
||||
return "sent_to_qbit";
|
||||
};
|
||||
|
||||
// Recomputes the aggregate watcher summary from all stored watchers,
// persists it, and pushes it (plus a per-watcher digest) to clients.
const refreshWatcherSummary = async () => {
  const db = await readDb();
  const watchers = db.watchers ?? [];
  const updatedAt = nowIso();
  const summary: WatcherSummary = {
    activeWatchers: watchers.filter((watcher) => watcher.enabled).length,
    totalImported: watchers.reduce((sum, watcher) => sum + watcher.totalImported, 0),
    totalSeen: watchers.reduce((sum, watcher) => sum + watcher.totalSeen, 0),
    // Labels of enabled watchers, deduplicated.
    trackedLabels: Array.from(new Set(watchers.filter((watcher) => watcher.enabled).map((watcher) => watcher.trackerLabel))),
    // ISO timestamps sort lexicographically, so sort().at(-1) is the latest
    // and sort()[0] the earliest defined value.
    lastRunAt: watchers
      .map((watcher) => watcher.lastRunAt)
      .filter(Boolean)
      .sort()
      .at(-1),
    lastSuccessAt: watchers
      .map((watcher) => watcher.lastSuccessAt)
      .filter(Boolean)
      .sort()
      .at(-1),
    nextRunAt: watchers
      .map((watcher) => watcher.nextRunAt)
      .filter(Boolean)
      .sort()[0],
    // Last error in storage order (not necessarily the most recent in time).
    lastError: watchers.map((watcher) => watcher.lastError).filter(Boolean).at(-1),
    updatedAt,
  };
  db.watcherSummary = summary;
  await writeDb(db);
  // The socket payload extends the summary with a compact per-watcher digest.
  emitWatcherSummary({
    ...summary,
    watchers: watchers.map((watcher) => ({
      id: watcher.id,
      tracker: watcher.tracker,
      trackerLabel: watcher.trackerLabel,
      enabled: watcher.enabled,
      lastRunAt: watcher.lastRunAt,
      lastSuccessAt: watcher.lastSuccessAt,
      nextRunAt: watcher.nextRunAt,
      totalImported: watcher.totalImported,
    })),
  });
  return summary;
};
|
||||
|
||||
const upsertWatcherItem = (items: WatcherItem[], nextItem: WatcherItem) => {
|
||||
const index = items.findIndex((item) => item.id === nextItem.id);
|
||||
if (index >= 0) {
|
||||
items[index] = nextItem;
|
||||
} else {
|
||||
items.unshift(nextItem);
|
||||
}
|
||||
return items.slice(0, MAX_WATCHER_ITEMS);
|
||||
};
|
||||
|
||||
// Enriches a stored watcher item with live data from the qBittorrent torrent
// list. Matching prefers the recorded hash, falling back to a name match.
// Every qbit* field keeps its stored value when no live torrent is found.
const mergeQbitState = (item: WatcherItem, torrents: QbitTorrentInfo[]): EnrichedWatcherItem => {
  const matched =
    torrents.find((torrent) => item.qbitHash && torrent.hash === item.qbitHash) ??
    torrents.find((torrent) => torrent.name === (item.qbitName || item.title));
  const progress = matched?.progress ?? item.qbitProgress ?? 0;
  const status = deriveLiveStatus(item.status, matched);
  return {
    ...item,
    status,
    statusLabel: statusLabel(status),
    progress,
    state: matched?.state ?? item.qbitState,
    qbitHash: matched?.hash ?? item.qbitHash,
    qbitName: matched?.name ?? item.qbitName,
    qbitProgress: progress,
    qbitState: matched?.state ?? item.qbitState,
    qbitCategory: matched?.category ?? item.qbitCategory,
    sizeBytes: matched?.size ?? item.sizeBytes,
    seeders: matched?.num_seeds ?? item.seeders,
    leechers: matched?.num_leechs ?? item.leechers,
  };
};
|
||||
|
||||
const persistWatcher = async (watcherId: string, update: (watcher: Watcher) => Watcher) => {
|
||||
const db = await readDb();
|
||||
const watchers = db.watchers ?? [];
|
||||
const index = watchers.findIndex((watcher) => watcher.id === watcherId);
|
||||
if (index < 0) {
|
||||
throw new Error("Watcher not found");
|
||||
}
|
||||
watchers[index] = update(watchers[index]);
|
||||
db.watchers = watchers;
|
||||
await writeDb(db);
|
||||
emitWatchersList(watchers.map(toListItem));
|
||||
return watchers[index];
|
||||
};
|
||||
|
||||
// Persists one item's state change plus the owning watcher's counters, then
// rebroadcasts lists, items, and the summary to clients.
const persistWatcherProgress = async (payload: {
  watcherId: string;
  item: WatcherItem;
  importedDelta?: number;
  seenDelta?: number;
  lastError?: string;
}) => {
  const db = await readDb();
  db.watcherItems = upsertWatcherItem(db.watcherItems ?? [], payload.item);
  db.watchers = (db.watchers ?? []).map((watcher) =>
    watcher.id === payload.watcherId
      ? {
          ...watcher,
          totalImported: watcher.totalImported + (payload.importedDelta ?? 0),
          totalSeen: watcher.totalSeen + (payload.seenDelta ?? 0),
          // NOTE(review): lastError is overwritten on every call — when the
          // caller omits it, any previous error is cleared. Confirm intended.
          lastError: payload.lastError,
          updatedAt: nowIso(),
        }
      : watcher
  );
  await writeDb(db);
  emitWatchersList((db.watchers ?? []).map(toListItem));
  emitWatcherItems(await getWatcherItems());
  await refreshWatcherSummary();
};
|
||||
|
||||
// Exposes the static tracker catalog to the API layer.
export const listTrackers = () => {
  return trackerRegistry;
};
|
||||
|
||||
export const listWatchers = async () => {
|
||||
const db = await readDb();
|
||||
return (db.watchers ?? []).map(toListItem);
|
||||
};
|
||||
|
||||
export const deleteWatcher = async (watcherId: string) => {
|
||||
const db = await readDb();
|
||||
const watcher = (db.watchers ?? []).find((entry) => entry.id === watcherId);
|
||||
if (!watcher) {
|
||||
throw new Error("Watcher not found");
|
||||
}
|
||||
db.watchers = (db.watchers ?? []).filter((entry) => entry.id !== watcherId);
|
||||
db.watcherItems = (db.watcherItems ?? []).filter((item) => item.watcherId !== watcherId);
|
||||
db.watcherRuns = (db.watcherRuns ?? []).filter((run) => run.watcherId !== watcherId);
|
||||
await writeDb(db);
|
||||
emitWatchersList((db.watchers ?? []).map(toListItem));
|
||||
emitWatcherItems(await getWatcherItems());
|
||||
await refreshWatcherSummary();
|
||||
return { ok: true };
|
||||
};
|
||||
|
||||
export const createWatcher = async (payload: {
|
||||
tracker: string;
|
||||
cookie: string;
|
||||
intervalMinutes: number;
|
||||
category?: string;
|
||||
enabled?: boolean;
|
||||
}) => {
|
||||
const tracker = getTrackerDefinition(payload.tracker);
|
||||
if (!tracker) {
|
||||
throw new Error("Unsupported tracker");
|
||||
}
|
||||
if (!payload.cookie.trim()) {
|
||||
throw new Error("Cookie is required");
|
||||
}
|
||||
const db = await readDb();
|
||||
const watcher: Watcher = {
|
||||
id: randomUUID(),
|
||||
tracker: tracker.key,
|
||||
trackerLabel: tracker.label,
|
||||
category: payload.category?.trim() || undefined,
|
||||
cookieEncrypted: encryptWatcherCookie(payload.cookie.trim()),
|
||||
cookieHint: buildCookieHint(payload.cookie),
|
||||
intervalMinutes: payload.intervalMinutes,
|
||||
enabled: payload.enabled ?? true,
|
||||
nextRunAt: computeNextRunAt(payload.intervalMinutes),
|
||||
totalImported: 0,
|
||||
totalSeen: 0,
|
||||
createdAt: nowIso(),
|
||||
updatedAt: nowIso(),
|
||||
};
|
||||
db.watchers = [watcher, ...(db.watchers ?? [])];
|
||||
await writeDb(db);
|
||||
emitWatchersList((db.watchers ?? []).map(toListItem));
|
||||
await refreshWatcherSummary();
|
||||
return toListItem(watcher);
|
||||
};
|
||||
|
||||
export const updateWatcher = async (
|
||||
watcherId: string,
|
||||
payload: { cookie?: string; intervalMinutes?: number; category?: string; enabled?: boolean }
|
||||
) => {
|
||||
const updated = await persistWatcher(watcherId, (watcher) => {
|
||||
const next: Watcher = {
|
||||
...watcher,
|
||||
updatedAt: nowIso(),
|
||||
};
|
||||
if (typeof payload.intervalMinutes === "number") {
|
||||
next.intervalMinutes = payload.intervalMinutes;
|
||||
next.nextRunAt = computeNextRunAt(payload.intervalMinutes);
|
||||
}
|
||||
if (typeof payload.enabled === "boolean") {
|
||||
next.enabled = payload.enabled;
|
||||
}
|
||||
if (typeof payload.category === "string") {
|
||||
next.category = payload.category.trim() || undefined;
|
||||
}
|
||||
if (payload.cookie && payload.cookie.trim()) {
|
||||
next.cookieEncrypted = encryptWatcherCookie(payload.cookie.trim());
|
||||
next.cookieHint = buildCookieHint(payload.cookie);
|
||||
}
|
||||
return next;
|
||||
});
|
||||
emitWatchersList((await readDb()).watchers?.map(toListItem) ?? []);
|
||||
await refreshWatcherSummary();
|
||||
return toListItem(updated);
|
||||
};
|
||||
|
||||
export const getWatcherSummary = async (): Promise<WatcherSummaryResponse> => {
|
||||
const summary = await refreshWatcherSummary();
|
||||
const watchers = await listWatchers();
|
||||
return {
|
||||
...summary,
|
||||
watchers: watchers.map((watcher) => ({
|
||||
id: watcher.id,
|
||||
tracker: watcher.tracker,
|
||||
trackerLabel: watcher.trackerLabel,
|
||||
enabled: watcher.enabled,
|
||||
lastRunAt: watcher.lastRunAt,
|
||||
lastSuccessAt: watcher.lastSuccessAt,
|
||||
nextRunAt: watcher.nextRunAt,
|
||||
totalImported: watcher.totalImported,
|
||||
})),
|
||||
};
|
||||
};
|
||||
|
||||
export const getWatcherItems = async () => {
|
||||
const db = await readDb();
|
||||
const torrents = await getQbitClient().getTorrentsInfo().catch(() => []);
|
||||
return (db.watcherItems ?? []).map((item) => mergeQbitState(item, torrents));
|
||||
};
|
||||
|
||||
export const getQbitCategories = async () => {
|
||||
const qbit = getQbitClient();
|
||||
const categories = await qbit.getCategories().catch(async () => {
|
||||
const torrents = await qbit.getTorrentsInfo();
|
||||
return torrents.reduce<Record<string, { name?: string }>>((acc, torrent) => {
|
||||
if (torrent.category?.trim()) {
|
||||
acc[torrent.category] = { name: torrent.category };
|
||||
}
|
||||
return acc;
|
||||
}, {});
|
||||
});
|
||||
return Object.keys(categories).sort((a, b) => a.localeCompare(b, "tr"));
|
||||
};
|
||||
|
||||
export const fetchWatcherImage = async (watcherId: string, imageUrl: string) => {
|
||||
const db = await readDb();
|
||||
const watcher = (db.watchers ?? []).find((entry) => entry.id === watcherId);
|
||||
if (!watcher) {
|
||||
throw new Error("Watcher not found");
|
||||
}
|
||||
const cookie = toCookieHeader(
|
||||
decryptWatcherCookie(watcher.cookieEncrypted),
|
||||
imageUrl
|
||||
);
|
||||
const response = await axios.get<ArrayBuffer>(imageUrl, {
|
||||
responseType: "arraybuffer",
|
||||
headers: {
|
||||
...(cookie ? { Cookie: cookie } : {}),
|
||||
Referer: "https://www.happyfappy.net/",
|
||||
"User-Agent":
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
|
||||
},
|
||||
});
|
||||
return {
|
||||
contentType: response.headers["content-type"] || "image/jpeg",
|
||||
data: Buffer.from(response.data),
|
||||
};
|
||||
};
|
||||
|
||||
const recordRun = async (run: WatcherRun) => {
|
||||
const db = await readDb();
|
||||
db.watcherRuns = [run, ...(db.watcherRuns ?? [])].slice(0, MAX_WATCHER_RUNS);
|
||||
await writeDb(db);
|
||||
};
|
||||
|
||||
const updateRun = async (runId: string, patch: Partial<WatcherRun>) => {
|
||||
const db = await readDb();
|
||||
const runs = db.watcherRuns ?? [];
|
||||
const index = runs.findIndex((run) => run.id === runId);
|
||||
if (index >= 0) {
|
||||
runs[index] = { ...runs[index], ...patch };
|
||||
db.watcherRuns = runs;
|
||||
await writeDb(db);
|
||||
}
|
||||
};
|
||||
|
||||
const findExistingItem = (items: WatcherItem[], watcherId: string, sourceKey: string) => {
|
||||
return items.find((item) => item.watcherId === watcherId && item.sourceKey === sourceKey);
|
||||
};
|
||||
|
||||
/**
 * Sends a .torrent file to qBittorrent and then polls the torrent list until
 * the new entry becomes visible, so the import is confirmed rather than
 * assumed. A torrent is matched either by its info-hash or, as a fallback,
 * by name (file basename or the bookmark title).
 *
 * @param torrentPath path of the downloaded .torrent file
 * @param bookmark    source bookmark; its title is used for name matching
 * @param options.category optional qBittorrent category to assign on add
 * @returns `{ ok, duplicate, torrent }`:
 *   - ok=true, duplicate=false: torrent appeared during polling
 *   - ok=true, duplicate=true:  found only after polling expired — presumably
 *     it already existed before this call (TODO confirm: qBittorrent may also
 *     just have registered it late)
 *   - ok=false: never observed in the list
 */
const verifyQbitImport = async (
  torrentPath: string,
  bookmark: BookmarkRecord,
  options: { category?: string } = {}
) => {
  const qbit = getQbitClient();
  // Dynamic imports keep fs/parse-torrent off this module's load path.
  const fs = await import("node:fs/promises");
  const { default: parseTorrent } = await import("parse-torrent");
  const parsed = parseTorrent(await fs.readFile(torrentPath));
  // Lowercased for comparison against qBittorrent's hash field; empty string
  // when the file had no parsable info-hash (then only name matching applies).
  const expectedHash = String(parsed.infoHash ?? "").toLowerCase();
  await qbit.addTorrentByFile(
    torrentPath,
    options.category ? { category: options.category } : {}
  );
  let lastSeen: QbitTorrentInfo | undefined;
  // qBittorrent registers added torrents asynchronously — poll with a delay.
  for (let attempt = 0; attempt < QBIT_VERIFY_ATTEMPTS; attempt += 1) {
    const torrents = await qbit.getTorrentsInfo();
    lastSeen = torrents.find((torrent) => {
      const sameHash = Boolean(expectedHash) && torrent.hash.toLowerCase() === expectedHash;
      const sameName =
        torrent.name === path.basename(torrentPath, ".torrent") || torrent.name === bookmark.title;
      return sameHash || sameName;
    });
    if (lastSeen) {
      return {
        ok: true,
        duplicate: false,
        torrent: lastSeen,
      };
    }
    await delay(QBIT_VERIFY_DELAY_MS);
  }
  // Polling expired: one last scan to distinguish "duplicate/late" from
  // "never arrived".
  const torrents = await qbit.getTorrentsInfo();
  const duplicate = torrents.find(
    (torrent) =>
      (Boolean(expectedHash) && torrent.hash.toLowerCase() === expectedHash) ||
      torrent.name === bookmark.title
  );
  if (duplicate) {
    return {
      ok: true,
      duplicate: true,
      torrent: duplicate,
    };
  }
  return {
    ok: false,
    duplicate: false,
    torrent: lastSeen,
  };
};
|
||||
|
||||
/**
 * Imports a single bookmark: downloads its .torrent via the tracker CLI and
 * hands it to qBittorrent, verifying the import. The returned item reflects
 * the furthest stage reached; on any failure the partially built item is
 * discarded with the thrown error (the caller records a "failed" item).
 * Scratch files are cleaned up in all cases via the finally block.
 */
const processBookmark = async (watcher: Watcher, bookmark: BookmarkRecord) => {
  const tracker = getTrackerDefinition(watcher.tracker);
  if (!tracker) {
    throw new Error("Unsupported tracker");
  }
  // Per-bookmark scratch directory for the downloaded .torrent.
  const runPaths = await createScraperRunPaths(watcher.id);
  const itemId = randomUUID();
  const seenAt = nowIso();
  // Stage 1: record the item as "downloading_torrent".
  let item: WatcherItem = {
    id: itemId,
    watcherId: watcher.id,
    tracker: watcher.tracker,
    trackerLabel: watcher.trackerLabel,
    sourceKey: bookmark.pageURL,
    pageUrl: bookmark.pageURL,
    title: bookmark.title,
    imageUrl: normalizeImageUrl(bookmark.pageURL, bookmark.backgroundImage),
    status: "downloading_torrent",
    statusLabel: statusLabel("downloading_torrent"),
    trackerTorrentId: deriveTrackerTorrentId(bookmark.pageURL),
    seenAt,
    lastSyncAt: seenAt,
  };
  try {
    const cookie = decryptWatcherCookie(watcher.cookieEncrypted);
    const torrentPath = await runTorrentDownload(
      tracker.cliSiteKey,
      cookie,
      bookmark.pageURL,
      runPaths.torrentDir
    );
    // Stage 2: .torrent fetched; hand off to qBittorrent.
    item = {
      ...item,
      status: "sending_to_qbit",
      statusLabel: statusLabel("sending_to_qbit"),
      downloadedAt: nowIso(),
      lastSyncAt: nowIso(),
    };
    const qbitResult = await verifyQbitImport(torrentPath, bookmark, {
      category: watcher.category,
    });
    if (!qbitResult.ok || !qbitResult.torrent) {
      throw new Error("Torrent qBittorrent listesinde dogrulanamadi");
    }
    // Stage 3: verified in qBittorrent — copy its live stats onto the item.
    item = {
      ...item,
      status: deriveLiveStatus("sent_to_qbit", qbitResult.torrent),
      statusLabel: statusLabel(deriveLiveStatus("sent_to_qbit", qbitResult.torrent)),
      importedAt: nowIso(),
      qbitHash: qbitResult.torrent.hash,
      qbitName: qbitResult.torrent.name,
      qbitProgress: qbitResult.torrent.progress,
      qbitState: qbitResult.torrent.state,
      qbitCategory: qbitResult.torrent.category,
      sizeBytes: qbitResult.torrent.size,
      seeders: qbitResult.torrent.num_seeds,
      leechers: qbitResult.torrent.num_leechs,
      lastSyncAt: nowIso(),
    };
    return item;
  } finally {
    // Always remove the scratch directory, even when an error propagates.
    await cleanupRunPaths(runPaths);
  }
};
|
||||
|
||||
export const runWatcherById = async (watcherId: string) => {
|
||||
if (activeWatcherRuns.has(watcherId)) {
|
||||
logger.info({ watcherId }, "Watcher run skipped because another run is already active");
|
||||
return;
|
||||
}
|
||||
activeWatcherRuns.add(watcherId);
|
||||
const db = await readDb();
|
||||
const watcher = (db.watchers ?? []).find((entry) => entry.id === watcherId);
|
||||
if (!watcher || !watcher.enabled) {
|
||||
logger.info({ watcherId }, "Watcher skipped because it is missing or disabled");
|
||||
activeWatcherRuns.delete(watcherId);
|
||||
return;
|
||||
}
|
||||
const tracker = getTrackerDefinition(watcher.tracker);
|
||||
if (!tracker) {
|
||||
throw new Error("Unsupported tracker");
|
||||
}
|
||||
logger.info({ watcherId, tracker: watcher.tracker, nextRunAt: watcher.nextRunAt }, "Watcher run started");
|
||||
const runId = randomUUID();
|
||||
await recordRun({
|
||||
id: runId,
|
||||
watcherId: watcher.id,
|
||||
startedAt: nowIso(),
|
||||
status: "RUNNING",
|
||||
newBookmarks: 0,
|
||||
importedCount: 0,
|
||||
failedCount: 0,
|
||||
});
|
||||
const runPaths = await createScraperRunPaths(watcher.id);
|
||||
let newBookmarks = 0;
|
||||
let importedCount = 0;
|
||||
let failedCount = 0;
|
||||
try {
|
||||
await persistWatcher(watcher.id, (current) => ({
|
||||
...current,
|
||||
lastRunAt: nowIso(),
|
||||
lastError: undefined,
|
||||
nextRunAt: computeNextRunAt(current.intervalMinutes),
|
||||
updatedAt: nowIso(),
|
||||
}));
|
||||
const cookie = decryptWatcherCookie(watcher.cookieEncrypted);
|
||||
const records = await runBookmarkFetch(
|
||||
tracker.cliSiteKey,
|
||||
cookie
|
||||
);
|
||||
logger.info({ watcherId, count: records.length }, "Watcher bookmarks fetched");
|
||||
newBookmarks = records.length;
|
||||
const freshDb = await readDb();
|
||||
const items = freshDb.watcherItems ?? [];
|
||||
const processedSourceKeys = new Set(items.map((item) => item.sourceKey));
|
||||
for (const bookmark of records) {
|
||||
if (processedSourceKeys.has(bookmark.pageURL)) {
|
||||
logger.info({ watcherId, sourceKey: bookmark.pageURL }, "Watcher bookmark skipped due to dedupe set");
|
||||
continue;
|
||||
}
|
||||
const existing = findExistingItem(items, watcher.id, bookmark.pageURL);
|
||||
if (existing) {
|
||||
existing.lastSyncAt = nowIso();
|
||||
await persistWatcherProgress({
|
||||
watcherId: watcher.id,
|
||||
item: existing,
|
||||
});
|
||||
processedSourceKeys.add(bookmark.pageURL);
|
||||
logger.info({ watcherId, sourceKey: bookmark.pageURL }, "Watcher bookmark already processed");
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const nextItem = await processBookmark(watcher, bookmark);
|
||||
importedCount += 1;
|
||||
await persistWatcherProgress({
|
||||
watcherId: watcher.id,
|
||||
item: nextItem,
|
||||
importedDelta: 1,
|
||||
seenDelta: 1,
|
||||
});
|
||||
processedSourceKeys.add(bookmark.pageURL);
|
||||
logger.info(
|
||||
{
|
||||
watcherId,
|
||||
sourceKey: bookmark.pageURL,
|
||||
qbitHash: nextItem.qbitHash,
|
||||
status: nextItem.status,
|
||||
},
|
||||
"Watcher bookmark imported"
|
||||
);
|
||||
} catch (error) {
|
||||
failedCount += 1;
|
||||
logger.error(
|
||||
{
|
||||
watcherId,
|
||||
sourceKey: bookmark.pageURL,
|
||||
error: error instanceof Error ? error.message : error,
|
||||
},
|
||||
"Watcher bookmark import failed"
|
||||
);
|
||||
const failedItem: WatcherItem = {
|
||||
id: randomUUID(),
|
||||
watcherId: watcher.id,
|
||||
tracker: watcher.tracker,
|
||||
trackerLabel: watcher.trackerLabel,
|
||||
sourceKey: bookmark.pageURL,
|
||||
pageUrl: bookmark.pageURL,
|
||||
title: bookmark.title,
|
||||
imageUrl: normalizeImageUrl(bookmark.pageURL, bookmark.backgroundImage),
|
||||
status: "failed",
|
||||
statusLabel: statusLabel("failed"),
|
||||
trackerTorrentId: deriveTrackerTorrentId(bookmark.pageURL),
|
||||
seenAt: nowIso(),
|
||||
lastSyncAt: nowIso(),
|
||||
errorMessage: error instanceof Error ? error.message : "Unknown watcher error",
|
||||
};
|
||||
await persistWatcherProgress({
|
||||
watcherId: watcher.id,
|
||||
item: failedItem,
|
||||
seenDelta: 1,
|
||||
lastError: failedItem.errorMessage,
|
||||
});
|
||||
processedSourceKeys.add(bookmark.pageURL);
|
||||
}
|
||||
}
|
||||
const finalDb = await readDb();
|
||||
finalDb.watchers = (finalDb.watchers ?? []).map((entry) =>
|
||||
entry.id === watcher.id
|
||||
? {
|
||||
...entry,
|
||||
lastSuccessAt: nowIso(),
|
||||
lastError: failedCount > 0 ? `${failedCount} item hata verdi` : undefined,
|
||||
updatedAt: nowIso(),
|
||||
}
|
||||
: entry
|
||||
);
|
||||
await writeDb(finalDb);
|
||||
await refreshWatcherSummary();
|
||||
logger.info({ watcherId, importedCount, failedCount }, "Watcher run completed");
|
||||
await updateRun(runId, {
|
||||
finishedAt: nowIso(),
|
||||
status: failedCount > 0 ? "FAILED" : "SUCCESS",
|
||||
newBookmarks,
|
||||
importedCount,
|
||||
failedCount,
|
||||
message: failedCount > 0 ? `${failedCount} item hata verdi` : `${importedCount} item qBittorrent'a gonderildi`,
|
||||
});
|
||||
await appendAuditLog({
|
||||
level: failedCount > 0 ? "WARN" : "INFO",
|
||||
event: "ARCHIVE_SUCCESS",
|
||||
message: `Watcher ${watcher.trackerLabel}: ${importedCount} imported, ${failedCount} failed`,
|
||||
});
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "Watcher run failed";
|
||||
await persistWatcher(watcher.id, (current) => ({
|
||||
...current,
|
||||
lastError: message,
|
||||
updatedAt: nowIso(),
|
||||
}));
|
||||
await updateRun(runId, {
|
||||
finishedAt: nowIso(),
|
||||
status: "FAILED",
|
||||
newBookmarks,
|
||||
importedCount,
|
||||
failedCount: failedCount + 1,
|
||||
message,
|
||||
});
|
||||
logger.error(
|
||||
{
|
||||
watcherId,
|
||||
message,
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
error: error instanceof Error ? { name: error.name, message: error.message } : error,
|
||||
},
|
||||
"Watcher run failed"
|
||||
);
|
||||
} finally {
|
||||
await cleanupRunPaths(runPaths);
|
||||
activeWatcherRuns.delete(watcherId);
|
||||
}
|
||||
};
|
||||
|
||||
export const runDueWatchers = async () => {
|
||||
const db = await readDb();
|
||||
const dueWatchers = (db.watchers ?? []).filter((watcher) => {
|
||||
if (!watcher.enabled) {
|
||||
return false;
|
||||
}
|
||||
if (!watcher.nextRunAt) {
|
||||
return true;
|
||||
}
|
||||
return new Date(watcher.nextRunAt).getTime() <= Date.now();
|
||||
});
|
||||
logger.info({ watcherIds: dueWatchers.map((watcher) => watcher.id) }, "Watcher due check completed");
|
||||
return dueWatchers.map((watcher) => watcher.id);
|
||||
};
|
||||
35
apps/server/src/watcher/watcher.types.ts
Normal file
35
apps/server/src/watcher/watcher.types.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
import { Watcher, WatcherItem, WatcherSummary, WatcherTracker } from "../types";
|
||||
|
||||
/** Static description of a supported tracker site. */
export interface TrackerDefinition {
  /** Canonical tracker key used in watcher records. */
  key: WatcherTracker;
  /** Human-readable tracker name for UI and logs. */
  label: string;
  /** Site identifier passed to the scraper CLI. */
  cliSiteKey: string;
  /** Whether the scraper can remove a bookmark after import. */
  supportsRemoveBookmark: boolean;
}
|
||||
|
||||
/** One bookmark as returned by the tracker scraper. */
export interface BookmarkRecord {
  /** Tracker detail-page URL; doubles as the dedupe source key. */
  pageURL: string;
  // Presumably marks VR-tagged releases on trackers that expose the flag —
  // TODO confirm against the scraper output.
  isVR?: boolean;
  /** Release title as shown on the tracker. */
  title: string;
  /** Cover/thumbnail URL, when the tracker provides one. */
  backgroundImage?: string | null;
}
|
||||
|
||||
/** Filesystem layout of one scraper run's scratch directory. */
export interface ScraperRunPaths {
  /** Root scratch directory for the run. */
  runDir: string;
  /** Path where the decrypted cookie file is written. */
  cookiesPath: string;
  /** Path where the fetched bookmark list is written. */
  bookmarksPath: string;
  /** Directory receiving downloaded .torrent files. */
  torrentDir: string;
}
|
||||
|
||||
/**
 * Public projection of a watcher: the encrypted cookie is stripped and
 * replaced by a boolean indicating whether one is stored.
 */
export interface WatcherListItem extends Omit<Watcher, "cookieEncrypted"> {
  hasCookie: boolean;
}
|
||||
|
||||
/** Watcher item augmented with live qBittorrent transfer data. */
export interface EnrichedWatcherItem extends WatcherItem {
  /** Download progress; presumably 0..1 as reported by qBittorrent — TODO confirm. */
  progress: number;
  /** Raw qBittorrent state string, when the torrent is still listed. */
  state?: string;
}
|
||||
|
||||
/** Aggregate summary plus a compact per-watcher status projection. */
export interface WatcherSummaryResponse extends WatcherSummary {
  watchers: Array<Pick<Watcher, "id" | "tracker" | "trackerLabel" | "enabled" | "lastRunAt" | "lastSuccessAt" | "nextRunAt" | "totalImported">>;
}
|
||||
34
apps/server/src/watcher/watcher.worker.ts
Normal file
34
apps/server/src/watcher/watcher.worker.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import { config } from "../config";
|
||||
import { logger } from "../utils/logger";
|
||||
import { runDueWatchers, runWatcherById } from "./watcher.service";
|
||||
|
||||
const activeRuns = new Set<string>();
|
||||
|
||||
export const startWatcherWorker = (intervalMs: number) => {
|
||||
if (!config.watcherEnabled) {
|
||||
logger.info("Watcher worker disabled");
|
||||
return;
|
||||
}
|
||||
setInterval(async () => {
|
||||
try {
|
||||
const dueWatcherIds = await runDueWatchers();
|
||||
logger.info({ count: dueWatcherIds.length, watcherIds: dueWatcherIds }, "Watcher worker tick");
|
||||
for (const watcherId of dueWatcherIds) {
|
||||
if (activeRuns.has(watcherId)) {
|
||||
logger.info({ watcherId }, "Watcher run skipped because another run is active");
|
||||
continue;
|
||||
}
|
||||
activeRuns.add(watcherId);
|
||||
runWatcherById(watcherId)
|
||||
.catch((error) => {
|
||||
logger.error({ error, watcherId }, "Unhandled watcher run failure");
|
||||
})
|
||||
.finally(() => {
|
||||
activeRuns.delete(watcherId);
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ error }, "Watcher worker tick failed");
|
||||
}
|
||||
}, intervalMs);
|
||||
};
|
||||
Reference in New Issue
Block a user