first commit
This commit is contained in:
219
apps/server/src/loop/loop.engine.ts
Normal file
219
apps/server/src/loop/loop.engine.ts
Normal file
@@ -0,0 +1,219 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { QbitClient } from "../qbit/qbit.client";
|
||||
import { readDb, writeDb } from "../storage/jsondb";
|
||||
import { LoopJob } from "../types";
|
||||
import { nowIso } from "../utils/time";
|
||||
import { appendAuditLog, logger } from "../utils/logger";
|
||||
import { emitJobLog, emitJobMetrics } from "../realtime/emitter";
|
||||
import { config } from "../config";
|
||||
|
||||
const logJob = async (jobId: string, level: "INFO" | "WARN" | "ERROR", message: string, event?: string) => {
|
||||
const createdAt = nowIso();
|
||||
emitJobLog({ jobId, level, message, createdAt });
|
||||
if (event) {
|
||||
await appendAuditLog({ level, event: event as any, message });
|
||||
}
|
||||
};
|
||||
|
||||
export const createLoopJob = async (
|
||||
input: {
|
||||
torrentHash: string;
|
||||
name: string;
|
||||
sizeBytes: number;
|
||||
magnet?: string;
|
||||
torrentFilePath?: string;
|
||||
allowIp: string;
|
||||
targetLoops: number;
|
||||
delayMs: number;
|
||||
}
|
||||
): Promise<LoopJob> => {
|
||||
const now = nowIso();
|
||||
const job: LoopJob = {
|
||||
id: randomUUID(),
|
||||
torrentHash: input.torrentHash,
|
||||
name: input.name,
|
||||
sizeBytes: input.sizeBytes,
|
||||
magnet: input.magnet,
|
||||
torrentFilePath: input.torrentFilePath,
|
||||
allowIp: input.allowIp,
|
||||
targetLoops: input.targetLoops,
|
||||
doneLoops: 0,
|
||||
delayMs: input.delayMs,
|
||||
deleteDataBetweenLoops: true,
|
||||
enforcementMode: "aggressive-soft",
|
||||
status: "RUNNING",
|
||||
currentRun: {
|
||||
startedAt: now,
|
||||
lastProgress: 0,
|
||||
lastProgressAt: now,
|
||||
downloadedThisRunBytes: 0,
|
||||
avgSpeed: 0,
|
||||
},
|
||||
totals: {
|
||||
totalDownloadedBytes: 0,
|
||||
totalTimeMs: 0,
|
||||
},
|
||||
bans: {
|
||||
bannedIps: [],
|
||||
},
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
};
|
||||
const db = await readDb();
|
||||
db.loopJobs.push(job);
|
||||
await writeDb(db);
|
||||
await logJob(job.id, "INFO", `Loop job started for ${job.name}`, "JOB_STARTED");
|
||||
return job;
|
||||
};
|
||||
|
||||
export const stopLoopJob = async (jobId: string) => {
|
||||
const db = await readDb();
|
||||
const job = db.loopJobs.find((j) => j.id === jobId);
|
||||
if (!job) {
|
||||
return null;
|
||||
}
|
||||
job.status = "STOPPED";
|
||||
job.nextRunAt = undefined;
|
||||
job.currentRun = undefined;
|
||||
job.updatedAt = nowIso();
|
||||
await writeDb(db);
|
||||
await logJob(job.id, "WARN", "Loop job stopped by user");
|
||||
return job;
|
||||
};
|
||||
|
||||
export const updateJob = async (job: LoopJob) => {
|
||||
const db = await readDb();
|
||||
const index = db.loopJobs.findIndex((j) => j.id === job.id);
|
||||
if (index === -1) {
|
||||
return null;
|
||||
}
|
||||
db.loopJobs[index] = job;
|
||||
await writeDb(db);
|
||||
emitJobMetrics(job);
|
||||
return job;
|
||||
};
|
||||
|
||||
export const tickLoopJobs = async (
|
||||
qbit: QbitClient,
|
||||
torrents: { hash: string; progress: number; state: string; dlspeed: number }[]
|
||||
) => {
|
||||
const db = await readDb();
|
||||
let changed = false;
|
||||
|
||||
for (const job of db.loopJobs) {
|
||||
if (job.status === "RUNNING") {
|
||||
const torrent = torrents.find((t) => t.hash === job.torrentHash);
|
||||
if (!torrent) {
|
||||
try {
|
||||
if (job.torrentFilePath) {
|
||||
await qbit.addTorrentByFile(job.torrentFilePath);
|
||||
} else if (job.magnet) {
|
||||
await qbit.addTorrentByMagnet(job.magnet);
|
||||
}
|
||||
await logJob(job.id, "WARN", "Torrent missing, re-added", "JOB_RESTARTED");
|
||||
} catch (error) {
|
||||
job.status = "ERROR";
|
||||
job.lastError = "Failed to re-add torrent";
|
||||
await logJob(job.id, "ERROR", job.lastError);
|
||||
}
|
||||
job.updatedAt = nowIso();
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
job.currentRun = job.currentRun ?? {
|
||||
startedAt: nowIso(),
|
||||
lastProgress: 0,
|
||||
lastProgressAt: nowIso(),
|
||||
downloadedThisRunBytes: 0,
|
||||
avgSpeed: 0,
|
||||
};
|
||||
if (torrent.progress > job.currentRun.lastProgress) {
|
||||
job.currentRun.lastProgress = torrent.progress;
|
||||
job.currentRun.lastProgressAt = nowIso();
|
||||
job.currentRun.stalledSince = undefined;
|
||||
}
|
||||
job.currentRun.avgSpeed = torrent.dlspeed;
|
||||
job.updatedAt = nowIso();
|
||||
|
||||
const stalledState = /stalled|meta/i.test(torrent.state);
|
||||
if (stalledState) {
|
||||
if (!job.currentRun.stalledSince) {
|
||||
job.currentRun.stalledSince = nowIso();
|
||||
}
|
||||
const lastProgressAt = job.currentRun.lastProgressAt
|
||||
? new Date(job.currentRun.lastProgressAt).getTime()
|
||||
: 0;
|
||||
if (Date.now() - lastProgressAt > config.stalledRecoveryMs) {
|
||||
await logJob(
|
||||
job.id,
|
||||
"WARN",
|
||||
"Stalled recovery: torrent will be removed and re-added"
|
||||
);
|
||||
try {
|
||||
await qbit.deleteTorrent(job.torrentHash, true);
|
||||
} catch (error) {
|
||||
logger.error({ error }, "Failed to delete stalled torrent");
|
||||
}
|
||||
job.status = "WAITING_DELAY";
|
||||
job.nextRunAt = new Date(Date.now() + job.delayMs).toISOString();
|
||||
job.updatedAt = nowIso();
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (torrent.progress >= 1 && /UP|uploading|stalledUP|pausedUP|queuedUP/i.test(torrent.state)) {
|
||||
job.doneLoops += 1;
|
||||
job.totals.totalDownloadedBytes += job.sizeBytes;
|
||||
await logJob(job.id, "INFO", `Loop ${job.doneLoops} completed`, "JOB_COMPLETED_LOOP");
|
||||
try {
|
||||
await qbit.deleteTorrent(job.torrentHash, true);
|
||||
} catch (error) {
|
||||
logger.error({ error }, "Failed to delete torrent");
|
||||
}
|
||||
if (job.doneLoops >= job.targetLoops) {
|
||||
job.status = "COMPLETED";
|
||||
await logJob(job.id, "INFO", "All loops completed", "JOB_COMPLETED_ALL");
|
||||
} else {
|
||||
job.status = "WAITING_DELAY";
|
||||
job.nextRunAt = new Date(Date.now() + job.delayMs).toISOString();
|
||||
}
|
||||
}
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (job.status === "WAITING_DELAY") {
|
||||
const nextRunAt = job.nextRunAt ? new Date(job.nextRunAt).getTime() : 0;
|
||||
if (Date.now() >= nextRunAt) {
|
||||
try {
|
||||
if (job.torrentFilePath) {
|
||||
await qbit.addTorrentByFile(job.torrentFilePath);
|
||||
} else if (job.magnet) {
|
||||
await qbit.addTorrentByMagnet(job.magnet);
|
||||
}
|
||||
job.status = "RUNNING";
|
||||
job.currentRun = {
|
||||
startedAt: nowIso(),
|
||||
lastProgress: 0,
|
||||
lastProgressAt: nowIso(),
|
||||
downloadedThisRunBytes: 0,
|
||||
avgSpeed: 0,
|
||||
};
|
||||
await logJob(job.id, "INFO", "Loop restarted", "JOB_RESTARTED");
|
||||
} catch (error) {
|
||||
job.status = "ERROR";
|
||||
job.lastError = "Failed to re-add torrent after delay";
|
||||
await logJob(job.id, "ERROR", job.lastError);
|
||||
}
|
||||
job.updatedAt = nowIso();
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
await writeDb(db);
|
||||
db.loopJobs.forEach((job) => emitJobMetrics(job));
|
||||
}
|
||||
};
|
||||
151
apps/server/src/loop/loop.routes.ts
Normal file
151
apps/server/src/loop/loop.routes.ts
Normal file
@@ -0,0 +1,151 @@
|
||||
import { Router } from "express";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { getQbitCapabilities, getQbitClient } from "../qbit/qbit.context";
|
||||
import { readDb } from "../storage/jsondb";
|
||||
import { createLoopJob, stopLoopJob } from "./loop.engine";
|
||||
import { dryRunSchema, loopStartSchema } from "../utils/validators";
|
||||
import { getArchiveStatus } from "../torrent/torrent.archive";
|
||||
import { config } from "../config";
|
||||
import { setArchiveStatus } from "../torrent/torrent.archive";
|
||||
import { nowIso } from "../utils/time";
|
||||
|
||||
const router = Router();
|
||||
|
||||
router.post("/start", async (req, res) => {
|
||||
const parsed = loopStartSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
return res.status(400).json({ error: parsed.error.flatten() });
|
||||
}
|
||||
const { hash, allowIp, targetLoops, delayMs } = parsed.data;
|
||||
const db = await readDb();
|
||||
if (targetLoops > db.settings.maxLoopLimit) {
|
||||
return res.status(400).json({ error: "Target loops exceed max limit" });
|
||||
}
|
||||
const qbit = getQbitClient();
|
||||
const torrents = await qbit.getTorrentsInfo();
|
||||
const torrent = torrents.find((t) => t.hash === hash);
|
||||
if (!torrent) {
|
||||
return res.status(404).json({ error: "Torrent not found" });
|
||||
}
|
||||
let archive = await getArchiveStatus(hash);
|
||||
if (!archive?.torrentFilePath) {
|
||||
try {
|
||||
const buffer = await qbit.exportTorrent(hash);
|
||||
const targetPath = path.join(config.torrentArchiveDir, `${hash}.torrent`);
|
||||
await fs.writeFile(targetPath, buffer);
|
||||
archive = await setArchiveStatus({
|
||||
hash,
|
||||
status: "READY",
|
||||
torrentFilePath: targetPath,
|
||||
source: "exported",
|
||||
updatedAt: nowIso(),
|
||||
});
|
||||
} catch (error) {
|
||||
return res.status(400).json({
|
||||
error: "Arşiv yok ve export başarısız. Lütfen Advanced bölümünden .torrent yükleyin.",
|
||||
});
|
||||
}
|
||||
}
|
||||
try {
|
||||
await fs.access(archive.torrentFilePath);
|
||||
} catch (error) {
|
||||
return res.status(400).json({
|
||||
error: "Arşiv dosyası bulunamadı. Lütfen tekrar yükleyin.",
|
||||
});
|
||||
}
|
||||
const job = await createLoopJob({
|
||||
torrentHash: hash,
|
||||
name: torrent.name,
|
||||
sizeBytes: torrent.size,
|
||||
magnet: undefined,
|
||||
torrentFilePath: archive?.torrentFilePath,
|
||||
allowIp,
|
||||
targetLoops,
|
||||
delayMs,
|
||||
});
|
||||
res.json(job);
|
||||
});
|
||||
|
||||
router.post("/stop/:jobId", async (req, res) => {
|
||||
const { jobId } = req.params;
|
||||
const job = await stopLoopJob(jobId);
|
||||
if (!job) {
|
||||
return res.status(404).json({ error: "Job not found" });
|
||||
}
|
||||
try {
|
||||
const qbit = getQbitClient();
|
||||
await qbit.deleteTorrent(job.torrentHash, true);
|
||||
} catch (error) {
|
||||
// Best-effort delete
|
||||
}
|
||||
res.json(job);
|
||||
});
|
||||
|
||||
router.post("/stop-by-hash", async (req, res) => {
|
||||
const { hash } = req.body ?? {};
|
||||
if (!hash) {
|
||||
return res.status(400).json({ error: "Missing hash" });
|
||||
}
|
||||
const db = await readDb();
|
||||
const job = db.loopJobs.find((j) => j.torrentHash === hash);
|
||||
if (!job) {
|
||||
return res.status(404).json({ error: "Job not found" });
|
||||
}
|
||||
const stopped = await stopLoopJob(job.id);
|
||||
try {
|
||||
const qbit = getQbitClient();
|
||||
await qbit.deleteTorrent(hash, true);
|
||||
} catch (error) {
|
||||
// Best-effort delete
|
||||
}
|
||||
res.json(stopped);
|
||||
});
|
||||
|
||||
router.post("/dry-run", async (req, res) => {
|
||||
const parsed = dryRunSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
return res.status(400).json({ error: parsed.error.flatten() });
|
||||
}
|
||||
const { hash } = parsed.data;
|
||||
const qbit = getQbitClient();
|
||||
const caps = getQbitCapabilities();
|
||||
const torrents = await qbit.getTorrentsInfo();
|
||||
const torrent = torrents.find((t) => t.hash === hash);
|
||||
if (!torrent) {
|
||||
return res.status(404).json({ error: "Torrent not found" });
|
||||
}
|
||||
const archive = await getArchiveStatus(hash);
|
||||
res.json({
|
||||
ok: true,
|
||||
qbitVersion: caps?.version,
|
||||
capabilities: caps,
|
||||
hasMagnet: Boolean(torrent.magnet_uri),
|
||||
archiveStatus: archive?.status ?? "MISSING",
|
||||
warnings: [
|
||||
caps?.hasPeersEndpoint ? null : "Peer listing unsupported",
|
||||
caps?.hasBanEndpoint ? null : "Peer ban unsupported; warn-only enforcement",
|
||||
torrent.magnet_uri ? null : "Magnet unavailable; upload .torrent recommended",
|
||||
].filter(Boolean),
|
||||
});
|
||||
});
|
||||
|
||||
router.get("/jobs", async (_req, res) => {
|
||||
const db = await readDb();
|
||||
res.json(db.loopJobs);
|
||||
});
|
||||
|
||||
router.get("/job/:jobId", async (req, res) => {
|
||||
const db = await readDb();
|
||||
const job = db.loopJobs.find((j) => j.id === req.params.jobId);
|
||||
if (!job) {
|
||||
return res.status(404).json({ error: "Job not found" });
|
||||
}
|
||||
res.json(job);
|
||||
});
|
||||
|
||||
router.get("/logs/:jobId", async (req, res) => {
|
||||
res.json({ jobId: req.params.jobId, logs: [] });
|
||||
});
|
||||
|
||||
export default router;
|
||||
26
apps/server/src/loop/loop.scheduler.ts
Normal file
26
apps/server/src/loop/loop.scheduler.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
import { QbitClient } from "../qbit/qbit.client"
|
||||
import { tickLoopJobs } from "./loop.engine"
|
||||
import { getStatusSnapshot, refreshJobsStatus, setTorrentsStatus } from "../status/status.service"
|
||||
import { emitStatusUpdate } from "../realtime/emitter"
|
||||
import { logger } from "../utils/logger"
|
||||
|
||||
export const startLoopScheduler = (qbit: QbitClient, intervalMs: number) => {
|
||||
setInterval(async () => {
|
||||
try {
|
||||
const torrents = await qbit.getTorrentsInfo();
|
||||
const transfer = await qbit.getTransferInfo();
|
||||
setTorrentsStatus(torrents, transfer);
|
||||
await tickLoopJobs(qbit, torrents);
|
||||
const jobs = await refreshJobsStatus();
|
||||
const current = await getStatusSnapshot();
|
||||
emitStatusUpdate({
|
||||
qbit: { ...current.qbit, ok: true },
|
||||
torrents,
|
||||
transfer,
|
||||
jobs,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error({ error }, "Loop scheduler tick failed");
|
||||
}
|
||||
}, intervalMs);
|
||||
};
|
||||
12
apps/server/src/loop/loop.types.ts
Normal file
12
apps/server/src/loop/loop.types.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { LoopJob } from "../types"
|
||||
|
||||
export interface LoopStartInput {
|
||||
hash: string;
|
||||
allowIp: string;
|
||||
targetLoops: number;
|
||||
delayMs: number;
|
||||
}
|
||||
|
||||
export interface LoopEngineContext {
|
||||
jobs: LoopJob[];
|
||||
}
|
||||
74
apps/server/src/loop/profiles.routes.ts
Normal file
74
apps/server/src/loop/profiles.routes.ts
Normal file
@@ -0,0 +1,74 @@
|
||||
import { Router } from "express";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { readDb, writeDb } from "../storage/jsondb"
|
||||
import { profileSchema } from "../utils/validators"
|
||||
import { nowIso } from "../utils/time"
|
||||
|
||||
const router = Router();
|
||||
|
||||
router.get("/", async (_req, res) => {
|
||||
const db = await readDb();
|
||||
res.json(db.profiles);
|
||||
});
|
||||
|
||||
router.post("/", async (req, res) => {
|
||||
const parsed = profileSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
return res.status(400).json({ error: parsed.error.flatten() });
|
||||
}
|
||||
const db = await readDb();
|
||||
const profile = {
|
||||
id: randomUUID(),
|
||||
createdAt: nowIso(),
|
||||
...parsed.data,
|
||||
};
|
||||
db.profiles.push(profile);
|
||||
await writeDb(db);
|
||||
res.json(profile);
|
||||
});
|
||||
|
||||
router.put("/:profileId", async (req, res) => {
|
||||
const parsed = profileSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
return res.status(400).json({ error: parsed.error.flatten() });
|
||||
}
|
||||
const db = await readDb();
|
||||
const index = db.profiles.findIndex((p) => p.id === req.params.profileId);
|
||||
if (index === -1) {
|
||||
return res.status(404).json({ error: "Profile not found" });
|
||||
}
|
||||
db.profiles[index] = {
|
||||
...db.profiles[index],
|
||||
...parsed.data,
|
||||
};
|
||||
await writeDb(db);
|
||||
res.json(db.profiles[index]);
|
||||
});
|
||||
|
||||
router.delete("/:profileId", async (req, res) => {
|
||||
const db = await readDb();
|
||||
const next = db.profiles.filter((p) => p.id !== req.params.profileId);
|
||||
if (next.length === db.profiles.length) {
|
||||
return res.status(404).json({ error: "Profile not found" });
|
||||
}
|
||||
db.profiles = next;
|
||||
await writeDb(db);
|
||||
res.json({ ok: true });
|
||||
});
|
||||
|
||||
router.post("/apply", async (req, res) => {
|
||||
const { profileId, hash } = req.body ?? {};
|
||||
const db = await readDb();
|
||||
const profile = db.profiles.find((p) => p.id === profileId);
|
||||
if (!profile) {
|
||||
return res.status(404).json({ error: "Profile not found" });
|
||||
}
|
||||
res.json({
|
||||
hash,
|
||||
allowIp: profile.allowIp,
|
||||
delayMs: profile.delayMs,
|
||||
targetLoops: profile.targetLoops,
|
||||
});
|
||||
});
|
||||
|
||||
export default router;
|
||||
Reference in New Issue
Block a user