import axios from "axios";
import { getQbitCapabilities, getQbitClient } from "../qbit/qbit.context";
import { readDb, writeDb } from "../storage/jsondb";
import { nowIso } from "../utils/time";
import { emitJobLog, emitJobMetrics } from "../realtime/emitter";
import { appendAuditLog } from "../utils/logger";
import { appendLoopLog } from "../storage/loopLogs";

/** Minimum gap between repeated "peer listing unsupported" warnings for one job. */
const PEER_WARN_THROTTLE_MS = 60_000;

/** Job id -> epoch milliseconds of the last "peer listing unsupported" warning. */
const peerErrorThrottle = new Map<unknown, number>();

/**
 * Reports whether a "peer listing unsupported" warning may be logged for the
 * job right now, and records the attempt when it may.
 */
const claimPeerWarningSlot = (jobId: unknown): boolean => {
  const lastWarn = peerErrorThrottle.get(jobId) ?? 0;
  if (Date.now() - lastWarn > PEER_WARN_THROTTLE_MS) {
    peerErrorThrottle.set(jobId, Date.now());
    return true;
  }
  return false;
};

/**
 * Runs one enforcement pass: for every RUNNING loop job, lists the torrent's
 * peers, bans every peer whose IP differs from the job's `allowIp` (when the
 * client supports banning), logs the outcome, and finally writes the db back.
 *
 * A failure while handling one job is logged and does not stop the remaining
 * jobs. Failures of `getQbitClient`, `readDb` or `writeDb` propagate to the caller.
 */
const runEnforcementTick = async (): Promise<void> => {
  const qbit = getQbitClient();
  const caps = getQbitCapabilities();
  if (!caps) {
    // Without capabilities no job can be processed, so skip the db round trip.
    return;
  }

  const db = await readDb();

  for (const job of db.loopJobs) {
    if (job.status !== "RUNNING") {
      continue;
    }

    // Sends the same entry to the realtime channel and to the persisted loop log.
    const report = async (
      level: "WARN" | "ERROR",
      message: string,
    ): Promise<void> => {
      const createdAt = nowIso();
      emitJobLog({ jobId: job.id, level, message, createdAt });
      await appendLoopLog({ jobId: job.id, level, message, createdAt });
    };

    try {
      if (!caps.hasPeersEndpoint) {
        // Throttled like the 404 path below so the warning is not repeated every tick.
        if (claimPeerWarningSlot(job.id)) {
          await report("WARN", "Peer listing unsupported; enforcement disabled");
        }
        continue;
      }

      let peersResponse;
      try {
        peersResponse = await qbit.getTorrentPeers(job.torrentHash);
      } catch (error) {
        const status = axios.isAxiosError(error)
          ? error.response?.status
          : undefined;
        if (status === 404) {
          if (claimPeerWarningSlot(job.id)) {
            await report(
              "WARN",
              "Peer listesi desteklenmiyor; enforcement devre dışı.",
            );
          }
          continue;
        }
        throw error;
      }

      const peers = Object.values(peersResponse.peers || {});
      let allowIpConnected = false;
      const peersToBan: string[] = [];
      const ipsToBan: string[] = [];

      for (const peer of peers) {
        if (peer.ip === job.allowIp) {
          allowIpConnected = true;
          continue;
        }
        if (caps.hasBanEndpoint) {
          // NOTE(review): for IPv6 peers the ban endpoint may expect "[ip]:port";
          // confirm against the qBittorrent API before changing this format.
          peersToBan.push(`${peer.ip}:${peer.port}`);
          // Keep the IP as reported; splitting "ip:port" on ":" truncates IPv6.
          ipsToBan.push(String(peer.ip));
        }
      }

      // peersToBan is only populated when the ban endpoint is available.
      if (peersToBan.length > 0) {
        await qbit.banPeers(peersToBan);
        job.bans.bannedIps.push(...ipsToBan);
        job.bans.lastBanAt = nowIso();
        await report("WARN", `Banned ${peersToBan.length} peers`);
        await appendAuditLog({
          level: "WARN",
          event: "PEER_BANNED",
          message: `Job ${job.id}: banned ${peersToBan.length} peers`,
        });
      }

      if (!caps.hasBanEndpoint) {
        await report("WARN", "Peer ban unsupported; warn-only enforcement");
      }

      if (!allowIpConnected) {
        await report("WARN", "Allowed IP not connected");
      }

      job.updatedAt = nowIso();
      emitJobMetrics(job);
    } catch (error) {
      console.error(`Enforcement failed for job ${job.id}:`, error);
      try {
        await report("ERROR", "Enforcement error; continuing.");
      } catch (logError) {
        // A logging failure must not skip the remaining jobs or the final write.
        console.error(
          `Could not record enforcement error for job ${job.id}:`,
          logError,
        );
      }
    }
  }

  await writeDb(db);
};

/**
 * Starts the periodic enforcement worker.
 *
 * Every `intervalMs` milliseconds one enforcement pass is attempted. A pass is
 * skipped while the previous one is still running, so passes never overlap.
 * Errors are logged and never stop the timer.
 *
 * @param intervalMs Delay between enforcement passes, in milliseconds.
 * @returns The interval handle, which can be passed to `clearInterval` to stop
 *   the worker.
 */
export const startEnforcementWorker = (
  intervalMs: number,
): ReturnType<typeof setInterval> => {
  let tickInFlight = false;

  const tick = async (): Promise<void> => {
    if (tickInFlight) {
      // setInterval does not wait for async callbacks; skip instead of overlapping.
      return;
    }
    tickInFlight = true;
    try {
      await runEnforcementTick();
    } catch (error) {
      // Keep worker alive on errors, but do not hide them.
      console.error("Enforcement tick failed; worker continues:", error);
    } finally {
      tickInFlight = false;
    }
  };

  return setInterval(() => {
    void tick();
  }, intervalMs);
};