feat(backend): realtime event system, admin API ve metrics altyapısı

- socket.ts: ContentRealtimeEvent, CacheRealtimeEvent, MetricsRealtimeEvent
  tipleri eklendi; emitContentEvent / emitCacheEvent / emitMetricsEvent
  fonksiyonları ile tüm istemcilere broadcast desteği getirildi.
  emitJobCompleted imzası GetInfoResponse + DataSource ile güçlendirildi.

- auth.middleware.ts: require() tabanlı env erişimi static import'a
  dönüştürüldü; admin-only endpointler için adminOnlyMiddleware eklendi
  (X-API-Key !== API_KEY_ADMIN → 403).

- cache.service.ts: set / delete / clearAll işlemlerinden sonra
  emitCacheEvent çağrısı eklenerek cache mutasyonları anlık yayınlanıyor.

- content.service.ts: create / update / delete sonrasında emitContentEvent
  çağrısı eklenerek DB yazımları Socket.IO üzerinden duyuruluyor.

- job.service.ts: async ve sync akışa MetricsService entegrasyonu eklendi;
  cache hit/miss ve kaynak (cache/database/netflix) sayaçları her işlemde
  artırılıyor.

- types/index.ts: AdminOverviewResponse ve AdminActionResponse tipleri
  merkezi olarak tanımlandı.

- admin.service.ts (yeni): getOverview, clearCache, warmupCacheFromDatabase,
  retryFailedJobs, refreshStaleContent operasyonları implement edildi.
  Redis pipeline ile TTL/boyut analizi ve DB metrikleri paralel toplanıyor.

- metrics.service.ts (yeni): Redis hash tabanlı cache hit/miss ve kaynak
  sayaçları; her artışta MetricsRealtimeEvent yayınlanıyor.

- api.routes.ts: Admin endpointleri eklendi:
    GET  /api/admin/overview
    POST /api/admin/cache/clear
    POST /api/admin/cache/warmup
    POST /api/admin/jobs/retry-failed
    POST /api/admin/content/refresh-stale

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 11:59:59 +03:00
parent 97fb289fe7
commit 5c496277f2
9 changed files with 932 additions and 10 deletions

View File

@@ -1,6 +1,7 @@
import { Server as HttpServer } from 'http'; import { Server as HttpServer } from 'http';
import { Server, Socket } from 'socket.io'; import { Server, Socket } from 'socket.io';
import logger from '../utils/logger.js'; import logger from '../utils/logger.js';
import type { DataSource, GetInfoResponse } from '../types/index.js';
/** /**
* Socket.IO Server singleton * Socket.IO Server singleton
@@ -11,6 +12,32 @@ export interface SocketData {
subscribedJobs: Set<string>; subscribedJobs: Set<string>;
} }
/**
 * Broadcast payload for content (DB row) mutations.
 * Emitted to every connected client on the 'content:event' channel.
 */
export interface ContentRealtimeEvent {
  /** Which mutation happened. */
  action: 'created' | 'updated' | 'deleted';
  /** Netflix title URL identifying the affected content row. */
  url: string;
  /** Full API payload of the row; omitted for 'deleted'. */
  content?: GetInfoResponse;
  /** ISO-8601 timestamp of when the mutation occurred. */
  occurredAt: string;
}
/**
 * Broadcast payload for Redis cache mutations ('cache:event' channel).
 */
export interface CacheRealtimeEvent {
  /** Mutation type: single write, single delete, or bulk clear. */
  action: 'written' | 'deleted' | 'cleared';
  /** Redis key touched (set for 'written'/'deleted'). */
  key?: string;
  /** TTL applied on write, in seconds (set for 'written'). */
  ttlSeconds?: number;
  /** Number of keys removed by a bulk 'cleared'. */
  count?: number;
  /** ISO-8601 timestamp of the cache operation. */
  occurredAt: string;
}
/**
 * Aggregate request-metrics snapshot pushed on the 'metrics:updated'
 * channel whenever a counter changes.
 */
export interface MetricsRealtimeEvent {
  /** Total cache hits recorded so far. */
  cacheHits: number;
  /** Total cache misses recorded so far. */
  cacheMisses: number;
  /** How many responses were served by each data source. */
  sourceCounts: {
    cache: number;
    database: number;
    netflix: number;
  };
  /** ISO-8601 timestamp of the snapshot. */
  occurredAt: string;
}
/** /**
* Initialize Socket.IO server * Initialize Socket.IO server
*/ */
@@ -86,8 +113,8 @@ export function emitJobProgress(
*/ */
export function emitJobCompleted( export function emitJobCompleted(
jobId: string, jobId: string,
data: unknown, data: GetInfoResponse,
source: string source: DataSource
): void { ): void {
if (io) { if (io) {
io.to(`job:${jobId}`).emit('job:completed', { io.to(`job:${jobId}`).emit('job:completed', {
@@ -98,6 +125,33 @@ export function emitJobCompleted(
} }
} }
/**
 * Broadcast a content mutation event to every connected client.
 * Silently does nothing when the Socket.IO server is not initialized.
 */
export function emitContentEvent(event: ContentRealtimeEvent): void {
  io?.emit('content:event', event);
}
/**
 * Broadcast a cache mutation event to every connected client.
 * Silently does nothing when the Socket.IO server is not initialized.
 */
export function emitCacheEvent(event: CacheRealtimeEvent): void {
  io?.emit('cache:event', event);
}
/**
 * Broadcast a metrics snapshot to every connected client.
 * Silently does nothing when the Socket.IO server is not initialized.
 */
export function emitMetricsEvent(event: MetricsRealtimeEvent): void {
  io?.emit('metrics:updated', event);
}
/** /**
* Emit job error event * Emit job error event
*/ */

View File

@@ -1,5 +1,5 @@
import { Request, Response, NextFunction } from 'express'; import { Request, Response, NextFunction } from 'express';
import { getValidApiKeys } from '../config/env.js'; import { env, getValidApiKeys } from '../config/env.js';
import logger from '../utils/logger.js'; import logger from '../utils/logger.js';
import type { ApiResponse } from '../types/index.js'; import type { ApiResponse } from '../types/index.js';
@@ -61,8 +61,6 @@ export function authMiddleware(
* Optional: Identify which client made the request * Optional: Identify which client made the request
*/ */
export function identifyClient(apiKey: string): string { export function identifyClient(apiKey: string): string {
const { env } = require('../config/env.js');
if (apiKey === env.API_KEY_WEB) return 'web'; if (apiKey === env.API_KEY_WEB) return 'web';
if (apiKey === env.API_KEY_MOBILE) return 'mobile'; if (apiKey === env.API_KEY_MOBILE) return 'mobile';
if (apiKey === env.API_KEY_ADMIN) return 'admin'; if (apiKey === env.API_KEY_ADMIN) return 'admin';
@@ -70,4 +68,46 @@ export function identifyClient(apiKey: string): string {
return 'unknown'; return 'unknown';
} }
/**
 * Strict admin API key guard.
 * Use for admin-only operational endpoints.
 *
 * Responds 401 when no X-API-Key header is present, 403 when the key does
 * not equal API_KEY_ADMIN; otherwise hands control to the next handler.
 */
export function adminOnlyMiddleware(
  req: Request,
  res: Response,
  next: NextFunction
): void {
  // req.headers values are string | string[] when a header is repeated;
  // normalize to a single string instead of lying with an `as` cast.
  const rawKey = req.headers['x-api-key'];
  const apiKey = Array.isArray(rawKey) ? rawKey[0] : rawKey;

  if (!apiKey) {
    res.status(401).json({
      success: false,
      error: {
        code: 'MISSING_API_KEY',
        message: 'API key is required. Include X-API-Key header.',
      },
    } satisfies ApiResponse<never>);
    return;
  }

  // NOTE(review): plain !== is not a constant-time comparison; likely fine
  // for coarse API keys, but consider crypto.timingSafeEqual if hardening.
  if (apiKey !== env.API_KEY_ADMIN) {
    // Log only a short prefix of the presented key to avoid leaking secrets.
    logger.warn('Admin endpoint access denied', {
      ip: req.ip,
      path: req.path,
      keyPrefix: apiKey.substring(0, 8) + '...',
    });
    res.status(403).json({
      success: false,
      error: {
        code: 'ADMIN_API_KEY_REQUIRED',
        message: 'Admin API key required.',
      },
    } satisfies ApiResponse<never>);
    return;
  }

  next();
}
export default authMiddleware; export default authMiddleware;

View File

@@ -1,17 +1,31 @@
import { Router, Request, Response } from 'express'; import { Router, Request, Response } from 'express';
import { z } from 'zod'; import { z } from 'zod';
import { authMiddleware } from '../middleware/auth.middleware.js'; import { adminOnlyMiddleware, authMiddleware } from '../middleware/auth.middleware.js';
import { scrapeRateLimiter } from '../middleware/rateLimit.middleware.js'; import { scrapeRateLimiter } from '../middleware/rateLimit.middleware.js';
import { validateGetInfo } from '../middleware/validation.middleware.js'; import { validateGetInfo } from '../middleware/validation.middleware.js';
import { JobService } from '../services/job.service.js'; import { JobService } from '../services/job.service.js';
import { ContentService } from '../services/content.service.js'; import { ContentService } from '../services/content.service.js';
import type { ApiResponse, GetInfoRequest, GetInfoResponse } from '../types/index.js'; import { AdminService } from '../services/admin.service.js';
import type {
AdminOverviewResponse,
AdminActionResponse,
ApiResponse,
GetInfoRequest,
GetInfoResponse,
} from '../types/index.js';
const router = Router(); const router = Router();
const listContentSchema = z.object({ const listContentSchema = z.object({
type: z.enum(['movie', 'tvshow']).optional(), type: z.enum(['movie', 'tvshow']).optional(),
limit: z.coerce.number().int().min(1).max(100).optional(), limit: z.coerce.number().int().min(1).max(100).optional(),
}); });
// Body schema for POST /api/admin/jobs/retry-failed.
// `limit` caps how many recent failed jobs are requeued (default 10, max 50).
const retryFailedJobsSchema = z.object({
  limit: z.coerce.number().int().min(1).max(50).default(10),
});

// Body schema for POST /api/admin/content/refresh-stale.
// `days` is the staleness threshold; `limit` caps queued refresh jobs per call.
const refreshStaleSchema = z.object({
  days: z.coerce.number().int().min(1).max(365).default(30),
  limit: z.coerce.number().int().min(1).max(100).default(20),
});
/** /**
* POST /api/getinfo * POST /api/getinfo
@@ -118,6 +132,160 @@ router.get(
} }
); );
/**
 * GET /api/admin/overview
 * Admin dashboard summary metrics (cache, db, jobs)
 *
 * Headers: X-API-Key: <admin API key> (enforced by adminOnlyMiddleware)
 */
router.get(
  '/admin/overview',
  adminOnlyMiddleware,
  async (_req: Request, res: Response<ApiResponse<AdminOverviewResponse>>) => {
    try {
      const overview = await AdminService.getOverview();
      res.json({
        success: true,
        data: overview,
      });
    } catch (error) {
      // Surface the underlying error message when available.
      res.status(500).json({
        success: false,
        error: {
          code: 'ADMIN_OVERVIEW_ERROR',
          message:
            error instanceof Error
              ? error.message
              : 'Failed to fetch admin overview',
        },
      });
    }
  }
);
/**
 * POST /api/admin/cache/clear
 * Delete all content cache keys from Redis.
 * Admin key required (adminOnlyMiddleware); takes no request body.
 */
router.post(
  '/admin/cache/clear',
  adminOnlyMiddleware,
  async (_req: Request, res: Response<ApiResponse<AdminActionResponse>>) => {
    try {
      const result = await AdminService.clearCache();
      res.json({ success: true, data: result });
    } catch (error) {
      res.status(500).json({
        success: false,
        error: {
          code: 'ADMIN_CACHE_CLEAR_ERROR',
          message: error instanceof Error ? error.message : 'Failed to clear cache',
        },
      });
    }
  }
);
/**
 * POST /api/admin/cache/warmup
 * Warm Redis cache from all DB content.
 * Admin key required (adminOnlyMiddleware); takes no request body.
 */
router.post(
  '/admin/cache/warmup',
  adminOnlyMiddleware,
  async (_req: Request, res: Response<ApiResponse<AdminActionResponse>>) => {
    try {
      const result = await AdminService.warmupCacheFromDatabase();
      res.json({ success: true, data: result });
    } catch (error) {
      res.status(500).json({
        success: false,
        error: {
          code: 'ADMIN_CACHE_WARMUP_ERROR',
          message: error instanceof Error ? error.message : 'Failed to warm cache',
        },
      });
    }
  }
);
/**
 * POST /api/admin/jobs/retry-failed
 * Requeue recent failed jobs.
 *
 * Body (optional): { limit?: number } — zod applies the default when omitted.
 */
router.post(
  '/admin/jobs/retry-failed',
  adminOnlyMiddleware,
  async (req: Request, res: Response<ApiResponse<AdminActionResponse>>) => {
    // A missing body is treated as {} so schema defaults kick in.
    const parsed = retryFailedJobsSchema.safeParse(req.body ?? {});
    if (!parsed.success) {
      res.status(400).json({
        success: false,
        error: {
          code: 'VALIDATION_ERROR',
          message: 'Invalid retry request',
          details: { errors: parsed.error.issues },
        },
      });
      return;
    }
    try {
      const result = await AdminService.retryFailedJobs(parsed.data.limit);
      res.json({ success: true, data: result });
    } catch (error) {
      res.status(500).json({
        success: false,
        error: {
          code: 'ADMIN_RETRY_FAILED_ERROR',
          message:
            error instanceof Error ? error.message : 'Failed to retry failed jobs',
        },
      });
    }
  }
);
/**
 * POST /api/admin/content/refresh-stale
 * Queue refresh jobs for stale content entries.
 *
 * Body (optional): { days?: number; limit?: number } — zod applies defaults.
 */
router.post(
  '/admin/content/refresh-stale',
  adminOnlyMiddleware,
  async (req: Request, res: Response<ApiResponse<AdminActionResponse>>) => {
    // A missing body is treated as {} so schema defaults kick in.
    const parsed = refreshStaleSchema.safeParse(req.body ?? {});
    if (!parsed.success) {
      res.status(400).json({
        success: false,
        error: {
          code: 'VALIDATION_ERROR',
          message: 'Invalid stale refresh request',
          details: { errors: parsed.error.issues },
        },
      });
      return;
    }
    try {
      const result = await AdminService.refreshStaleContent(
        parsed.data.days,
        parsed.data.limit
      );
      res.json({ success: true, data: result });
    } catch (error) {
      res.status(500).json({
        success: false,
        error: {
          code: 'ADMIN_STALE_REFRESH_ERROR',
          message:
            error instanceof Error ? error.message : 'Failed to queue stale refresh',
        },
      });
    }
  }
);
/** /**
* POST /api/getinfo/async * POST /api/getinfo/async
* Create async job for content scraping * Create async job for content scraping

View File

@@ -0,0 +1,455 @@
import prisma from '../config/database.js';
import redis from '../config/redis.js';
import { env } from '../config/env.js';
import { JobService } from './job.service.js';
import { MetricsService } from './metrics.service.js';
import { CacheService } from './cache.service.js';
import { ContentService } from './content.service.js';
import type { AdminActionResponse, AdminOverviewResponse } from '../types/index.js';
const CACHE_PREFIX = 'netflix:content:';
const MAX_CACHE_KEYS_FOR_ANALYSIS = 1000;
/**
 * Strip the content cache prefix from a key for display
 * (e.g. 'netflix:content:81234' -> '81234').
 * Uses startsWith/slice so only a true leading prefix is removed;
 * String.replace would have dropped the first occurrence anywhere in the key.
 */
function formatCacheKeyLabel(key: string): string {
  return key.startsWith(CACHE_PREFIX) ? key.slice(CACHE_PREFIX.length) : key;
}
/**
 * Return the numeric title id embedded in a cache key, or null when the
 * de-prefixed key is not purely digits.
 */
function extractTitleIdFromCacheKey(key: string): string | null {
  const label = formatCacheKeyLabel(key);
  if (/^\d+$/.test(label)) {
    return label;
  }
  return null;
}
/**
 * Pull the numeric id out of a Netflix '/title/<id>' URL segment,
 * or null when the URL contains no such segment.
 */
function extractTitleIdFromUrl(url: string): string | null {
  const match = /\/title\/(\d+)/.exec(url);
  return match?.[1] ?? null;
}
/**
 * Read one integer field out of a raw Redis INFO payload.
 * Each line is trimmed first, which also drops the '\r' of CRLF endings.
 * Returns null when the field is missing or its value is not an integer.
 */
function parseRedisInfoValue(info: string, key: string): number | null {
  const prefix = `${key}:`;
  for (const rawLine of info.split('\n')) {
    const line = rawLine.trim();
    if (!line.startsWith(prefix)) {
      continue;
    }
    const parsed = Number.parseInt(line.slice(prefix.length).trim(), 10);
    return Number.isFinite(parsed) ? parsed : null;
  }
  return null;
}
/**
 * SCAN all Redis keys under the content cache prefix.
 * When `limit` is given and reached mid-scan, returns early with the first
 * `limit` keys and `sampled: true`; otherwise returns the full key set.
 */
async function collectCacheKeys(limit?: number): Promise<{ keys: string[]; sampled: boolean }> {
  const found: string[] = [];
  let cursor = '0';

  // SCAN is cursor-based: keep iterating until the cursor wraps back to '0'.
  do {
    const [nextCursor, batch] = await redis.scan(
      cursor,
      'MATCH',
      `${CACHE_PREFIX}*`,
      'COUNT',
      200
    );
    cursor = nextCursor;
    found.push(...batch);

    // Stop early once enough keys are gathered and flag the result as a sample.
    if (limit && found.length >= limit) {
      return { keys: found.slice(0, limit), sampled: true };
    }
  } while (cursor !== '0');

  return { keys: found, sampled: false };
}
/**
 * Admin/operations service backing the /api/admin/* endpoints:
 * dashboard overview plus maintenance actions (cache clear/warmup,
 * failed-job retry, stale-content refresh). All methods are static;
 * state lives in Prisma (DB) and the shared Redis client.
 */
export class AdminService {
  /**
   * Assemble the admin dashboard snapshot: content counts, job stats,
   * a (possibly sampled) cache key analysis with TTL distribution,
   * request metrics and Redis memory info.
   */
  static async getOverview(): Promise<AdminOverviewResponse> {
    const now = new Date();
    const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
    const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);

    // Phase 1: everything with no interdependency runs in parallel.
    const [
      totalContent,
      recent24h,
      recent7d,
      missingPlot,
      missingAgeRating,
      missingBackdrop,
      groupedTypes,
      groupedJobs,
      recentFailedJobs,
      recentFinishedJobs,
      topGenreLinks,
      { keys: cacheKeys, sampled: cacheSampled },
      metricsSnapshot,
      redisMemoryInfo,
    ] = await Promise.all([
      prisma.content.count(),
      prisma.content.count({ where: { createdAt: { gte: oneDayAgo } } }),
      prisma.content.count({ where: { createdAt: { gte: sevenDaysAgo } } }),
      prisma.content.count({ where: { plot: null } }),
      prisma.content.count({ where: { ageRating: null } }),
      prisma.content.count({ where: { backdropUrl: null } }),
      prisma.content.groupBy({ by: ['type'], _count: { type: true } }),
      prisma.scrapeJob.groupBy({ by: ['status'], _count: { status: true } }),
      prisma.scrapeJob.findMany({
        where: { status: 'failed' },
        orderBy: { updatedAt: 'desc' },
        take: 8,
        select: { id: true, url: true, error: true, updatedAt: true },
      }),
      // Sample of recently finished jobs, used for averageDurationMs below.
      prisma.scrapeJob.findMany({
        where: { status: { in: ['completed', 'failed'] } },
        orderBy: { updatedAt: 'desc' },
        take: 300,
        select: { createdAt: true, updatedAt: true },
      }),
      prisma.contentGenre.groupBy({
        by: ['genreId'],
        _count: { genreId: true },
        orderBy: { _count: { genreId: 'desc' } },
        take: 10,
      }),
      collectCacheKeys(MAX_CACHE_KEYS_FOR_ANALYSIS),
      MetricsService.getSnapshot(),
      // INFO may fail on restricted Redis setups; degrade to empty string.
      redis.info('memory').catch(() => ''),
    ]);

    // Resolve display names for the top genre link counts.
    const genreIds = topGenreLinks.map((item) => item.genreId);
    const genres = genreIds.length
      ? await prisma.genre.findMany({
          where: { id: { in: genreIds } },
          select: { id: true, name: true },
        })
      : [];
    const genreMap = new Map(genres.map((genre) => [genre.id, genre.name]));

    // Phase 2: per-key TTL, size and raw value via three pipelines
    // (one Redis round trip each instead of one per key).
    const ttlPipeline = redis.pipeline();
    const sizePipeline = redis.pipeline();
    const valuePipeline = redis.pipeline();
    for (const key of cacheKeys) {
      ttlPipeline.ttl(key);
      sizePipeline.strlen(key);
      valuePipeline.get(key);
    }
    const [ttlResults, sizeResults, valueResults] = await Promise.all([
      ttlPipeline.exec(),
      sizePipeline.exec(),
      valuePipeline.exec(),
    ]);

    const ttlDistribution = {
      expiredOrNoTtl: 0,
      lessThan5Min: 0,
      min5To30: 0,
      min30Plus: 0,
    };

    // Map cached title ids back to DB rows so the dashboard can show titles.
    const cacheTitleIds = Array.from(
      new Set(cacheKeys.map((key) => extractTitleIdFromCacheKey(key)).filter((id): id is string => Boolean(id)))
    );
    const relatedContent = cacheTitleIds.length
      ? await prisma.content.findMany({
          where: {
            OR: cacheTitleIds.map((id) => ({
              url: { contains: `/title/${id}` },
            })),
          },
          select: {
            url: true,
            title: true,
          },
        })
      : [];
    const titleMap = new Map<string, string>();
    for (const item of relatedContent) {
      const id = extractTitleIdFromUrl(item.url);
      if (id && !titleMap.has(id)) {
        titleMap.set(id, item.title);
      }
    }

    const expiringSoon: {
      key: string;
      mediaTitle?: string | null;
      cachedAt?: number | null;
      ttlSeconds: number;
    }[] = [];
    let totalBytes = 0;

    for (let i = 0; i < cacheKeys.length; i += 1) {
      // Pipeline results come back as [error, value] tuples; missing entries
      // default to TTL -2 (key gone) / size 0.
      const ttlValue = Number(ttlResults?.[i]?.[1] ?? -2);
      const sizeValue = Number(sizeResults?.[i]?.[1] ?? 0);
      const safeSize = Number.isFinite(sizeValue) ? Math.max(0, sizeValue) : 0;
      totalBytes += safeSize;

      if (ttlValue <= 0) {
        ttlDistribution.expiredOrNoTtl += 1;
      } else if (ttlValue < 300) {
        ttlDistribution.lessThan5Min += 1;
      } else if (ttlValue <= 1800) {
        ttlDistribution.min5To30 += 1;
      } else {
        ttlDistribution.min30Plus += 1;
      }

      // Only live keys (positive TTL) are listed for the dashboard.
      if (ttlValue > 0) {
        const formattedKey = formatCacheKeyLabel(cacheKeys[i] || '');
        const titleId = extractTitleIdFromCacheKey(cacheKeys[i] || '');
        const rawValue = valueResults?.[i]?.[1];
        let cachedAt: number | null = null;
        if (typeof rawValue === 'string') {
          try {
            const parsed = JSON.parse(rawValue) as { cachedAt?: unknown };
            cachedAt = typeof parsed.cachedAt === 'number' ? parsed.cachedAt : null;
          } catch {
            cachedAt = null; // corrupt/legacy entry; timestamp simply unknown
          }
        }
        expiringSoon.push({
          key: formattedKey,
          mediaTitle: titleId ? titleMap.get(titleId) ?? null : null,
          cachedAt,
          ttlSeconds: ttlValue,
        });
      }
    }

    // Newest-cached first; tie-break on remaining TTL (descending).
    expiringSoon.sort((a, b) => {
      const aCachedAt = a.cachedAt ?? 0;
      const bCachedAt = b.cachedAt ?? 0;
      if (aCachedAt !== bCachedAt) return bCachedAt - aCachedAt;
      return b.ttlSeconds - a.ttlSeconds;
    });

    // Fold groupBy rows into fixed-shape counter objects (unknown statuses
    // and types are dropped).
    const jobCounts = {
      pending: 0,
      processing: 0,
      completed: 0,
      failed: 0,
    };
    for (const row of groupedJobs) {
      if (row.status in jobCounts) {
        jobCounts[row.status as keyof typeof jobCounts] = row._count.status;
      }
    }

    const contentByType = {
      movie: 0,
      tvshow: 0,
    };
    for (const row of groupedTypes) {
      if (row.type in contentByType) {
        contentByType[row.type as keyof typeof contentByType] = row._count.type;
      }
    }

    // Average wall-clock duration (createdAt -> updatedAt, clamped at 0)
    // over the sampled finished jobs.
    const averageDurationMs =
      recentFinishedJobs.length === 0
        ? 0
        : Math.round(
            recentFinishedJobs.reduce((sum, job) => {
              const duration = job.updatedAt.getTime() - job.createdAt.getTime();
              return sum + Math.max(0, duration);
            }, 0) / recentFinishedJobs.length
          );

    const totalCacheLookups = metricsSnapshot.cacheHits + metricsSnapshot.cacheMisses;
    const cacheHitRate = totalCacheLookups
      ? Number((metricsSnapshot.cacheHits / totalCacheLookups).toFixed(4))
      : 0;

    const redisUsedBytes = parseRedisInfoValue(redisMemoryInfo, 'used_memory') ?? 0;
    const redisMaxBytesRaw = parseRedisInfoValue(redisMemoryInfo, 'maxmemory');
    // maxmemory of 0 means "no limit" in Redis; report that as null.
    const redisMaxBytes = redisMaxBytesRaw && redisMaxBytesRaw > 0 ? redisMaxBytesRaw : null;

    return {
      generatedAt: now.toISOString(),
      environment: env.NODE_ENV,
      cache: {
        configuredTtlSeconds: env.REDIS_TTL_SECONDS,
        keyCount: cacheKeys.length,
        analyzedKeyLimit: MAX_CACHE_KEYS_FOR_ANALYSIS,
        sampled: cacheSampled,
        totalBytes,
        redisMemory: {
          usedBytes: redisUsedBytes,
          maxBytes: redisMaxBytes,
        },
        ttlDistribution,
        expiringSoon: expiringSoon.slice(0, 10),
      },
      content: {
        total: totalContent,
        byType: contentByType,
        addedLast24h: recent24h,
        addedLast7d: recent7d,
        metadataGaps: {
          missingPlot,
          missingAgeRating,
          missingBackdrop,
        },
        topGenres: topGenreLinks.map((item) => ({
          name: genreMap.get(item.genreId) ?? 'Unknown',
          count: item._count.genreId,
        })),
      },
      jobs: {
        counts: jobCounts,
        averageDurationMs,
        failedRecent: recentFailedJobs.map((job) => ({
          id: job.id,
          url: job.url,
          error: job.error ?? 'Unknown error',
          updatedAt: job.updatedAt.toISOString(),
        })),
      },
      requestMetrics: {
        cacheHits: metricsSnapshot.cacheHits,
        cacheMisses: metricsSnapshot.cacheMisses,
        cacheHitRate,
        sourceCounts: metricsSnapshot.bySource,
      },
    };
  }

  /**
   * Delete every Redis key under the content cache prefix.
   * Returns the number of deleted keys in `queued`.
   * NOTE(review): deletes via redis.del directly rather than
   * CacheService.clearAll, so no 'cleared' CacheRealtimeEvent is emitted
   * here — confirm whether admin-triggered clears should broadcast too.
   */
  static async clearCache(): Promise<AdminActionResponse> {
    const { keys } = await collectCacheKeys();
    if (keys.length === 0) {
      return {
        queued: 0,
        skipped: 0,
        details: 'No cache keys matched prefix',
      };
    }
    await redis.del(...keys);
    return {
      queued: keys.length,
      skipped: 0,
      details: 'Cache keys deleted',
    };
  }

  /**
   * Rebuild the Redis cache from every DB content row.
   * Each row is converted to its API payload and written sequentially via
   * CacheService.set (which also emits a 'written' cache event per key).
   * NOTE(review): loads the entire content table (with genre/cast joins)
   * into memory — fine for modest datasets, revisit with batching if large.
   */
  static async warmupCacheFromDatabase(): Promise<AdminActionResponse> {
    const allContent = await prisma.content.findMany({
      include: {
        genres: {
          include: {
            genre: true,
          },
        },
        castMembers: {
          orderBy: { name: 'asc' },
        },
      },
      orderBy: { createdAt: 'desc' },
    });

    let queued = 0;
    for (const item of allContent) {
      const apiPayload = ContentService.toApiResponse({
        id: item.id,
        url: item.url,
        title: item.title,
        year: item.year,
        plot: item.plot,
        backdropUrl: item.backdropUrl,
        ageRating: item.ageRating,
        type: item.type as 'movie' | 'tvshow',
        currentSeason: item.currentSeason,
        genres: item.genres.map((g) => g.genre.name),
        cast: item.castMembers.map((c) => c.name),
        createdAt: item.createdAt,
        updatedAt: item.updatedAt,
      });
      await CacheService.set(item.url, apiPayload);
      queued += 1;
    }
    return {
      queued,
      skipped: 0,
      details: 'Database content written to Redis cache',
    };
  }

  /**
   * Requeue up to `limit` of the most recently failed jobs.
   * Duplicate URLs are collapsed; URLs that already have a pending or
   * processing job are counted as skipped. Processing is fire-and-forget:
   * errors surface in the job's status, not here.
   */
  static async retryFailedJobs(limit: number): Promise<AdminActionResponse> {
    const failedJobs = await prisma.scrapeJob.findMany({
      where: { status: 'failed' },
      orderBy: { updatedAt: 'desc' },
      take: limit,
      select: { url: true },
    });

    let queued = 0;
    let skipped = 0;
    const uniqueUrls = Array.from(new Set(failedJobs.map((job) => job.url)));
    for (const url of uniqueUrls) {
      // Skip URLs that are already queued or in flight.
      const activeJob = await prisma.scrapeJob.findFirst({
        where: { url, status: { in: ['pending', 'processing'] } },
        select: { id: true },
      });
      if (activeJob) {
        skipped += 1;
        continue;
      }
      const job = await JobService.create(url);
      JobService.process(job.id).catch(() => {
        // async retry failures are reflected in job status
      });
      queued += 1;
    }
    return {
      queued,
      skipped,
      details: 'Failed jobs retried',
    };
  }

  /**
   * Queue refresh scrape jobs for content not updated within the last
   * `days` days (oldest first, at most `limit` items). Items with an
   * active job are skipped; processing is fire-and-forget as above.
   */
  static async refreshStaleContent(days: number, limit: number): Promise<AdminActionResponse> {
    const threshold = new Date(Date.now() - days * 24 * 60 * 60 * 1000);
    const staleContent = await prisma.content.findMany({
      where: { updatedAt: { lt: threshold } },
      orderBy: { updatedAt: 'asc' },
      take: limit,
      select: { url: true },
    });

    let queued = 0;
    let skipped = 0;
    for (const item of staleContent) {
      const activeJob = await prisma.scrapeJob.findFirst({
        where: { url: item.url, status: { in: ['pending', 'processing'] } },
        select: { id: true },
      });
      if (activeJob) {
        skipped += 1;
        continue;
      }
      const job = await JobService.create(item.url);
      JobService.process(job.id).catch(() => {
        // async refresh failures are reflected in job status
      });
      queued += 1;
    }
    return {
      queued,
      skipped,
      details: `Stale content refresh queued for items older than ${days} days`,
    };
  }
}

export default AdminService;

View File

@@ -1,7 +1,8 @@
import redis from '../config/redis.js'; import redis from '../config/redis.js';
import { env } from '../config/env.js'; import { env } from '../config/env.js';
import { emitCacheEvent } from '../config/socket.js';
import logger from '../utils/logger.js'; import logger from '../utils/logger.js';
import type { GetInfoResponse, CacheEntry, DataSource } from '../types/index.js'; import type { GetInfoResponse, CacheEntry } from '../types/index.js';
/** /**
* Cache key prefix for Netflix content * Cache key prefix for Netflix content
@@ -63,6 +64,12 @@ export class CacheService {
try { try {
await redis.setex(key, ttl, JSON.stringify(entry)); await redis.setex(key, ttl, JSON.stringify(entry));
emitCacheEvent({
action: 'written',
key,
ttlSeconds: ttl,
occurredAt: new Date().toISOString(),
});
logger.debug('Cache set', { url, ttl }); logger.debug('Cache set', { url, ttl });
} catch (error) { } catch (error) {
logger.error('Cache set error', { logger.error('Cache set error', {
@@ -80,6 +87,11 @@ export class CacheService {
try { try {
await redis.del(key); await redis.del(key);
emitCacheEvent({
action: 'deleted',
key,
occurredAt: new Date().toISOString(),
});
logger.debug('Cache deleted', { url }); logger.debug('Cache deleted', { url });
} catch (error) { } catch (error) {
logger.error('Cache delete error', { logger.error('Cache delete error', {
@@ -133,6 +145,11 @@ export class CacheService {
if (keys.length > 0) { if (keys.length > 0) {
await redis.del(...keys); await redis.del(...keys);
emitCacheEvent({
action: 'cleared',
count: keys.length,
occurredAt: new Date().toISOString(),
});
logger.info('Cache cleared', { count: keys.length }); logger.info('Cache cleared', { count: keys.length });
} }
} catch (error) { } catch (error) {

View File

@@ -1,4 +1,5 @@
import prisma from '../config/database.js'; import prisma from '../config/database.js';
import { emitContentEvent } from '../config/socket.js';
import type { ContentData, ScraperResult, GetInfoResponse } from '../types/index.js'; import type { ContentData, ScraperResult, GetInfoResponse } from '../types/index.js';
/** /**
@@ -105,7 +106,14 @@ export class ContentService {
}, },
}); });
return this.mapToContentData(content); const mapped = this.mapToContentData(content);
emitContentEvent({
action: 'created',
url,
content: this.toApiResponse(mapped),
occurredAt: new Date().toISOString(),
});
return mapped;
} }
/** /**
@@ -171,7 +179,14 @@ export class ContentService {
}, },
}); });
return this.mapToContentData(content); const mapped = this.mapToContentData(content);
emitContentEvent({
action: 'updated',
url,
content: this.toApiResponse(mapped),
occurredAt: new Date().toISOString(),
});
return mapped;
} }
/** /**
@@ -181,6 +196,11 @@ export class ContentService {
await prisma.content.delete({ await prisma.content.delete({
where: { url }, where: { url },
}); });
emitContentEvent({
action: 'deleted',
url,
occurredAt: new Date().toISOString(),
});
} }
/** /**

View File

@@ -3,6 +3,7 @@ import prisma from '../config/database.js';
import { CacheService } from './cache.service.js'; import { CacheService } from './cache.service.js';
import { ContentService } from './content.service.js'; import { ContentService } from './content.service.js';
import { ScraperService } from './scraper.service.js'; import { ScraperService } from './scraper.service.js';
import { MetricsService } from './metrics.service.js';
import { import {
emitJobProgress, emitJobProgress,
emitJobCompleted, emitJobCompleted,
@@ -94,9 +95,11 @@ export class JobService {
// Step 1: Check cache // Step 1: Check cache
const cachedData = await CacheService.get(job.url); const cachedData = await CacheService.get(job.url);
if (cachedData) { if (cachedData) {
await MetricsService.incrementCacheHit();
await this.completeJob(jobId, cachedData, 'cache'); await this.completeJob(jobId, cachedData, 'cache');
return; return;
} }
await MetricsService.incrementCacheMiss();
// Update progress // Update progress
await this.update(jobId, { progress: 30, step: 'checking_database' }); await this.update(jobId, { progress: 30, step: 'checking_database' });
@@ -169,6 +172,7 @@ export class JobService {
}); });
emitJobCompleted(jobId, data, source); emitJobCompleted(jobId, data, source);
await MetricsService.incrementSource(source);
logger.info('Job completed', { jobId, source }); logger.info('Job completed', { jobId, source });
} }
@@ -182,14 +186,18 @@ export class JobService {
// Step 1: Check cache // Step 1: Check cache
const cachedData = await CacheService.get(url); const cachedData = await CacheService.get(url);
if (cachedData) { if (cachedData) {
await MetricsService.incrementCacheHit();
await MetricsService.incrementSource('cache');
return { data: cachedData, source: 'cache' }; return { data: cachedData, source: 'cache' };
} }
await MetricsService.incrementCacheMiss();
// Step 2: Check database // Step 2: Check database
const dbContent = await ContentService.findByUrl(url); const dbContent = await ContentService.findByUrl(url);
if (dbContent) { if (dbContent) {
const responseData = ContentService.toApiResponse(dbContent); const responseData = ContentService.toApiResponse(dbContent);
await CacheService.set(url, responseData); await CacheService.set(url, responseData);
await MetricsService.incrementSource('database');
return { data: responseData, source: 'database' }; return { data: responseData, source: 'database' };
} }
@@ -202,6 +210,7 @@ export class JobService {
// Step 5: Cache the result // Step 5: Cache the result
await CacheService.set(url, responseData); await CacheService.set(url, responseData);
await MetricsService.incrementSource('netflix');
return { data: responseData, source: 'netflix' }; return { data: responseData, source: 'netflix' };
} }

View File

@@ -0,0 +1,82 @@
import redis from '../config/redis.js';
import { emitMetricsEvent } from '../config/socket.js';
import type { DataSource } from '../types/index.js';
const COUNTERS_KEY = 'metrics:counters';
const SOURCES_KEY = 'metrics:sources';
/**
 * Coerce a Redis hash field value into a non-negative-safe integer.
 * Missing/empty values and unparseable strings both map to 0.
 */
function toInt(value: string | null | undefined): number {
  if (value == null || value === '') {
    return 0;
  }
  const parsed = Number.parseInt(value, 10);
  return Number.isNaN(parsed) ? 0 : parsed;
}
/**
 * Redis-hash-backed request metrics: cache hit/miss counters plus a
 * per-data-source tally. Every increment pushes a fresh
 * MetricsRealtimeEvent to connected clients. All operations are
 * best-effort — metrics failures never propagate to callers.
 */
export class MetricsService {
  /** Read the current counters and broadcast them; swallows any error. */
  private static async emitSnapshot(): Promise<void> {
    try {
      const { cacheHits, cacheMisses, bySource } = await this.getSnapshot();
      emitMetricsEvent({
        cacheHits,
        cacheMisses,
        sourceCounts: bySource,
        occurredAt: new Date().toISOString(),
      });
    } catch {
      // best-effort metrics emit
    }
  }

  /** Increment one hash field by 1, then broadcast; never throws. */
  private static async bump(hashKey: string, field: string): Promise<void> {
    try {
      await redis.hincrby(hashKey, field, 1);
      await this.emitSnapshot();
    } catch {
      // best-effort metrics
    }
  }

  /** Record a cache hit. */
  static async incrementCacheHit(): Promise<void> {
    await this.bump(COUNTERS_KEY, 'cache_hits');
  }

  /** Record a cache miss. */
  static async incrementCacheMiss(): Promise<void> {
    await this.bump(COUNTERS_KEY, 'cache_misses');
  }

  /** Record which data source served a request. */
  static async incrementSource(source: DataSource): Promise<void> {
    await this.bump(SOURCES_KEY, source);
  }

  /** Fetch both counter hashes in parallel and coerce fields to integers. */
  static async getSnapshot(): Promise<{
    cacheHits: number;
    cacheMisses: number;
    bySource: {
      cache: number;
      database: number;
      netflix: number;
    };
  }> {
    const [counters, sources] = await Promise.all([
      redis.hgetall(COUNTERS_KEY),
      redis.hgetall(SOURCES_KEY),
    ]);
    return {
      cacheHits: toInt(counters.cache_hits),
      cacheMisses: toInt(counters.cache_misses),
      bySource: {
        cache: toInt(sources.cache),
        database: toInt(sources.database),
        netflix: toInt(sources.netflix),
      },
    };
  }
}

export default MetricsService;

View File

@@ -68,6 +68,83 @@ export interface GetInfoResponse {
currentSeason: number | null; currentSeason: number | null;
} }
/**
 * Payload of GET /api/admin/overview — the full admin dashboard snapshot
 * produced by AdminService.getOverview().
 */
export interface AdminOverviewResponse {
  /** ISO-8601 timestamp of when the snapshot was generated. */
  generatedAt: string;
  /** NODE_ENV the server is running under. */
  environment: 'development' | 'production' | 'test';
  /** Redis content-cache analysis. */
  cache: {
    /** TTL (seconds) configured for new cache writes. */
    configuredTtlSeconds: number;
    /** Number of cache keys inspected for this snapshot. */
    keyCount: number;
    /** Upper bound on keys analyzed per snapshot. */
    analyzedKeyLimit: number;
    /** True when the key scan stopped at analyzedKeyLimit (partial view). */
    sampled: boolean;
    /** Combined byte size of the analyzed cached values. */
    totalBytes: number;
    /** Redis server memory usage. */
    redisMemory: {
      usedBytes: number;
      /** null when Redis has no maxmemory limit configured. */
      maxBytes: number | null;
    };
    /** Histogram of remaining TTL across analyzed keys. */
    ttlDistribution: {
      expiredOrNoTtl: number;
      lessThan5Min: number;
      min5To30: number;
      min30Plus: number;
    };
    /** Live cache entries (up to 10) with TTL and resolved media title. */
    expiringSoon: Array<{
      key: string;
      mediaTitle?: string | null;
      cachedAt?: number | null;
      ttlSeconds: number;
    }>;
  };
  /** Database content statistics. */
  content: {
    total: number;
    byType: {
      movie: number;
      tvshow: number;
    };
    addedLast24h: number;
    addedLast7d: number;
    /** Counts of rows missing optional metadata fields. */
    metadataGaps: {
      missingPlot: number;
      missingAgeRating: number;
      missingBackdrop: number;
    };
    /** Top genres ranked by linked-content count. */
    topGenres: Array<{
      name: string;
      count: number;
    }>;
  };
  /** Scrape-job statistics. */
  jobs: {
    /** Job counts by status. */
    counts: {
      pending: number;
      processing: number;
      completed: number;
      failed: number;
    };
    /** Average createdAt→updatedAt duration over recent finished jobs. */
    averageDurationMs: number;
    /** Most recent failed jobs, newest first. */
    failedRecent: Array<{
      id: string;
      url: string;
      error: string;
      updatedAt: string;
    }>;
  };
  /** Aggregate request counters from MetricsService. */
  requestMetrics: {
    cacheHits: number;
    cacheMisses: number;
    /** hits / (hits + misses); 0 when there have been no lookups. */
    cacheHitRate: number;
    sourceCounts: {
      cache: number;
      database: number;
      netflix: number;
    };
  };
}
/**
 * Result of an admin maintenance action (cache clear/warmup, job retry,
 * stale-content refresh).
 */
export interface AdminActionResponse {
  /** Items acted on (keys deleted, jobs queued, entries cached). */
  queued: number;
  /** Candidates intentionally left untouched (e.g. already in flight). */
  skipped: number;
  /** Optional human-readable summary of what happened. */
  details?: string;
}
// ============================================ // ============================================
// Cache Types // Cache Types
// ============================================ // ============================================