From 5c496277f2432aeb9fe09ffceb61017520d073c7 Mon Sep 17 00:00:00 2001 From: szbk Date: Sat, 28 Feb 2026 11:59:59 +0300 Subject: [PATCH] =?UTF-8?q?feat(backend):=20realtime=20event=20system,=20a?= =?UTF-8?q?dmin=20API=20ve=20metrics=20altyap=C4=B1s=C4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - socket.ts: ContentRealtimeEvent, CacheRealtimeEvent, MetricsRealtimeEvent tipleri eklendi; emitContentEvent / emitCacheEvent / emitMetricsEvent fonksiyonları ile tüm istemcilere broadcast desteği getirildi. emitJobCompleted imzası GetInfoResponse + DataSource ile güçlendirildi. - auth.middleware.ts: require() tabanlı env erişimi static import'a dönüştürüldü; admin-only endpointler için adminOnlyMiddleware eklendi (X-API-Key !== API_KEY_ADMIN → 403). - cache.service.ts: set / delete / clearAll işlemlerinden sonra emitCacheEvent çağrısı eklenerek cache mutasyonları anlık yayınlanıyor. - content.service.ts: create / update / delete sonrasında emitContentEvent çağrısı eklenerek DB yazımları Socket.IO üzerinden duyuruluyor. - job.service.ts: async ve sync akışa MetricsService entegrasyonu eklendi; cache hit/miss ve kaynak (cache/database/netflix) sayaçları her işlemde artırılıyor. - types/index.ts: AdminOverviewResponse ve AdminActionResponse tipleri merkezi olarak tanımlandı. - admin.service.ts (yeni): getOverview, clearCache, warmupCacheFromDatabase, retryFailedJobs, refreshStaleContent operasyonları implement edildi. Redis pipeline ile TTL/boyut analizi ve DB metrikleri paralel toplanıyor. - metrics.service.ts (yeni): Redis hash tabanlı cache hit/miss ve kaynak sayaçları; her artışta MetricsRealtimeEvent yayınlanıyor. - api.routes.ts: Admin endpointleri eklendi: GET /api/admin/overview POST /api/admin/cache/clear POST /api/admin/cache/warmup POST /api/admin/jobs/retry-failed POST /api/admin/content/refresh-stale Co-Authored-By: Claude Sonnet 4.6 --- src/config/socket.ts | 58 +++- src/middleware/auth.middleware.ts | 46 ++- src/routes/api.routes.ts | 172 ++++++++++- src/services/admin.service.ts | 455 ++++++++++++++++++++++++++++++ src/services/cache.service.ts | 19 +- src/services/content.service.ts | 24 +- src/services/job.service.ts | 9 + src/services/metrics.service.ts | 82 ++++++ src/types/index.ts | 77 +++++ 9 files changed, 932 insertions(+), 10 deletions(-) create mode 100644 src/services/admin.service.ts create mode 100644 src/services/metrics.service.ts diff --git a/src/config/socket.ts b/src/config/socket.ts index 23c2586..359c6a1 100644 --- a/src/config/socket.ts +++ b/src/config/socket.ts @@ -1,6 +1,7 @@ import { Server as HttpServer } from 'http'; import { Server, Socket } from 'socket.io'; import logger from '../utils/logger.js'; +import type { DataSource, GetInfoResponse } from '../types/index.js'; /** * Socket.IO Server singleton @@ -11,6 +12,32 @@ export interface SocketData { subscribedJobs: Set; } +export interface ContentRealtimeEvent { + action: 'created' | 'updated' | 'deleted'; + url: string; + content?: GetInfoResponse; + occurredAt: string; +} + +export interface CacheRealtimeEvent { + action: 'written' | 'deleted' | 'cleared'; + key?: string; + ttlSeconds?: number; + count?: number; + occurredAt: string; +} + +export interface MetricsRealtimeEvent { + cacheHits: number; + cacheMisses: number; + sourceCounts: { + cache: number; + database: number; + netflix: number; + }; + occurredAt: string; +} + /** * Initialize Socket.IO server */ @@ -86,8 +113,8 @@ export function emitJobProgress( */ export function emitJobCompleted( jobId: string, - data: unknown, - source: string + data: GetInfoResponse, + source: DataSource ): void { if (io) { io.to(`job:${jobId}`).emit('job:completed', { @@ -98,6 +125,33 @@ export function emitJobCompleted( } } +/** + * Emit realtime content mutation event to all clients + */ +export function emitContentEvent(event: ContentRealtimeEvent): void { + if (io) { + io.emit('content:event', event); + } +} + +/** + * Emit realtime cache event to all clients + */ +export function emitCacheEvent(event: CacheRealtimeEvent): void { + if (io) { + io.emit('cache:event', event); + } +} + +/** + * Emit realtime metrics event to all clients + */ +export function emitMetricsEvent(event: MetricsRealtimeEvent): void { + if (io) { + io.emit('metrics:updated', event); + } +} + /** * Emit job error event */ diff --git a/src/middleware/auth.middleware.ts b/src/middleware/auth.middleware.ts index 75d921e..09dca9c 100644 --- a/src/middleware/auth.middleware.ts +++ b/src/middleware/auth.middleware.ts @@ -1,5 +1,5 @@ import { Request, Response, NextFunction } from 'express'; -import { getValidApiKeys } from '../config/env.js'; +import { env, getValidApiKeys } from '../config/env.js'; import logger from '../utils/logger.js'; import type { ApiResponse } from '../types/index.js'; @@ -61,8 +61,6 @@ export function authMiddleware( * Optional: Identify which client made the request */ export function identifyClient(apiKey: string): string { - const { env } = require('../config/env.js'); - if (apiKey === env.API_KEY_WEB) return 'web'; if (apiKey === env.API_KEY_MOBILE) return 'mobile'; if (apiKey === env.API_KEY_ADMIN) return 'admin'; @@ -70,4 +68,46 @@ export function identifyClient(apiKey: string): string { return 'unknown'; } +/** + * Strict admin API key guard. + * Use for admin-only operational endpoints. + */ +export function adminOnlyMiddleware( + req: Request, + res: Response, + next: NextFunction +): void { + const apiKey = req.headers['x-api-key'] as string | undefined; + + if (!apiKey) { + res.status(401).json({ + success: false, + error: { + code: 'MISSING_API_KEY', + message: 'API key is required. Include X-API-Key header.', + }, + } satisfies ApiResponse); + return; + } + + if (apiKey !== env.API_KEY_ADMIN) { + logger.warn('Admin endpoint access denied', { + ip: req.ip, + path: req.path, + keyPrefix: apiKey.substring(0, 8) + '...', + }); + + res.status(403).json({ + success: false, + error: { + code: 'ADMIN_API_KEY_REQUIRED', + message: 'Admin API key required.', + }, + } satisfies ApiResponse); + return; + } + + next(); +} + export default authMiddleware; diff --git a/src/routes/api.routes.ts b/src/routes/api.routes.ts index 25dc49e..32b603d 100644 --- a/src/routes/api.routes.ts +++ b/src/routes/api.routes.ts @@ -1,17 +1,31 @@ import { Router, Request, Response } from 'express'; import { z } from 'zod'; -import { authMiddleware } from '../middleware/auth.middleware.js'; +import { adminOnlyMiddleware, authMiddleware } from '../middleware/auth.middleware.js'; import { scrapeRateLimiter } from '../middleware/rateLimit.middleware.js'; import { validateGetInfo } from '../middleware/validation.middleware.js'; import { JobService } from '../services/job.service.js'; import { ContentService } from '../services/content.service.js'; -import type { ApiResponse, GetInfoRequest, GetInfoResponse } from '../types/index.js'; +import { AdminService } from '../services/admin.service.js'; +import type { + AdminOverviewResponse, + AdminActionResponse, + ApiResponse, + GetInfoRequest, + GetInfoResponse, +} from '../types/index.js'; const router = Router(); const listContentSchema = z.object({ type: z.enum(['movie', 'tvshow']).optional(), limit: z.coerce.number().int().min(1).max(100).optional(), }); +const retryFailedJobsSchema = z.object({ + limit: z.coerce.number().int().min(1).max(50).default(10), +}); +const refreshStaleSchema = z.object({ + days: z.coerce.number().int().min(1).max(365).default(30), + limit: z.coerce.number().int().min(1).max(100).default(20), +}); /** * POST /api/getinfo @@ -118,6 +132,160 @@ router.get( } ); +/** + * GET /api/admin/overview + * Admin dashboard summary metrics (cache, db, jobs) + * + * Headers: X-API-Key: + */ +router.get( + '/admin/overview', + adminOnlyMiddleware, + async (_req: Request, res: Response>) => { + try { + const overview = await AdminService.getOverview(); + res.json({ + success: true, + data: overview, + }); + } catch (error) { + res.status(500).json({ + success: false, + error: { + code: 'ADMIN_OVERVIEW_ERROR', + message: + error instanceof Error + ? error.message + : 'Failed to fetch admin overview', + }, + }); + } + } +); + +/** + * POST /api/admin/cache/clear + * Delete all content cache keys from Redis. + */ +router.post( + '/admin/cache/clear', + adminOnlyMiddleware, + async (_req: Request, res: Response>) => { + try { + const result = await AdminService.clearCache(); + res.json({ success: true, data: result }); + } catch (error) { + res.status(500).json({ + success: false, + error: { + code: 'ADMIN_CACHE_CLEAR_ERROR', + message: error instanceof Error ? error.message : 'Failed to clear cache', + }, + }); + } + } +); + +/** + * POST /api/admin/cache/warmup + * Warm Redis cache from all DB content. + */ +router.post( + '/admin/cache/warmup', + adminOnlyMiddleware, + async (_req: Request, res: Response>) => { + try { + const result = await AdminService.warmupCacheFromDatabase(); + res.json({ success: true, data: result }); + } catch (error) { + res.status(500).json({ + success: false, + error: { + code: 'ADMIN_CACHE_WARMUP_ERROR', + message: error instanceof Error ? error.message : 'Failed to warm cache', + }, + }); + } + } +); + +/** + * POST /api/admin/jobs/retry-failed + * Requeue recent failed jobs. + */ +router.post( + '/admin/jobs/retry-failed', + adminOnlyMiddleware, + async (req: Request, res: Response>) => { + const parsed = retryFailedJobsSchema.safeParse(req.body ?? {}); + if (!parsed.success) { + res.status(400).json({ + success: false, + error: { + code: 'VALIDATION_ERROR', + message: 'Invalid retry request', + details: { errors: parsed.error.issues }, + }, + }); + return; + } + + try { + const result = await AdminService.retryFailedJobs(parsed.data.limit); + res.json({ success: true, data: result }); + } catch (error) { + res.status(500).json({ + success: false, + error: { + code: 'ADMIN_RETRY_FAILED_ERROR', + message: + error instanceof Error ? error.message : 'Failed to retry failed jobs', + }, + }); + } + } +); + +/** + * POST /api/admin/content/refresh-stale + * Queue refresh jobs for stale content entries. + */ +router.post( + '/admin/content/refresh-stale', + adminOnlyMiddleware, + async (req: Request, res: Response>) => { + const parsed = refreshStaleSchema.safeParse(req.body ?? {}); + if (!parsed.success) { + res.status(400).json({ + success: false, + error: { + code: 'VALIDATION_ERROR', + message: 'Invalid stale refresh request', + details: { errors: parsed.error.issues }, + }, + }); + return; + } + + try { + const result = await AdminService.refreshStaleContent( + parsed.data.days, + parsed.data.limit + ); + res.json({ success: true, data: result }); + } catch (error) { + res.status(500).json({ + success: false, + error: { + code: 'ADMIN_STALE_REFRESH_ERROR', + message: + error instanceof Error ? error.message : 'Failed to queue stale refresh', + }, + }); + } + } +); + /** * POST /api/getinfo/async * Create async job for content scraping diff --git a/src/services/admin.service.ts b/src/services/admin.service.ts new file mode 100644 index 0000000..09feb20 --- /dev/null +++ b/src/services/admin.service.ts @@ -0,0 +1,455 @@ +import prisma from '../config/database.js'; +import redis from '../config/redis.js'; +import { env } from '../config/env.js'; +import { JobService } from './job.service.js'; +import { MetricsService } from './metrics.service.js'; +import { CacheService } from './cache.service.js'; +import { ContentService } from './content.service.js'; +import type { AdminActionResponse, AdminOverviewResponse } from '../types/index.js'; + +const CACHE_PREFIX = 'netflix:content:'; +const MAX_CACHE_KEYS_FOR_ANALYSIS = 1000; + +function formatCacheKeyLabel(key: string): string { + return key.replace(CACHE_PREFIX, ''); +} + +function extractTitleIdFromCacheKey(key: string): string | null { + const normalized = formatCacheKeyLabel(key); + return /^\d+$/.test(normalized) ? normalized : null; +} + +function extractTitleIdFromUrl(url: string): string | null { + return url.match(/\/title\/(\d+)/)?.[1] ?? null; +} + +function parseRedisInfoValue(info: string, key: string): number | null { + const line = info + .split('\n') + .map((item) => item.trim()) + .find((item) => item.startsWith(`${key}:`)); + if (!line) return null; + const raw = line.slice(key.length + 1).trim(); + const value = Number.parseInt(raw, 10); + return Number.isFinite(value) ? value : null; +} + +async function collectCacheKeys(limit?: number): Promise<{ keys: string[]; sampled: boolean }> { + let cursor = '0'; + const keys: string[] = []; + + do { + const [nextCursor, batchKeys] = await redis.scan( + cursor, + 'MATCH', + `${CACHE_PREFIX}*`, + 'COUNT', + 200 + ); + + cursor = nextCursor; + keys.push(...batchKeys); + + if (limit && keys.length >= limit) { + return { keys: keys.slice(0, limit), sampled: true }; + } + } while (cursor !== '0'); + + return { keys, sampled: false }; +} + +export class AdminService { + static async getOverview(): Promise { + const now = new Date(); + const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000); + const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); + + const [ + totalContent, + recent24h, + recent7d, + missingPlot, + missingAgeRating, + missingBackdrop, + groupedTypes, + groupedJobs, + recentFailedJobs, + recentFinishedJobs, + topGenreLinks, + { keys: cacheKeys, sampled: cacheSampled }, + metricsSnapshot, + redisMemoryInfo, + ] = await Promise.all([ + prisma.content.count(), + prisma.content.count({ where: { createdAt: { gte: oneDayAgo } } }), + prisma.content.count({ where: { createdAt: { gte: sevenDaysAgo } } }), + prisma.content.count({ where: { plot: null } }), + prisma.content.count({ where: { ageRating: null } }), + prisma.content.count({ where: { backdropUrl: null } }), + prisma.content.groupBy({ by: ['type'], _count: { type: true } }), + prisma.scrapeJob.groupBy({ by: ['status'], _count: { status: true } }), + prisma.scrapeJob.findMany({ + where: { status: 'failed' }, + orderBy: { updatedAt: 'desc' }, + take: 8, + select: { id: true, url: true, error: true, updatedAt: true }, + }), + prisma.scrapeJob.findMany({ + where: { status: { in: ['completed', 'failed'] } }, + orderBy: { updatedAt: 'desc' }, + take: 300, + select: { createdAt: true, updatedAt: true }, + }), + prisma.contentGenre.groupBy({ + by: ['genreId'], + _count: { genreId: true }, + orderBy: { _count: { genreId: 'desc' } }, + take: 10, + }), + collectCacheKeys(MAX_CACHE_KEYS_FOR_ANALYSIS), + MetricsService.getSnapshot(), + redis.info('memory').catch(() => ''), + ]); + + const genreIds = topGenreLinks.map((item) => item.genreId); + const genres = genreIds.length + ? await prisma.genre.findMany({ + where: { id: { in: genreIds } }, + select: { id: true, name: true }, + }) + : []; + + const genreMap = new Map(genres.map((genre) => [genre.id, genre.name])); + + const ttlPipeline = redis.pipeline(); + const sizePipeline = redis.pipeline(); + const valuePipeline = redis.pipeline(); + + for (const key of cacheKeys) { + ttlPipeline.ttl(key); + sizePipeline.strlen(key); + valuePipeline.get(key); + } + + const [ttlResults, sizeResults, valueResults] = await Promise.all([ + ttlPipeline.exec(), + sizePipeline.exec(), + valuePipeline.exec(), + ]); + + const ttlDistribution = { + expiredOrNoTtl: 0, + lessThan5Min: 0, + min5To30: 0, + min30Plus: 0, + }; + + const cacheTitleIds = Array.from( + new Set(cacheKeys.map((key) => extractTitleIdFromCacheKey(key)).filter((id): id is string => Boolean(id))) + ); + + const relatedContent = cacheTitleIds.length + ? await prisma.content.findMany({ + where: { + OR: cacheTitleIds.map((id) => ({ + url: { contains: `/title/${id}` }, + })), + }, + select: { + url: true, + title: true, + }, + }) + : []; + + const titleMap = new Map(); + for (const item of relatedContent) { + const id = extractTitleIdFromUrl(item.url); + if (id && !titleMap.has(id)) { + titleMap.set(id, item.title); + } + } + + const expiringSoon: { + key: string; + mediaTitle?: string | null; + cachedAt?: number | null; + ttlSeconds: number; + }[] = []; + let totalBytes = 0; + + for (let i = 0; i < cacheKeys.length; i += 1) { + const ttlValue = Number(ttlResults?.[i]?.[1] ?? -2); + const sizeValue = Number(sizeResults?.[i]?.[1] ?? 0); + const safeSize = Number.isFinite(sizeValue) ? Math.max(0, sizeValue) : 0; + totalBytes += safeSize; + + if (ttlValue <= 0) { + ttlDistribution.expiredOrNoTtl += 1; + } else if (ttlValue < 300) { + ttlDistribution.lessThan5Min += 1; + } else if (ttlValue <= 1800) { + ttlDistribution.min5To30 += 1; + } else { + ttlDistribution.min30Plus += 1; + } + + if (ttlValue > 0) { + const formattedKey = formatCacheKeyLabel(cacheKeys[i] || ''); + const titleId = extractTitleIdFromCacheKey(cacheKeys[i] || ''); + const rawValue = valueResults?.[i]?.[1]; + let cachedAt: number | null = null; + if (typeof rawValue === 'string') { + try { + const parsed = JSON.parse(rawValue) as { cachedAt?: unknown }; + cachedAt = typeof parsed.cachedAt === 'number' ? parsed.cachedAt : null; + } catch { + cachedAt = null; + } + } + expiringSoon.push({ + key: formattedKey, + mediaTitle: titleId ? titleMap.get(titleId) ?? null : null, + cachedAt, + ttlSeconds: ttlValue, + }); + } + } + + expiringSoon.sort((a, b) => { + const aCachedAt = a.cachedAt ?? 0; + const bCachedAt = b.cachedAt ?? 0; + if (aCachedAt !== bCachedAt) return bCachedAt - aCachedAt; + return b.ttlSeconds - a.ttlSeconds; + }); + + const jobCounts = { + pending: 0, + processing: 0, + completed: 0, + failed: 0, + }; + + for (const row of groupedJobs) { + if (row.status in jobCounts) { + jobCounts[row.status as keyof typeof jobCounts] = row._count.status; + } + } + + const contentByType = { + movie: 0, + tvshow: 0, + }; + + for (const row of groupedTypes) { + if (row.type in contentByType) { + contentByType[row.type as keyof typeof contentByType] = row._count.type; + } + } + + const averageDurationMs = + recentFinishedJobs.length === 0 + ? 0 + : Math.round( + recentFinishedJobs.reduce((sum, job) => { + const duration = job.updatedAt.getTime() - job.createdAt.getTime(); + return sum + Math.max(0, duration); + }, 0) / recentFinishedJobs.length + ); + + const totalCacheLookups = metricsSnapshot.cacheHits + metricsSnapshot.cacheMisses; + const cacheHitRate = totalCacheLookups + ? Number((metricsSnapshot.cacheHits / totalCacheLookups).toFixed(4)) + : 0; + const redisUsedBytes = parseRedisInfoValue(redisMemoryInfo, 'used_memory') ?? 0; + const redisMaxBytesRaw = parseRedisInfoValue(redisMemoryInfo, 'maxmemory'); + const redisMaxBytes = redisMaxBytesRaw && redisMaxBytesRaw > 0 ? redisMaxBytesRaw : null; + + return { + generatedAt: now.toISOString(), + environment: env.NODE_ENV, + cache: { + configuredTtlSeconds: env.REDIS_TTL_SECONDS, + keyCount: cacheKeys.length, + analyzedKeyLimit: MAX_CACHE_KEYS_FOR_ANALYSIS, + sampled: cacheSampled, + totalBytes, + redisMemory: { + usedBytes: redisUsedBytes, + maxBytes: redisMaxBytes, + }, + ttlDistribution, + expiringSoon: expiringSoon.slice(0, 10), + }, + content: { + total: totalContent, + byType: contentByType, + addedLast24h: recent24h, + addedLast7d: recent7d, + metadataGaps: { + missingPlot, + missingAgeRating, + missingBackdrop, + }, + topGenres: topGenreLinks.map((item) => ({ + name: genreMap.get(item.genreId) ?? 'Unknown', + count: item._count.genreId, + })), + }, + jobs: { + counts: jobCounts, + averageDurationMs, + failedRecent: recentFailedJobs.map((job) => ({ + id: job.id, + url: job.url, + error: job.error ?? 'Unknown error', + updatedAt: job.updatedAt.toISOString(), + })), + }, + requestMetrics: { + cacheHits: metricsSnapshot.cacheHits, + cacheMisses: metricsSnapshot.cacheMisses, + cacheHitRate, + sourceCounts: metricsSnapshot.bySource, + }, + }; + } + + static async clearCache(): Promise { + const { keys } = await collectCacheKeys(); + if (keys.length === 0) { + return { + queued: 0, + skipped: 0, + details: 'No cache keys matched prefix', + }; + } + + await redis.del(...keys); + return { + queued: keys.length, + skipped: 0, + details: 'Cache keys deleted', + }; + } + + static async warmupCacheFromDatabase(): Promise { + const allContent = await prisma.content.findMany({ + include: { + genres: { + include: { + genre: true, + }, + }, + castMembers: { + orderBy: { name: 'asc' }, + }, + }, + orderBy: { createdAt: 'desc' }, + }); + + let queued = 0; + for (const item of allContent) { + const apiPayload = ContentService.toApiResponse({ + id: item.id, + url: item.url, + title: item.title, + year: item.year, + plot: item.plot, + backdropUrl: item.backdropUrl, + ageRating: item.ageRating, + type: item.type as 'movie' | 'tvshow', + currentSeason: item.currentSeason, + genres: item.genres.map((g) => g.genre.name), + cast: item.castMembers.map((c) => c.name), + createdAt: item.createdAt, + updatedAt: item.updatedAt, + }); + + await CacheService.set(item.url, apiPayload); + queued += 1; + } + + return { + queued, + skipped: 0, + details: 'Database content written to Redis cache', + }; + } + + static async retryFailedJobs(limit: number): Promise { + const failedJobs = await prisma.scrapeJob.findMany({ + where: { status: 'failed' }, + orderBy: { updatedAt: 'desc' }, + take: limit, + select: { url: true }, + }); + + let queued = 0; + let skipped = 0; + const uniqueUrls = Array.from(new Set(failedJobs.map((job) => job.url))); + + for (const url of uniqueUrls) { + const activeJob = await prisma.scrapeJob.findFirst({ + where: { url, status: { in: ['pending', 'processing'] } }, + select: { id: true }, + }); + + if (activeJob) { + skipped += 1; + continue; + } + + const job = await JobService.create(url); + JobService.process(job.id).catch(() => { + // async retry failures are reflected in job status + }); + queued += 1; + } + + return { + queued, + skipped, + details: 'Failed jobs retried', + }; + } + + static async refreshStaleContent(days: number, limit: number): Promise { + const threshold = new Date(Date.now() - days * 24 * 60 * 60 * 1000); + const staleContent = await prisma.content.findMany({ + where: { updatedAt: { lt: threshold } }, + orderBy: { updatedAt: 'asc' }, + take: limit, + select: { url: true }, + }); + + let queued = 0; + let skipped = 0; + + for (const item of staleContent) { + const activeJob = await prisma.scrapeJob.findFirst({ + where: { url: item.url, status: { in: ['pending', 'processing'] } }, + select: { id: true }, + }); + + if (activeJob) { + skipped += 1; + continue; + } + + const job = await JobService.create(item.url); + JobService.process(job.id).catch(() => { + // async refresh failures are reflected in job status + }); + queued += 1; + } + + return { + queued, + skipped, + details: `Stale content refresh queued for items older than ${days} days`, + }; + } +} + +export default AdminService; diff --git a/src/services/cache.service.ts b/src/services/cache.service.ts index 06ed479..31d1e16 100644 --- a/src/services/cache.service.ts +++ b/src/services/cache.service.ts @@ -1,7 +1,8 @@ import redis from '../config/redis.js'; import { env } from '../config/env.js'; +import { emitCacheEvent } from '../config/socket.js'; import logger from '../utils/logger.js'; -import type { GetInfoResponse, CacheEntry, DataSource } from '../types/index.js'; +import type { GetInfoResponse, CacheEntry } from '../types/index.js'; /** * Cache key prefix for Netflix content @@ -63,6 +64,12 @@ export class CacheService { try { await redis.setex(key, ttl, JSON.stringify(entry)); + emitCacheEvent({ + action: 'written', + key, + ttlSeconds: ttl, + occurredAt: new Date().toISOString(), + }); logger.debug('Cache set', { url, ttl }); } catch (error) { logger.error('Cache set error', { @@ -80,6 +87,11 @@ export class CacheService { try { await redis.del(key); + emitCacheEvent({ + action: 'deleted', + key, + occurredAt: new Date().toISOString(), + }); logger.debug('Cache deleted', { url }); } catch (error) { logger.error('Cache delete error', { @@ -133,6 +145,11 @@ export class CacheService { if (keys.length > 0) { await redis.del(...keys); + emitCacheEvent({ + action: 'cleared', + count: keys.length, + occurredAt: new Date().toISOString(), + }); logger.info('Cache cleared', { count: keys.length }); } } catch (error) { diff --git a/src/services/content.service.ts b/src/services/content.service.ts index 685c0a3..79155d1 100644 --- a/src/services/content.service.ts +++ b/src/services/content.service.ts @@ -1,4 +1,5 @@ import prisma from '../config/database.js'; +import { emitContentEvent } from '../config/socket.js'; import type { ContentData, ScraperResult, GetInfoResponse } from '../types/index.js'; /** @@ -105,7 +106,14 @@ export class ContentService { }, }); - return this.mapToContentData(content); + const mapped = this.mapToContentData(content); + emitContentEvent({ + action: 'created', + url, + content: this.toApiResponse(mapped), + occurredAt: new Date().toISOString(), + }); + return mapped; } /** @@ -171,7 +179,14 @@ export class ContentService { }, }); - return this.mapToContentData(content); + const mapped = this.mapToContentData(content); + emitContentEvent({ + action: 'updated', + url, + content: this.toApiResponse(mapped), + occurredAt: new Date().toISOString(), + }); + return mapped; } /** @@ -181,6 +196,11 @@ export class ContentService { await prisma.content.delete({ where: { url }, }); + emitContentEvent({ + action: 'deleted', + url, + occurredAt: new Date().toISOString(), + }); } /** diff --git a/src/services/job.service.ts b/src/services/job.service.ts index c22f651..cff15a3 100644 --- a/src/services/job.service.ts +++ b/src/services/job.service.ts @@ -3,6 +3,7 @@ import prisma from '../config/database.js'; import { CacheService } from './cache.service.js'; import { ContentService } from './content.service.js'; import { ScraperService } from './scraper.service.js'; +import { MetricsService } from './metrics.service.js'; import { emitJobProgress, emitJobCompleted, @@ -94,9 +95,11 @@ export class JobService { // Step 1: Check cache const cachedData = await CacheService.get(job.url); if (cachedData) { + await MetricsService.incrementCacheHit(); await this.completeJob(jobId, cachedData, 'cache'); return; } + await MetricsService.incrementCacheMiss(); // Update progress await this.update(jobId, { progress: 30, step: 'checking_database' }); @@ -169,6 +172,7 @@ export class JobService { }); emitJobCompleted(jobId, data, source); + await MetricsService.incrementSource(source); logger.info('Job completed', { jobId, source }); } @@ -182,14 +186,18 @@ export class JobService { // Step 1: Check cache const cachedData = await CacheService.get(url); if (cachedData) { + await MetricsService.incrementCacheHit(); + await MetricsService.incrementSource('cache'); return { data: cachedData, source: 'cache' }; } + await MetricsService.incrementCacheMiss(); // Step 2: Check database const dbContent = await ContentService.findByUrl(url); if (dbContent) { const responseData = ContentService.toApiResponse(dbContent); await CacheService.set(url, responseData); + await MetricsService.incrementSource('database'); return { data: responseData, source: 'database' }; } @@ -202,6 +210,7 @@ export class JobService { // Step 5: Cache the result await CacheService.set(url, responseData); + await MetricsService.incrementSource('netflix'); return { data: responseData, source: 'netflix' }; } diff --git a/src/services/metrics.service.ts b/src/services/metrics.service.ts new file mode 100644 index 0000000..aad8635 --- /dev/null +++ b/src/services/metrics.service.ts @@ -0,0 +1,82 @@ +import redis from '../config/redis.js'; +import { emitMetricsEvent } from '../config/socket.js'; +import type { DataSource } from '../types/index.js'; + +const COUNTERS_KEY = 'metrics:counters'; +const SOURCES_KEY = 'metrics:sources'; + +function toInt(value: string | null | undefined): number { + if (!value) return 0; + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) ? parsed : 0; +} + +export class MetricsService { + private static async emitSnapshot(): Promise { + try { + const snapshot = await this.getSnapshot(); + emitMetricsEvent({ + cacheHits: snapshot.cacheHits, + cacheMisses: snapshot.cacheMisses, + sourceCounts: snapshot.bySource, + occurredAt: new Date().toISOString(), + }); + } catch { + // best-effort metrics emit + } + } + + static async incrementCacheHit(): Promise { + try { + await redis.hincrby(COUNTERS_KEY, 'cache_hits', 1); + await this.emitSnapshot(); + } catch { + // best-effort metrics + } + } + + static async incrementCacheMiss(): Promise { + try { + await redis.hincrby(COUNTERS_KEY, 'cache_misses', 1); + await this.emitSnapshot(); + } catch { + // best-effort metrics + } + } + + static async incrementSource(source: DataSource): Promise { + try { + await redis.hincrby(SOURCES_KEY, source, 1); + await this.emitSnapshot(); + } catch { + // best-effort metrics + } + } + + static async getSnapshot(): Promise<{ + cacheHits: number; + cacheMisses: number; + bySource: { + cache: number; + database: number; + netflix: number; + }; + }> { + const [counters, sources] = await Promise.all([ + redis.hgetall(COUNTERS_KEY), + redis.hgetall(SOURCES_KEY), + ]); + + return { + cacheHits: toInt(counters.cache_hits), + cacheMisses: toInt(counters.cache_misses), + bySource: { + cache: toInt(sources.cache), + database: toInt(sources.database), + netflix: toInt(sources.netflix), + }, + }; + } +} + +export default MetricsService; diff --git a/src/types/index.ts b/src/types/index.ts index fdb40f0..68ee890 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -68,6 +68,83 @@ export interface GetInfoResponse { currentSeason: number | null; } +export interface AdminOverviewResponse { + generatedAt: string; + environment: 'development' | 'production' | 'test'; + cache: { + configuredTtlSeconds: number; + keyCount: number; + analyzedKeyLimit: number; + sampled: boolean; + totalBytes: number; + redisMemory: { + usedBytes: number; + maxBytes: number | null; + }; + ttlDistribution: { + expiredOrNoTtl: number; + lessThan5Min: number; + min5To30: number; + min30Plus: number; + }; + expiringSoon: Array<{ + key: string; + mediaTitle?: string | null; + cachedAt?: number | null; + ttlSeconds: number; + }>; + }; + content: { + total: number; + byType: { + movie: number; + tvshow: number; + }; + addedLast24h: number; + addedLast7d: number; + metadataGaps: { + missingPlot: number; + missingAgeRating: number; + missingBackdrop: number; + }; + topGenres: Array<{ + name: string; + count: number; + }>; + }; + jobs: { + counts: { + pending: number; + processing: number; + completed: number; + failed: number; + }; + averageDurationMs: number; + failedRecent: Array<{ + id: string; + url: string; + error: string; + updatedAt: string; + }>; + }; + requestMetrics: { + cacheHits: number; + cacheMisses: number; + cacheHitRate: number; + sourceCounts: { + cache: number; + database: number; + netflix: number; + }; + }; +} + +export interface AdminActionResponse { + queued: number; + skipped: number; + details?: string; +} + // ============================================ // Cache Types // ============================================