feat: altyazı otomasyon sistemi MVP'sini ekle

Docker tabanlı mikro servis mimarisi ile altyazı otomasyon sistemi altyapısı kuruldu.

- Core (Node.js): Chokidar dosya izleyici, BullMQ iş kuyrukları, ffprobe medya analizi, MongoDB entegrasyonu ve dosya yazma işlemleri.
- API (Fastify): Mock sağlayıcılar, arşiv güvenliği (zip-slip), altyazı doğrulama, puanlama ve aday seçim motoru.
- UI (React/Vite): İş yönetimi paneli, canlı SSE log akışı, manuel inceleme arayüzü ve sistem ayarları.
- Altyapı: Docker Compose (dev/prod), Redis, Mongo ve çevresel değişken yapılandırmaları.
This commit is contained in:
2026-02-15 23:12:24 +03:00
commit f1a1f093e6
72 changed files with 2882 additions and 0 deletions

24
services/core/Dockerfile Normal file
View File

@@ -0,0 +1,24 @@
# Shared base: Node 20 + ffmpeg (needed by ffprobe media analysis), with the
# apt cache removed to keep the layer small.
FROM node:20-bookworm AS base
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg && rm -rf /var/lib/apt/lists/*
COPY package*.json ./

# Dev image: full install (incl. devDependencies) and hot-reload via `npm run dev`.
FROM base AS dev
RUN npm install
COPY . .
EXPOSE 3001
CMD ["npm", "run", "dev"]

# Build stage: reproducible install, compile, then strip devDependencies so the
# node_modules copied into prod contains runtime packages only (previously the
# prod image shipped tsx/typescript/vitest as well).
FROM base AS build
RUN npm ci
COPY . .
RUN npm run build
RUN npm prune --omit=dev

# Prod image: ffmpeg + pruned node_modules + compiled dist, nothing else.
FROM node:20-bookworm AS prod
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg && rm -rf /var/lib/apt/lists/*
COPY --from=build /app/package*.json ./
COPY --from=build /app/node_modules ./node_modules
COPY --from=build /app/dist ./dist
EXPOSE 3001
CMD ["npm", "run", "start"]

View File

@@ -0,0 +1,31 @@
{
"name": "subwatcher-core",
"version": "1.0.0",
"private": true,
"type": "module",
"scripts": {
"dev": "tsx watch src/index.ts",
"build": "tsc -p tsconfig.json",
"start": "node dist/index.js",
"test": "vitest run"
},
"dependencies": {
"axios": "^1.8.2",
"bullmq": "^5.40.3",
"chokidar": "^4.0.3",
"dotenv": "^16.4.7",
"fastify": "^5.2.1",
"@fastify/cors": "^11.0.0",
"ioredis": "^5.4.2",
"iconv-lite": "^0.6.3",
"jschardet": "^3.1.4",
"mongoose": "^8.10.0",
"zod": "^3.24.1"
},
"devDependencies": {
"@types/node": "^22.13.1",
"tsx": "^4.19.2",
"typescript": "^5.7.3",
"vitest": "^3.0.5"
}
}

34
services/core/src/app.ts Normal file
View File

@@ -0,0 +1,34 @@
import Fastify from 'fastify';
import cors from '@fastify/cors';
import { env } from './config/env.js';
import { healthRoutes } from './routes/health.js';
import { settingsRoutes } from './routes/settings.js';
import { watchedPathRoutes } from './routes/watchedPaths.js';
import { jobRoutes } from './routes/jobs.js';
import { reviewRoutes } from './routes/review.js';
import { debugRoutes } from './routes/debug.js';
export async function buildApp() {
const app = Fastify({ logger: true });
await app.register(cors, { origin: true });
if (env.enableApiKey && env.apiKey) {
app.addHook('preHandler', async (req, reply) => {
if (!req.url.startsWith('/api')) return;
const key = req.headers['x-api-key'];
if (key !== env.apiKey) {
return reply.status(401).send({ error: 'invalid api key' });
}
});
}
await healthRoutes(app);
await settingsRoutes(app);
await watchedPathRoutes(app);
await jobRoutes(app);
await reviewRoutes(app);
await debugRoutes(app);
return app;
}

View File

@@ -0,0 +1,18 @@
import dotenv from 'dotenv';
dotenv.config();

// Runtime configuration, resolved once at module load.
// Every entry has a docker-compose-friendly default so the service boots
// without a .env file present.
const vars = process.env;
const nodeEnv = vars.NODE_ENV ?? 'development';

export const env = {
  nodeEnv,
  port: Number(vars.CORE_PORT ?? 3001),
  mongoUri: vars.MONGO_URI ?? 'mongodb://mongo:27017/subwatcher',
  redisHost: vars.REDIS_HOST ?? 'redis',
  redisPort: Number(vars.REDIS_PORT ?? 6379),
  apiBaseUrl: vars.API_BASE_URL ?? 'http://api:3002',
  tempRoot: vars.TEMP_ROOT ?? '/temp',
  mediaTvPath: vars.MEDIA_TV_PATH ?? '/media/tv',
  mediaMoviePath: vars.MEDIA_MOVIE_PATH ?? '/media/movie',
  enableApiKey: vars.ENABLE_API_KEY === 'true',
  apiKey: vars.API_KEY ?? '',
  // Anything that is not explicitly "production" counts as development.
  isDev: nodeEnv !== 'production'
};

View File

@@ -0,0 +1,7 @@
import mongoose from 'mongoose';
import { env } from '../config/env.js';

/** Opens the shared mongoose connection; bootstrap awaits this before any model use. */
export async function connectMongo(): Promise<void> {
  const uri = env.mongoUri;
  await mongoose.connect(uri);
  console.log(`[core] Mongo connected: ${uri}`);
}

View File

@@ -0,0 +1,8 @@
import IORedis from 'ioredis';
import { env } from '../config/env.js';

// Single shared Redis connection reused by every queue and worker.
// maxRetriesPerRequest: null is what BullMQ requires so its blocking
// commands are not aborted by ioredis retries.
const redisOptions = {
  host: env.redisHost,
  port: env.redisPort,
  maxRetriesPerRequest: null
};

export const redis = new IORedis(redisOptions);

View File

@@ -0,0 +1,49 @@
import { connectMongo } from './db/mongo.js';
import { env } from './config/env.js';
import { buildApp } from './app.js';
import { SettingModel } from './models/Setting.js';
import { ensureDefaultWatchedPaths, startWatcher } from './watcher/index.js';
import { startWorkers } from './workers/pipeline.js';
// Seeded only on first boot: $setOnInsert leaves later /api/settings edits intact.
const DEFAULT_SETTINGS = {
  _id: 'global',
  languages: ['tr'],
  multiSubtitleEnabled: false,
  overwriteExisting: false,
  fileStableSeconds: 20,
  stableChecks: 3,
  stableIntervalSeconds: 10,
  autoWriteThreshold: 0.75,
  securityLimits: {
    maxFiles: 300,
    maxTotalBytes: 250 * 1024 * 1024,
    maxSingleBytes: 10 * 1024 * 1024
  },
  preferHI: false,
  preferForced: false,
  features: { clamavEnabled: false }
};

/** Startup sequence: DB → settings seed → default paths → workers → watcher → HTTP. */
async function bootstrap() {
  await connectMongo();
  await SettingModel.findByIdAndUpdate(
    'global',
    { $setOnInsert: DEFAULT_SETTINGS },
    { upsert: true, new: true }
  );
  await ensureDefaultWatchedPaths(env.mediaTvPath, env.mediaMoviePath);
  startWorkers();
  await startWatcher();
  const app = await buildApp();
  await app.listen({ port: env.port, host: '0.0.0.0' });
  console.log(`[core] running on :${env.port}`);
}

bootstrap().catch((err) => {
  console.error(err);
  process.exit(1);
});

View File

@@ -0,0 +1,35 @@
import { Schema, model } from 'mongoose';

// Lifecycle states for one subtitle job, listed roughly in pipeline order
// (see workers/pipeline.ts): file stability → parse → ffprobe analysis →
// API search → temp download → encoding normalization → write.
// NEEDS_REVIEW parks the job for a human decision in the review UI.
export const JobStatuses = [
  'PENDING',
  'WAITING_FILE_STABLE',
  'PARSED',
  'ANALYZED',
  'REQUESTING_API',
  'FOUND_TEMP',
  'NORMALIZING_ENCODING',
  'WRITING_SUBTITLE',
  'DONE',
  'NEEDS_REVIEW',
  'NOT_FOUND',
  'AMBIGUOUS',
  'SECURITY_REJECTED',
  'ERROR'
] as const;
export type JobStatus = (typeof JobStatuses)[number];

const jobSchema = new Schema(
  {
    // The media file this job is producing subtitles for.
    mediaFileId: { type: Schema.Types.ObjectId, ref: 'media_files', required: true },
    status: { type: String, enum: JobStatuses, default: 'PENDING' },
    attempts: { type: Number, default: 0 },
    // Parsed filename data sent to the subtitle API (title/year/season/...).
    requestSnapshot: Schema.Types.Mixed,
    // Last API response summary: status, confidence, source, candidates.
    apiSnapshot: Schema.Types.Mixed,
    result: Schema.Types.Mixed,
    error: Schema.Types.Mixed
  },
  { timestamps: true }
);

export const JobModel = model('jobs', jobSchema);

View File

@@ -0,0 +1,15 @@
import { Schema, model } from 'mongoose';

// Append-only per-job log line; written via utils/logger.ts, paged through
// /api/jobs/:id/logs and streamed live over SSE at /api/jobs/:id/stream.
const jobLogSchema = new Schema(
  {
    jobId: { type: Schema.Types.ObjectId, ref: 'jobs', required: true, index: true },
    // Indexed so log pages can be served in chronological order cheaply.
    ts: { type: Date, default: Date.now, index: true },
    level: { type: String, enum: ['info', 'warn', 'error'], default: 'info' },
    // Pipeline step identifier, e.g. 'PARSE_DONE', 'FFPROBE_STARTED'.
    step: { type: String, required: true },
    message: { type: String, required: true },
    meta: Schema.Types.Mixed
  },
  // No __v: log lines are immutable, versioning adds nothing.
  { versionKey: false }
);

export const JobLogModel = model('job_logs', jobLogSchema);

View File

@@ -0,0 +1,24 @@
import { Schema, model } from 'mongoose';

export type MediaType = 'movie' | 'tv';

// One record per discovered video file, keyed by absolute path.
const mediaFileSchema = new Schema(
  {
    path: { type: String, required: true, unique: true },
    type: { type: String, enum: ['movie', 'tv'], required: true },
    // Fields below are filled in by the filename parser (utils/parser.ts).
    title: String,
    year: Number,
    season: Number,
    episode: Number,
    release: String,
    size: Number,
    mtime: Date,
    // Set to MISSING by the watcher's unlink handler when the file disappears.
    status: { type: String, enum: ['ACTIVE', 'MISSING'], default: 'ACTIVE' },
    lastSeenAt: Date,
    // ffprobe output (video/audio/format summary) from utils/ffprobe.ts.
    mediaInfo: Schema.Types.Mixed,
    analyzedAt: Date
  },
  { timestamps: true }
);

export const MediaFileModel = model('media_files', mediaFileSchema);

View File

@@ -0,0 +1,27 @@
import { Schema, model } from 'mongoose';

// Singleton settings document (_id is always 'global'); the defaults here
// mirror the bootstrap seed in index.ts.
const settingSchema = new Schema(
  {
    _id: { type: String, default: 'global' },
    // Subtitle languages to request, in priority order.
    languages: { type: [String], default: ['tr'] },
    multiSubtitleEnabled: { type: Boolean, default: false },
    overwriteExisting: { type: Boolean, default: false },
    fileStableSeconds: { type: Number, default: 20 },
    // File-stability polling: number of checks and the interval between them.
    stableChecks: { type: Number, default: 3 },
    stableIntervalSeconds: { type: Number, default: 10 },
    // Candidates scoring at or above this confidence are written automatically.
    autoWriteThreshold: { type: Number, default: 0.75 },
    // Archive-extraction limits forwarded to the API's security checks.
    securityLimits: {
      maxFiles: { type: Number, default: 300 },
      maxTotalBytes: { type: Number, default: 250 * 1024 * 1024 },
      maxSingleBytes: { type: Number, default: 10 * 1024 * 1024 }
    },
    // Hearing-impaired / forced-subtitle preferences passed to candidate scoring.
    preferHI: { type: Boolean, default: false },
    preferForced: { type: Boolean, default: false },
    features: {
      clamavEnabled: { type: Boolean, default: false }
    }
  },
  { timestamps: true }
);

export const SettingModel = model('settings', settingSchema);

View File

@@ -0,0 +1,14 @@
import { Schema, model } from 'mongoose';

// 'mixed' means the root may hold both movies and episodes; the watcher
// then decides per file (resolveKind in watcher/index.ts).
export type WatchedKind = 'tv' | 'movie' | 'mixed';

// A filesystem root monitored by chokidar for new/changed video files.
const watchedPathSchema = new Schema(
  {
    path: { type: String, required: true, unique: true },
    kind: { type: String, enum: ['tv', 'movie', 'mixed'], required: true },
    enabled: { type: Boolean, default: true }
  },
  // Only createdAt is tracked; rows are toggled/replaced rather than edited.
  { timestamps: { createdAt: true, updatedAt: false } }
);

export const WatchedPathModel = model('watched_paths', watchedPathSchema);

View File

@@ -0,0 +1,11 @@
import { Queue, Worker, type Processor } from 'bullmq';
import { redis } from '../db/redis.js';

// One queue per pipeline stage; all share the single ioredis connection.
export const fileEventsQueue = new Queue('fileEvents', { connection: redis });
export const mediaAnalysisQueue = new Queue('mediaAnalysis', { connection: redis });
export const subtitleFetchQueue = new Queue('subtitleFetch', { connection: redis });
export const finalizeWriteQueue = new Queue('finalizeWrite', { connection: redis });

/**
 * Creates a worker for the named queue.
 * The processor parameter is typed as BullMQ's Processor (was `any`) so
 * handler signatures are checked at call sites; concurrency: 2 bounds the
 * number of jobs processed in parallel per stage.
 */
export function createWorker(name: string, processor: Processor): Worker {
  return new Worker(name, processor, { connection: redis, concurrency: 2 });
}

View File

@@ -0,0 +1,27 @@
import fs from 'node:fs/promises';
import { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { env } from '../config/env.js';
import { createJobForPath } from '../workers/pipeline.js';
import { MediaFileModel } from '../models/MediaFile.js';
import { fileEventsQueue } from '../queues/queues.js';
/**
 * Dev-only helper endpoint: manually enqueue a file into the pipeline.
 * Registers nothing at all when running in production mode.
 */
export async function debugRoutes(app: FastifyInstance): Promise<void> {
  if (!env.isDev) return;
  const EnqueueSchema = z.object({
    path: z.string(),
    kind: z.enum(['tv', 'movie']).default('movie')
  });
  app.post('/api/debug/enqueue', async (req, reply) => {
    const body = EnqueueSchema.parse(req.body);
    // The path must exist inside this container's filesystem.
    const exists = await fs.access(body.path).then(
      () => true,
      () => false
    );
    if (!exists) {
      return reply.status(400).send({ error: 'Path does not exist in container' });
    }
    const jobId = await createJobForPath(body.path, body.kind);
    const media = await MediaFileModel.findOne({ path: body.path }).lean();
    if (!media) {
      return reply.status(500).send({ error: 'media not persisted' });
    }
    await fileEventsQueue.add('debug', { jobId, mediaFileId: String(media._id), path: body.path });
    return { ok: true, jobId };
  });
}

View File

@@ -0,0 +1,5 @@
import { FastifyInstance } from 'fastify';

/** Registers the liveness probe; the service name disambiguates core from api. */
export async function healthRoutes(app: FastifyInstance): Promise<void> {
  app.get('/api/health', async () => {
    return { ok: true, service: 'core' };
  });
}

View File

@@ -0,0 +1,69 @@
import { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { JobModel } from '../models/Job.js';
import { JobLogModel } from '../models/JobLog.js';
import { subscribeLogs } from '../utils/logger.js';
/** Escapes regex metacharacters so user-supplied search text is matched literally. */
function escapeRegex(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/**
 * Job listing, detail, log paging and a live SSE log stream.
 */
export async function jobRoutes(app: FastifyInstance): Promise<void> {
  // Paginated listing with optional status filter and title search.
  app.get('/api/jobs', async (req) => {
    const q = z
      .object({
        page: z.coerce.number().default(1),
        limit: z.coerce.number().default(20),
        status: z.string().optional(),
        search: z.string().optional()
      })
      .parse(req.query);
    const filter: Record<string, unknown> = {};
    if (q.status) filter.status = q.status;
    if (q.search) {
      // Escape the search text: feeding raw user input into $regex allows
      // pattern injection / catastrophic-backtracking (ReDoS) queries.
      filter.$or = [{ 'requestSnapshot.title': { $regex: escapeRegex(q.search), $options: 'i' } }];
    }
    const skip = (q.page - 1) * q.limit;
    const [items, total] = await Promise.all([
      JobModel.find(filter).sort({ createdAt: -1 }).skip(skip).limit(q.limit).lean(),
      JobModel.countDocuments(filter)
    ]);
    return { items, total, page: q.page, limit: q.limit };
  });

  // Single job with its media file populated.
  app.get('/api/jobs/:id', async (req, reply) => {
    const p = z.object({ id: z.string() }).parse(req.params);
    const item = await JobModel.findById(p.id).populate('mediaFileId').lean();
    if (!item) return reply.status(404).send({ error: 'Not found' });
    return item;
  });

  // Chronological log pages for one job.
  app.get('/api/jobs/:id/logs', async (req) => {
    const p = z.object({ id: z.string() }).parse(req.params);
    const q = z.object({ page: z.coerce.number().default(1), limit: z.coerce.number().default(100) }).parse(req.query);
    const skip = (q.page - 1) * q.limit;
    const [items, total] = await Promise.all([
      JobLogModel.find({ jobId: p.id }).sort({ ts: 1 }).skip(skip).limit(q.limit).lean(),
      JobLogModel.countDocuments({ jobId: p.id })
    ]);
    return { items, total, page: q.page, limit: q.limit };
  });

  // Server-sent events: pushes each new log line for the job as it is written.
  app.get('/api/jobs/:id/stream', async (req, reply) => {
    const p = z.object({ id: z.string() }).parse(req.params);
    reply.raw.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive'
    });
    const unsubscribe = subscribeLogs(p.id, (log) => {
      reply.raw.write(`data: ${JSON.stringify(log)}\n\n`);
    });
    // SSE comment line as keep-alive so idle connections are not dropped.
    const ping = setInterval(() => reply.raw.write(': ping\n\n'), 15000);
    req.raw.on('close', () => {
      clearInterval(ping);
      unsubscribe();
      reply.raw.end();
    });
  });
}

View File

@@ -0,0 +1,103 @@
import { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { JobModel } from '../models/Job.js';
import { MediaFileModel } from '../models/MediaFile.js';
import { SettingModel } from '../models/Setting.js';
import { apiClient } from '../utils/apiClient.js';
import { finalizeWriteQueue } from '../queues/queues.js';
import { writeJobLog } from '../utils/logger.js';
/**
 * Manual-review endpoints: list parked jobs, re-run the provider search with
 * operator overrides, and commit a chosen candidate to the write queue.
 */
export async function reviewRoutes(app: FastifyInstance): Promise<void> {
  // Jobs waiting for a human decision, newest activity first (capped at 200).
  app.get('/api/review', async () => {
    return JobModel.find({ status: 'NEEDS_REVIEW' }).sort({ updatedAt: -1 }).limit(200).lean();
  });

  // Re-runs the subtitle search; every field the operator supplies overrides
  // the parsed media data, anything omitted falls back to media/settings.
  app.post('/api/review/:jobId/search', async (req, reply) => {
    const p = z.object({ jobId: z.string() }).parse(req.params);
    const body = z
      .object({
        title: z.string().optional(),
        year: z.number().optional(),
        release: z.string().optional(),
        season: z.number().optional(),
        episode: z.number().optional(),
        languages: z.array(z.string()).optional()
      })
      .parse(req.body);
    const j = await JobModel.findById(p.jobId).lean();
    if (!j) return reply.status(404).send({ error: 'Job not found' });
    const media = await MediaFileModel.findById(j.mediaFileId).lean();
    const settings = await SettingModel.findById('global').lean();
    if (!media || !settings) return reply.status(404).send({ error: 'Related data not found' });
    // Same payload shape the automatic pipeline sends (workers/pipeline.ts);
    // the job id doubles as the API-side job token.
    const payload = {
      jobToken: p.jobId,
      type: media.type,
      title: body.title ?? media.title,
      year: body.year ?? media.year,
      release: body.release ?? media.release,
      season: body.season ?? media.season,
      episode: body.episode ?? media.episode,
      languages: body.languages ?? settings.languages,
      mediaInfo: media.mediaInfo,
      preferHI: settings.preferHI,
      preferForced: settings.preferForced,
      securityLimits: settings.securityLimits
    };
    const res = await apiClient.post('/v1/subtitles/search', payload);
    // Merge into the existing snapshot so earlier automatic-search data survives.
    await JobModel.findByIdAndUpdate(p.jobId, {
      apiSnapshot: {
        ...(j.apiSnapshot ?? {}),
        manualSearch: payload,
        candidates: res.data.candidates,
        status: res.data.status
      }
    });
    // Mirror the API's trace entries into this job's own log stream.
    if (Array.isArray(res.data.trace)) {
      for (const t of res.data.trace) {
        await writeJobLog({
          jobId: p.jobId,
          step: t.step,
          message: t.message,
          level: t.level || 'info',
          meta: t.meta
        });
      }
    }
    return res.data;
  });

  // Operator picks a candidate (by id or direct path); the API materializes it
  // and, on success, the write is queued and the job re-enters the pipeline.
  app.post('/api/review/:jobId/choose', async (req, reply) => {
    const p = z.object({ jobId: z.string() }).parse(req.params);
    const body = z.object({ chosenCandidateId: z.string().optional(), chosenPath: z.string().optional(), lang: z.string().default('tr') }).parse(req.body);
    const j = await JobModel.findById(p.jobId).lean();
    if (!j) return reply.status(404).send({ error: 'Job not found' });
    const chooseRes = await apiClient.post('/v1/subtitles/choose', {
      jobToken: p.jobId,
      chosenCandidateId: body.chosenCandidateId,
      chosenPath: body.chosenPath
    });
    if (chooseRes.data.status !== 'FOUND' || !chooseRes.data.bestPath) {
      return reply.status(400).send({ error: 'Could not produce best subtitle', details: chooseRes.data });
    }
    // Manual choices default to 0.7 confidence when the API reports none.
    await finalizeWriteQueue.add('finalize', {
      jobId: p.jobId,
      mediaFileId: String(j.mediaFileId),
      bestPath: chooseRes.data.bestPath,
      lang: body.lang,
      source: chooseRes.data.source ?? 'manual',
      confidence: chooseRes.data.confidence ?? 0.7
    });
    await JobModel.findByIdAndUpdate(p.jobId, { status: 'FOUND_TEMP' });
    return { ok: true, queued: true };
  });
}

View File

@@ -0,0 +1,37 @@
import { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { SettingModel } from '../models/Setting.js';
// Validation for the settings document posted by the UI; shape mirrors
// models/Setting.ts.
const SecurityLimitsSchema = z.object({
  maxFiles: z.number().min(1),
  maxTotalBytes: z.number().min(1024),
  maxSingleBytes: z.number().min(1024)
});

const FeaturesSchema = z.object({
  clamavEnabled: z.boolean()
});

const SettingsSchema = z.object({
  languages: z.array(z.string()).min(1),
  multiSubtitleEnabled: z.boolean(),
  overwriteExisting: z.boolean(),
  fileStableSeconds: z.number(),
  stableChecks: z.number().min(1),
  stableIntervalSeconds: z.number().min(1),
  autoWriteThreshold: z.number(),
  securityLimits: SecurityLimitsSchema,
  preferHI: z.boolean(),
  preferForced: z.boolean(),
  features: FeaturesSchema.optional()
});

/** Read/replace the singleton 'global' settings document. */
export async function settingsRoutes(app: FastifyInstance): Promise<void> {
  app.get('/api/settings', async () => {
    return SettingModel.findById('global').lean();
  });
  app.post('/api/settings', async (req, reply) => {
    const parsed = SettingsSchema.safeParse(req.body);
    if (!parsed.success) {
      return reply.status(400).send({ error: parsed.error.flatten() });
    }
    return SettingModel.findByIdAndUpdate('global', parsed.data, { upsert: true, new: true });
  });
}

View File

@@ -0,0 +1,49 @@
import { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { WatchedPathModel } from '../models/WatchedPath.js';
// One POST endpoint multiplexes all mutations via an 'action' discriminator.
const BodySchema = z.object({
  action: z.enum(['add', 'remove', 'toggle', 'update']),
  path: z.string(),
  kind: z.enum(['tv', 'movie', 'mixed']).optional(),
  enabled: z.boolean().optional()
});

/** CRUD over the chokidar-watched filesystem roots. */
export async function watchedPathRoutes(app: FastifyInstance): Promise<void> {
  app.get('/api/watched-paths', async () => {
    return WatchedPathModel.find().sort({ createdAt: -1 }).lean();
  });
  app.post('/api/watched-paths', async (req, reply) => {
    const parsed = BodySchema.safeParse(req.body);
    if (!parsed.success) {
      return reply.status(400).send({ error: parsed.error.flatten() });
    }
    const { action, path: watchPath, kind, enabled } = parsed.data;
    switch (action) {
      case 'add':
        // $setOnInsert makes re-adding an existing path a no-op.
        return WatchedPathModel.findOneAndUpdate(
          { path: watchPath },
          { $setOnInsert: { path: watchPath, kind: kind ?? 'mixed', enabled: true } },
          { upsert: true, new: true }
        );
      case 'remove':
        await WatchedPathModel.deleteOne({ path: watchPath });
        return { ok: true };
      case 'toggle': {
        const doc = await WatchedPathModel.findOne({ path: watchPath });
        if (!doc) return reply.status(404).send({ error: 'Not found' });
        // Explicit 'enabled' wins; otherwise flip the current state.
        doc.enabled = enabled ?? !doc.enabled;
        await doc.save();
        return doc;
      }
      default:
        // 'update': replace kind/enabled, falling back to defaults when omitted.
        return WatchedPathModel.findOneAndUpdate(
          { path: watchPath },
          { kind: kind ?? 'mixed', enabled: enabled ?? true },
          { new: true }
        );
    }
  });
}

View File

@@ -0,0 +1,14 @@
import axios from 'axios';
import { env } from '../config/env.js';

/** Axios client for the API service; 30s timeout covers slow provider calls. */
export const apiClient = axios.create({
  baseURL: env.apiBaseUrl,
  timeout: 30000
});

// When the shared-key guard is on, stamp every outgoing request with the
// same header core itself validates.
if (env.enableApiKey && env.apiKey) {
  apiClient.interceptors.request.use((config) => {
    config.headers['x-api-key'] = env.apiKey;
    return config;
  });
}

View File

@@ -0,0 +1,46 @@
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';

const execFileAsync = promisify(execFile);

/**
 * The subset of ffprobe output the pipeline cares about.
 * Replaces the previous untyped `any` returns so downstream consumers get
 * checked field access.
 */
export interface MediaInfo {
  video: {
    codec_name?: string;
    width?: number;
    height?: number;
    r_frame_rate?: string;
  } | null;
  audio: Array<{ codec_name?: string; channels?: number; language?: string }>;
  format: {
    duration?: string;
    bit_rate?: string;
    format_name?: string;
  };
}

/**
 * Runs ffprobe on the given file and condenses its JSON output.
 * execFile (not exec) keeps the path out of a shell, so no quoting issues.
 * @throws if ffprobe is missing, exits non-zero, or emits invalid JSON.
 */
export async function analyzeWithFfprobe(path: string): Promise<MediaInfo> {
  const { stdout } = await execFileAsync('ffprobe', [
    '-v',
    'error',
    '-print_format',
    'json',
    '-show_streams',
    '-show_format',
    path
  ]);
  const json = JSON.parse(stdout);
  const streams: any[] = json.streams || [];
  const video = streams.find((s) => s.codec_type === 'video');
  const audio = streams
    .filter((s) => s.codec_type === 'audio')
    .map((s) => ({ codec_name: s.codec_name, channels: s.channels, language: s.tags?.language }));
  return {
    video: video
      ? {
          codec_name: video.codec_name,
          width: video.width,
          height: video.height,
          r_frame_rate: video.r_frame_rate
        }
      : null,
    audio,
    format: {
      duration: json.format?.duration,
      bit_rate: json.format?.bit_rate,
      format_name: json.format?.format_name
    }
  };
}

/**
 * Placeholder metadata used when ffprobe fails, so the pipeline can continue
 * with a generic 1080p profile instead of aborting the job.
 */
export function fallbackMediaInfo(): MediaInfo {
  return {
    video: { codec_name: 'unknown', width: 1920, height: 1080, r_frame_rate: '24/1' },
    audio: [{ codec_name: 'unknown', channels: 2, language: 'und' }],
    format: { duration: '0', bit_rate: '0', format_name: 'matroska' }
  };
}

View File

@@ -0,0 +1,79 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import iconv from 'iconv-lite';
import jschardet from 'jschardet';
// Containers the pipeline will pick up. Previously only .mkv was accepted and
// the temp-suffix check below was dead code (an '.mkv'-suffixed name can never
// also end in '.part').
const VIDEO_EXTENSIONS = ['.mkv', '.mp4', '.m4v', '.avi', '.mov'];

/**
 * True when the path looks like a finished video file.
 * In-progress download markers ('x.mkv.part', 'x.tmp', 'x.partial') are
 * rejected first; then the (lowercased) extension must be a known container.
 * Backward compatible: every path accepted before is still accepted.
 */
export function isVideoFile(filePath: string): boolean {
  const p = filePath.toLowerCase();
  if (/\.(part|tmp|partial)$/.test(p)) return false;
  return VIDEO_EXTENSIONS.some((ext) => p.endsWith(ext));
}
/**
 * Polls a file's size/mtime until it has been unchanged for `checks`
 * consecutive samples taken `intervalSeconds` apart.
 *
 * The previous implementation discarded all intermediate comparisons and
 * decided from a single final stat pair, so `checks` had no real effect.
 * This version requires a genuine run of identical samples, within a budget
 * of `checks * 2` polls so a briefly-growing file can still settle.
 *
 * @returns true once stable; false if the file keeps changing within the
 *          budget, or if it disappears (stat failure) mid-check.
 */
export async function waitForStable(filePath: string, checks: number, intervalSeconds: number): Promise<boolean> {
  const sleep = () => new Promise<void>((resolve) => setTimeout(resolve, intervalSeconds * 1000));
  const required = Math.max(1, checks);
  try {
    let prev = await fs.stat(filePath);
    let stableRun = 0;
    for (let sample = 0; sample < required * 2 && stableRun < required; sample++) {
      await sleep();
      const current = await fs.stat(filePath);
      if (current.size === prev.size && current.mtimeMs === prev.mtimeMs) {
        stableRun += 1;
      } else {
        stableRun = 0; // file changed: the stability window restarts
      }
      prev = current;
    }
    return stableRun >= required;
  } catch {
    // stat failed (e.g. file deleted while downloading): not stable.
    return false;
  }
}
/**
 * Decodes a raw subtitle buffer to a UTF-8 string with LF line endings.
 * A UTF-8 BOM is trusted outright; otherwise jschardet guesses the encoding,
 * with windows-1254 for Turkish legacy encodings and latin1 as the fallback.
 */
export function normalizeSubtitleBuffer(input: Buffer): string {
  const hasUtf8Bom =
    input.length >= 3 && input[0] === 0xef && input[1] === 0xbb && input[2] === 0xbf;
  let text: string;
  if (hasUtf8Bom) {
    text = input.toString('utf8');
  } else {
    const guessed = (jschardet.detect(input).encoding || '').toLowerCase();
    if (guessed.includes('utf')) {
      text = input.toString('utf8');
    } else if (guessed.includes('windows-1254') || guessed.includes('iso-8859-9')) {
      text = iconv.decode(input, 'windows-1254');
    } else {
      text = iconv.decode(input, 'latin1');
    }
  }
  // Strip any remaining BOM codepoint, then CRLF/CR -> LF.
  return text.replace(/^\uFEFF/, '').replace(/\r\n/g, '\n').replace(/\r/g, '\n');
}
/**
 * Picks the subtitle file path to write: "<base>.<lang>.<ext>" when free (or
 * when overwriting is allowed), otherwise the first free numbered variant
 * "<base>.<lang>.2.<ext>", ".3.", ...
 */
export async function nextSubtitlePath(basePathWithoutExt: string, lang: string, ext: 'srt' | 'ass', overwrite: boolean): Promise<string> {
  const exists = (candidate: string) => fs.access(candidate).then(() => true, () => false);
  const preferred = `${basePathWithoutExt}.${lang}.${ext}`;
  if (overwrite) return preferred;
  if (!(await exists(preferred))) return preferred;
  for (let counter = 2; ; counter++) {
    const numbered = `${basePathWithoutExt}.${lang}.${counter}.${ext}`;
    if (!(await exists(numbered))) return numbered;
  }
}
/** Maps a subtitle file's extension to its format; anything but .ass is treated as .srt. */
export function extensionFromPath(p: string): 'srt' | 'ass' {
  const lowered = path.extname(p).toLowerCase();
  if (lowered === '.ass') return 'ass';
  return 'srt';
}

View File

@@ -0,0 +1,45 @@
import { Types } from 'mongoose';
import { JobLogModel } from '../models/JobLog.js';
interface LogInput {
  jobId: string | Types.ObjectId;
  level?: 'info' | 'warn' | 'error';
  step: string;
  message: string;
  meta?: any;
}

type Subscriber = (log: any) => void;

// jobId -> live listeners; the SSE route registers here for streaming.
const subscribers = new Map<string, Set<Subscriber>>();

/**
 * Registers a listener for a job's future log lines.
 * @returns an unsubscribe function.
 */
export function subscribeLogs(jobId: string, cb: Subscriber): () => void {
  let set = subscribers.get(jobId);
  if (!set) {
    set = new Set();
    subscribers.set(jobId, set);
  }
  set.add(cb);
  return () => {
    subscribers.get(jobId)?.delete(cb);
  };
}

/**
 * Persists one log line, then fans it out to any live SSE subscribers.
 */
export async function writeJobLog(input: LogInput): Promise<void> {
  const doc = await JobLogModel.create({
    jobId: input.jobId,
    level: input.level ?? 'info',
    step: input.step,
    message: input.message,
    meta: input.meta,
    ts: new Date()
  });
  const key = String(input.jobId);
  const listeners = subscribers.get(key);
  if (!listeners) return;
  const payload = {
    _id: String(doc._id),
    jobId: key,
    level: doc.level,
    step: doc.step,
    message: doc.message,
    meta: doc.meta,
    ts: doc.ts
  };
  for (const notify of listeners) {
    notify(payload);
  }
}

View File

@@ -0,0 +1,50 @@
/** Result of parsing a media filename: series episode or movie metadata. */
export interface ParsedMedia {
  type: 'tv' | 'movie';
  title: string;
  year?: number;
  season?: number;
  episode?: number;
  release?: string;
}

/**
 * Turns separators into spaces and strips quality/codec tags.
 * Tags are removed BEFORE whitespace is collapsed — the previous order left
 * double spaces behind wherever a tag sat mid-title
 * (e.g. "Movie 1080p Extended" -> "Movie  Extended").
 */
function cleanTitle(raw: string): string {
  return raw
    .replace(/[._]+/g, ' ')
    .replace(/\b(1080p|720p|2160p|x264|x265|h264|bluray|webrip|web-dl|dvdrip|hdrip)\b/gi, '')
    .replace(/\s+/g, ' ')
    .trim();
}

/**
 * Parses a video filename into structured metadata.
 * TV patterns (S01E02, 1x02, S01xE02) are tried first; anything else is a movie
 * whose title is everything before the first plausible year (1900-2099).
 */
export function parseMediaFilename(filename: string): ParsedMedia {
  const noExt = filename.replace(/\.[^.]+$/, '');
  const tvPatterns = [
    /(.+?)[ ._-]+S(\d{1,2})E(\d{1,2})(?:[ ._-]+(.*))?/i,
    /(.+?)[ ._-]+(\d{1,2})x(\d{1,2})(?:[ ._-]+(.*))?/i,
    /(.+?)[ ._-]+S(\d{1,2})[ ._-]*x[ ._-]*E(\d{1,2})(?:[ ._-]+(.*))?/i
  ];
  for (const p of tvPatterns) {
    const m = noExt.match(p);
    if (m) {
      const yearMatch = noExt.match(/\b(19\d{2}|20\d{2})\b/);
      return {
        type: 'tv',
        title: cleanTitle(m[1]),
        season: Number(m[2]),
        episode: Number(m[3]),
        year: yearMatch ? Number(yearMatch[1]) : undefined,
        release: m[4]?.replace(/[._]/g, ' ').trim()
      };
    }
  }
  const yearMatch = noExt.match(/\b(19\d{2}|20\d{2})\b/);
  const splitByYear = yearMatch ? noExt.split(yearMatch[1])[0] : noExt;
  // Release group convention: trailing "-GROUP" suffix.
  const releaseMatch = noExt.match(/-(\w[\w.-]*)$/);
  return {
    type: 'movie',
    title: cleanTitle(splitByYear),
    year: yearMatch ? Number(yearMatch[1]) : undefined,
    release: releaseMatch?.[1]
  };
}

View File

@@ -0,0 +1,56 @@
import chokidar from 'chokidar';
import { WatchedPathModel } from '../models/WatchedPath.js';
import { createJobForPath } from '../workers/pipeline.js';
import { fileEventsQueue } from '../queues/queues.js';
import { isVideoFile } from '../utils/file.js';
/**
 * Seeds the default tv/movie watch roots if they are missing.
 * $setOnInsert means a user's later kind/enabled edits survive restarts.
 */
export async function ensureDefaultWatchedPaths(tvPath: string, moviePath: string): Promise<void> {
  const defaults: Array<{ path: string; kind: 'tv' | 'movie' }> = [
    { path: tvPath, kind: 'tv' },
    { path: moviePath, kind: 'movie' }
  ];
  for (const entry of defaults) {
    await WatchedPathModel.updateOne(
      { path: entry.path },
      { $setOnInsert: { path: entry.path, kind: entry.kind, enabled: true } },
      { upsert: true }
    );
  }
}
/**
 * Starts a chokidar watcher over every enabled watched path.
 * ignoreInitial: false means existing files are (re)enqueued on boot.
 * The previously duplicated add/change handlers are collapsed into one
 * shared enqueue routine, and handler rejections are logged instead of
 * becoming unhandled promise rejections.
 */
export async function startWatcher(): Promise<void> {
  const watched = await WatchedPathModel.find({ enabled: true }).lean();
  const paths = watched.map((w) => w.path);
  if (paths.length === 0) {
    console.log('[core] no watched paths enabled');
    return;
  }
  const byPath = new Map(watched.map((w) => [w.path, w.kind]));
  // Shared add/change path: create (or refresh) the job, then enqueue the event.
  const enqueue = async (event: string, p: string): Promise<void> => {
    if (!isVideoFile(p)) return;
    const kind = resolveKind(p, byPath);
    const jobId = await createJobForPath(p, kind === 'movie' ? 'movie' : 'tv');
    // Dynamic import kept from the original — presumably avoids a circular
    // import with the pipeline module; TODO confirm before making it static.
    const media = await import('../models/MediaFile.js');
    const mediaDoc = await media.MediaFileModel.findOne({ path: p }).lean();
    if (!mediaDoc) return;
    await fileEventsQueue.add(event, { jobId, mediaFileId: String(mediaDoc._id), path: p });
  };
  const watcher = chokidar.watch(paths, { ignoreInitial: false, awaitWriteFinish: false, persistent: true });
  watcher.on('add', (p) => {
    enqueue('add', p).catch((err) => console.error('[core] watcher add failed', err));
  });
  watcher.on('change', (p) => {
    enqueue('change', p).catch((err) => console.error('[core] watcher change failed', err));
  });
  watcher.on('unlink', async (p) => {
    // Mark the record MISSING rather than deleting it, preserving history.
    const media = await import('../models/MediaFile.js');
    await media.MediaFileModel.updateOne({ path: p }, { status: 'MISSING', lastSeenAt: new Date() });
  });
  console.log(`[core] watcher started for: ${paths.join(', ')}`);
}
/**
 * Determines whether a file belongs to a tv or movie root.
 * Matches on a whole path segment (root + '/') so '/media/tv' no longer
 * falsely claims files under '/media/tvshows/...'. Paths are container-side
 * POSIX paths, hence the literal '/' separator.
 * 'mixed' roots and unmatched files both default to 'movie'.
 */
function resolveKind(filePath: string, byPath: Map<string, 'tv' | 'movie' | 'mixed'>): 'tv' | 'movie' {
  const matched = [...byPath.entries()].find(([root]) => {
    const prefix = root.endsWith('/') ? root : `${root}/`;
    return filePath === root || filePath.startsWith(prefix);
  });
  if (!matched) return 'movie';
  return matched[1] === 'mixed' ? 'movie' : matched[1];
}

View File

@@ -0,0 +1,274 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { Types } from 'mongoose';
import { JobModel } from '../models/Job.js';
import { MediaFileModel } from '../models/MediaFile.js';
import { SettingModel } from '../models/Setting.js';
import { parseMediaFilename } from '../utils/parser.js';
import { analyzeWithFfprobe, fallbackMediaInfo } from '../utils/ffprobe.js';
import { extensionFromPath, isVideoFile, nextSubtitlePath, normalizeSubtitleBuffer, waitForStable } from '../utils/file.js';
import { writeJobLog } from '../utils/logger.js';
import { apiClient } from '../utils/apiClient.js';
import { createWorker, finalizeWriteQueue, mediaAnalysisQueue, subtitleFetchQueue } from '../queues/queues.js';
// Payload for the 'fileEvents' queue: a new/changed video file with its job
// and media_files document ids.
interface FileEventData {
  jobId: string;
  mediaFileId: string;
  path: string;
}

// Shared payload for the 'mediaAnalysis' and 'subtitleFetch' queues.
interface SubtitleFetchData {
  jobId: string;
  mediaFileId: string;
}

// Payload for 'finalizeWrite': where the downloaded subtitle sits (bestPath),
// the target language, and how confidently/whence it was chosen.
interface FinalizeData {
  jobId: string;
  mediaFileId: string;
  bestPath: string;
  lang: string;
  source: string;
  confidence: number;
}

// Holds references to every created Worker — presumably to keep them alive
// and reachable for later shutdown; TODO confirm intended use.
const activeWorkers: any[] = [];
export function startWorkers(): void {
activeWorkers.push(createWorker('fileEvents', async (job) => {
const data = job.data as FileEventData;
const settings = await SettingModel.findById('global').lean();
if (!settings) throw new Error('settings missing');
await JobModel.findByIdAndUpdate(data.jobId, { status: 'WAITING_FILE_STABLE' });
await writeJobLog({ jobId: data.jobId, step: 'WAIT_FILE_STABLE_CHECK', message: `Stability check started for ${data.path}` });
if (!isVideoFile(data.path)) {
await writeJobLog({ jobId: data.jobId, step: 'WATCH_EVENT_RECEIVED', level: 'warn', message: `Ignored non-video file: ${data.path}` });
return;
}
const stable = await waitForStable(data.path, settings.stableChecks, settings.stableIntervalSeconds);
if (!stable) {
await JobModel.findByIdAndUpdate(data.jobId, { status: 'ERROR', error: { code: 'FILE_NOT_STABLE', message: 'file not stable' } });
await writeJobLog({ jobId: data.jobId, step: 'JOB_ERROR', level: 'error', message: 'File did not become stable' });
return;
}
const parsed = parseMediaFilename(path.basename(data.path));
await MediaFileModel.findByIdAndUpdate(data.mediaFileId, {
type: parsed.type,
title: parsed.title,
year: parsed.year,
season: parsed.season,
episode: parsed.episode,
release: parsed.release,
lastSeenAt: new Date(),
status: 'ACTIVE'
});
await JobModel.findByIdAndUpdate(data.jobId, {
status: 'PARSED',
requestSnapshot: parsed
});
await writeJobLog({
jobId: data.jobId,
step: 'PARSE_DONE',
message: `Parsed as ${parsed.type}: ${parsed.title}`,
meta: {
title: parsed.title,
year: parsed.year,
season: parsed.season,
episode: parsed.episode,
release: parsed.release
}
});
await writeJobLog({ jobId: data.jobId, step: 'FILE_STABLE_CONFIRMED', message: data.path });
await mediaAnalysisQueue.add('analyze', { jobId: data.jobId, mediaFileId: data.mediaFileId });
}));
activeWorkers.push(createWorker('mediaAnalysis', async (job) => {
const { jobId, mediaFileId } = job.data as SubtitleFetchData;
const media = await MediaFileModel.findById(mediaFileId).lean();
if (!media) return;
await writeJobLog({ jobId, step: 'FFPROBE_STARTED', message: media.path });
let mediaInfo: any;
try {
mediaInfo = await analyzeWithFfprobe(media.path);
} catch (err: any) {
mediaInfo = fallbackMediaInfo();
await writeJobLog({
jobId,
step: 'FFPROBE_DONE',
level: 'warn',
message: 'ffprobe failed, fallback metadata used',
meta: { error: err?.message }
});
}
await MediaFileModel.findByIdAndUpdate(mediaFileId, { mediaInfo, analyzedAt: new Date() });
await JobModel.findByIdAndUpdate(jobId, { status: 'ANALYZED' });
await writeJobLog({ jobId, step: 'FFPROBE_DONE', message: 'Media analysis done', meta: mediaInfo });
await subtitleFetchQueue.add('search', { jobId, mediaFileId });
}));
// Worker: query the subtitle API for an analyzed media file.
// Outcomes: FOUND with a temp file -> enqueue finalizeWrite;
// NOT_FOUND / anything else -> park the job as NEEDS_REVIEW for manual handling.
activeWorkers.push(createWorker('subtitleFetch', async (job) => {
const { jobId, mediaFileId } = job.data as SubtitleFetchData;
const media = await MediaFileModel.findById(mediaFileId).lean();
const settings = await SettingModel.findById('global').lean();
// Bail out quietly if either record vanished mid-pipeline (e.g. file deleted, settings reset).
if (!media || !settings) return;
await JobModel.findByIdAndUpdate(jobId, { status: 'REQUESTING_API' });
await writeJobLog({ jobId, step: 'SUBTITLE_SEARCH_STARTED', message: 'Searching subtitle API' });
// Search request: job id doubles as the API-side job token.
// Language list collapses to the first configured language ('tr' fallback)
// unless multi-subtitle mode is enabled.
const payload = {
jobToken: jobId,
type: media.type,
title: media.title,
year: media.year,
release: media.release,
languages: settings.multiSubtitleEnabled ? settings.languages : [settings.languages[0] ?? 'tr'],
season: media.season,
episode: media.episode,
mediaInfo: media.mediaInfo,
preferHI: settings.preferHI,
preferForced: settings.preferForced,
securityLimits: settings.securityLimits
};
// NOTE(review): res.data is untyped (any) — response shape is assumed from the
// mock API contract; consider validating with zod before trusting it.
const res = await apiClient.post('/v1/subtitles/search', payload);
const data = res.data;
// Relay the API's step-by-step trace into this job's log stream, if provided.
if (Array.isArray(data.trace)) {
for (const t of data.trace) {
await writeJobLog({
jobId,
step: t.step,
message: t.message,
level: t.level || 'info',
meta: t.meta
});
}
}
await writeJobLog({ jobId, step: 'SUBTITLE_SEARCH_DONE', message: `API status ${data.status}` });
// Happy path: the API found a subtitle and staged it at bestPath (a temp file
// the finalizeWrite worker will normalize and copy next to the media file).
if (data.status === 'FOUND' && data.bestPath) {
await JobModel.findByIdAndUpdate(jobId, {
status: 'FOUND_TEMP',
apiSnapshot: { status: data.status, confidence: data.confidence, source: data.source, candidates: data.candidates }
});
// Only the first requested language is finalized here; 'mock' / 0.8 are
// defaults for API responses that omit source/confidence.
await finalizeWriteQueue.add('finalize', {
jobId,
mediaFileId,
bestPath: data.bestPath,
lang: (payload.languages[0] ?? 'tr') as string,
source: data.source ?? 'mock',
confidence: data.confidence ?? 0.8
} satisfies FinalizeData);
return;
}
// Everything that is not an explicit NOT_FOUND is treated as AMBIGUOUS.
// The job itself goes to NEEDS_REVIEW; the snapshot keeps the finer status.
const status = data.status === 'NOT_FOUND' ? 'NOT_FOUND' : 'AMBIGUOUS';
await JobModel.findByIdAndUpdate(jobId, {
status: 'NEEDS_REVIEW',
apiSnapshot: {
status,
confidence: data.confidence,
source: data.source,
candidates: data.candidates
}
});
await writeJobLog({
jobId,
step: data.status === 'NOT_FOUND' ? 'NOT_FOUND_NEEDS_REVIEW' : 'AMBIGUOUS_NEEDS_REVIEW',
message: 'Manual review required'
});
}));
// Worker: final pipeline stage. Normalizes the fetched subtitle's encoding,
// writes it next to the media file, marks the job DONE, and requests cleanup
// of the API's temp files (best-effort).
activeWorkers.push(createWorker('finalizeWrite', async (job) => {
const data = job.data as FinalizeData;
const media = await MediaFileModel.findById(data.mediaFileId).lean();
const settings = await SettingModel.findById('global').lean();
// Records may have disappeared mid-pipeline; nothing sensible to do then.
if (!media || !settings) return;
await JobModel.findByIdAndUpdate(data.jobId, { status: 'NORMALIZING_ENCODING' });
const raw = await fs.readFile(data.bestPath);
// Re-encode subtitle content to UTF-8 with LF line endings (per the log message below).
const normalized = normalizeSubtitleBuffer(raw);
await writeJobLog({ jobId: data.jobId, step: 'ENCODING_NORMALIZED_UTF8', message: 'Subtitle normalized to UTF-8/LF' });
await JobModel.findByIdAndUpdate(data.jobId, { status: 'WRITING_SUBTITLE' });
const parsed = path.parse(media.path);
// nextSubtitlePath picks "<video>.<lang>.<ext>", or an incremented variant when
// that name is taken and overwriteExisting is off.
const target = await nextSubtitlePath(path.join(parsed.dir, parsed.name), data.lang, extensionFromPath(data.bestPath), settings.overwriteExisting);
// NOTE(review): collision detection by string comparison against the default
// name — works only if nextSubtitlePath builds names with this exact
// "<base>.<lang>.<ext>" template; confirm the helper's format stays in sync.
const targetExists = target !== `${path.join(parsed.dir, parsed.name)}.${data.lang}.${extensionFromPath(data.bestPath)}`;
if (targetExists) {
await writeJobLog({ jobId: data.jobId, step: 'WRITE_TARGET_SKIPPED_EXISTS', message: 'Target exists, using incremented filename', meta: { target } });
}
// NOTE(review): if normalizeSubtitleBuffer returns a Buffer the 'utf8' encoding
// argument is ignored by fs.writeFile — harmless, but confirm the return type.
await fs.writeFile(target, normalized, 'utf8');
await writeJobLog({ jobId: data.jobId, step: 'WRITE_TARGET_DONE', message: target });
// Persist the final result; only a single subtitle per job is written here.
await JobModel.findByIdAndUpdate(data.jobId, {
status: 'DONE',
result: {
subtitles: [
{
lang: data.lang,
source: data.source,
confidence: data.confidence,
writtenPath: target,
ext: extensionFromPath(data.bestPath)
}
]
}
});
await writeJobLog({ jobId: data.jobId, step: 'JOB_DONE', message: 'Pipeline completed' });
// Cleanup is best-effort: a failure is logged as a warning, never thrown,
// since a periodic sweep covers leftover temp files anyway.
try {
await apiClient.post('/v1/subtitles/cleanup', { jobToken: data.jobId });
await writeJobLog({ jobId: data.jobId, step: 'CLEANUP_TEMP_DONE', message: 'Temporary files cleanup requested' });
} catch {
await writeJobLog({ jobId: data.jobId, step: 'CLEANUP_TEMP_DONE', level: 'warn', message: 'Cleanup endpoint unavailable, periodic cleanup will handle it' });
}
}));
}
/**
 * Register (or refresh) the MediaFile record for a watched path, open a fresh
 * PENDING job for it, log the originating watch event, and return the job id.
 *
 * @param filePath - absolute path of the media file reported by the watcher
 * @param mediaTypeHint - 'tv' or 'movie', as guessed from the watched folder
 * @returns the new job's id as a string
 */
export async function createJobForPath(filePath: string, mediaTypeHint: 'tv' | 'movie'): Promise<string> {
  const stats = await fs.stat(filePath);
  // Upsert keyed by path: re-seen files refresh size/mtime instead of duplicating.
  const mediaUpdate = {
    $set: {
      path: filePath,
      type: mediaTypeHint,
      size: stats.size,
      mtime: stats.mtime,
      status: 'ACTIVE',
      lastSeenAt: new Date()
    }
  };
  const mediaDoc = await MediaFileModel.findOneAndUpdate({ path: filePath }, mediaUpdate, { upsert: true, new: true });
  const jobDoc = await JobModel.create({
    mediaFileId: mediaDoc._id,
    status: 'PENDING',
    attempts: 0
  });
  await writeJobLog({
    jobId: jobDoc._id as Types.ObjectId,
    step: 'WATCH_EVENT_RECEIVED',
    message: `Queued watch event for ${filePath}`
  });
  return String(jobDoc._id);
}

View File

@@ -0,0 +1,26 @@
import { describe, expect, it } from 'vitest';
import { parseMediaFilename } from '../src/utils/parser.js';
describe('parseMediaFilename', () => {
  it('parses SxxEyy pattern', () => {
    expect(parseMediaFilename('The.Last.of.Us.S01E02.1080p-FLUX.mkv')).toMatchObject({
      type: 'tv',
      title: 'The Last of Us',
      season: 1,
      episode: 2
    });
  });
  it('parses 1x02 pattern', () => {
    expect(parseMediaFilename('Dark.1x02.WEBRip.mkv')).toMatchObject({
      type: 'tv',
      season: 1,
      episode: 2
    });
  });
  it('parses movie pattern', () => {
    expect(parseMediaFilename('John.Wick.2014.1080p.BluRay-FLUX.mkv')).toMatchObject({
      type: 'movie',
      title: 'John Wick',
      year: 2014
    });
  });
});

View File

@@ -0,0 +1,15 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"outDir": "dist",
"rootDir": "src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"types": ["node"]
},
"include": ["src/**/*"]
}