Files
ratebubble/src/config/socket.ts

186 lines
3.8 KiB
TypeScript

import { Server as HttpServer } from 'http';
import { Server, Socket } from 'socket.io';
import logger from '../utils/logger.js';
import type { DataSource, GetInfoResponse } from '../types/index.js';
/**
 * Module-level Socket.IO server singleton.
 *
 * Starts as `null`, is assigned by `initializeSocket`, and is reset to `null`
 * by `closeSocket`. The emit helpers in this module do nothing while it is
 * `null`; `getSocketIO` throws instead.
 */
let io: Server | null = null;
/**
 * Per-connection state stored on `socket.data` by the `connection` handler
 * in `initializeSocket`.
 */
export interface SocketData {
/** Job IDs added on `job:subscribe` and removed on `job:unsubscribe`. */
subscribedJobs: Set<string>;
}
/**
 * Payload broadcast to all clients as the `content:event` event
 * (see `emitContentEvent`).
 */
export interface ContentRealtimeEvent {
/** Kind of content mutation being reported. */
action: 'created' | 'updated' | 'deleted';
/** URL the event refers to. NOTE(review): presumably the content's source URL — confirm with callers. */
url: string;
/** Optional content payload. NOTE(review): presumably omitted for `deleted` — confirm with callers. */
content?: GetInfoResponse;
/** When the event occurred. NOTE(review): presumably an ISO-8601 timestamp string — confirm. */
occurredAt: string;
}
/**
 * Payload broadcast to all clients as the `cache:event` event
 * (see `emitCacheEvent`).
 */
export interface CacheRealtimeEvent {
/** Kind of cache operation being reported. */
action: 'written' | 'deleted' | 'cleared';
/** Cache key the event refers to, when there is one. NOTE(review): presumably absent for `cleared` — confirm. */
key?: string;
/** Time-to-live in seconds. NOTE(review): presumably only set for `written` — confirm with callers. */
ttlSeconds?: number;
/** NOTE(review): presumably the number of entries affected (e.g. by `cleared`) — confirm with callers. */
count?: number;
/** When the event occurred. NOTE(review): presumably an ISO-8601 timestamp string — confirm. */
occurredAt: string;
}
/**
 * Payload broadcast to all clients as the `metrics:updated` event
 * (see `emitMetricsEvent`).
 */
export interface MetricsRealtimeEvent {
/** Cache hit counter. NOTE(review): cumulative vs. per-interval is not visible here — confirm with the producer. */
cacheHits: number;
/** Cache miss counter. NOTE(review): cumulative vs. per-interval is not visible here — confirm with the producer. */
cacheMisses: number;
/** Counters broken down by source. NOTE(review): presumably responses served per `DataSource` — confirm. */
sourceCounts: {
cache: number;
database: number;
scraper: number;
};
/** When the snapshot was taken. NOTE(review): presumably an ISO-8601 timestamp string — confirm. */
occurredAt: string;
}
/**
 * Create the Socket.IO server on top of an HTTP server and register the
 * per-connection handlers.
 *
 * The created instance is stored in the module-level singleton, so
 * `getSocketIO` and the emit helpers use it afterwards. Calling this function
 * again replaces the singleton without closing the previous instance.
 *
 * Clients manage their room membership with two events:
 * - `job:subscribe` (jobId): join room `job:<jobId>`
 * - `job:unsubscribe` (jobId): leave room `job:<jobId>`
 *
 * Event payloads come from untrusted clients, so any payload that is not a
 * non-empty string is ignored instead of being coerced into a room name
 * (e.g. `job:[object Object]`).
 *
 * @param httpServer - HTTP server the Socket.IO server attaches to.
 * @returns The newly created Socket.IO server.
 */
export function initializeSocket(httpServer: HttpServer): Server {
  const server = new Server(httpServer, {
    cors: {
      origin: '*', // Configure based on your frontend domains
      methods: ['GET', 'POST'],
    },
    transports: ['websocket', 'polling'],
  });
  io = server;

  // Client payloads are untyped at runtime; accept only non-empty strings.
  const isJobId = (value: unknown): value is string =>
    typeof value === 'string' && value.length > 0;

  server.on('connection', (socket: Socket) => {
    logger.info('Client connected', { socketId: socket.id });

    // Initialize socket data
    (socket.data as SocketData).subscribedJobs = new Set();

    // Handle job subscription
    socket.on('job:subscribe', (jobId: unknown) => {
      if (!isJobId(jobId)) {
        logger.debug('Ignoring job subscription with invalid job id', {
          socketId: socket.id,
        });
        return;
      }
      (socket.data as SocketData).subscribedJobs.add(jobId);
      // join() may return a promise depending on the adapter; it is
      // intentionally not awaited here.
      void socket.join(`job:${jobId}`);
      logger.debug('Client subscribed to job', { socketId: socket.id, jobId });
    });

    // Handle job unsubscription
    socket.on('job:unsubscribe', (jobId: unknown) => {
      if (!isJobId(jobId)) {
        logger.debug('Ignoring job unsubscription with invalid job id', {
          socketId: socket.id,
        });
        return;
      }
      (socket.data as SocketData).subscribedJobs.delete(jobId);
      // leave() may return a promise depending on the adapter; it is
      // intentionally not awaited here.
      void socket.leave(`job:${jobId}`);
      logger.debug('Client unsubscribed from job', { socketId: socket.id, jobId });
    });

    socket.on('disconnect', () => {
      logger.info('Client disconnected', { socketId: socket.id });
    });
  });

  logger.info('Socket.IO server initialized');
  return server;
}
/**
 * Return the active Socket.IO server.
 *
 * @returns The server currently held by the module-level singleton.
 * @throws Error when no server is initialized (the singleton is `null`).
 */
export function getSocketIO(): Server {
  if (io) {
    return io;
  }
  throw new Error('Socket.IO not initialized. Call initializeSocket first.');
}
/**
 * Send a `job:progress` event to every socket in the room `job:<jobId>`.
 *
 * Does nothing when the Socket.IO server is not initialized.
 *
 * @param jobId - Job whose subscribers are notified; also included in the payload.
 * @param progress - Progress value forwarded as-is.
 * @param status - Status string forwarded as-is.
 * @param step - Step description forwarded as-is.
 */
export function emitJobProgress(
  jobId: string,
  progress: number,
  status: string,
  step: string
): void {
  if (!io) {
    return;
  }
  const payload = { jobId, progress, status, step };
  io.to(`job:${jobId}`).emit('job:progress', payload);
}
/**
 * Send a `job:completed` event to every socket in the room `job:<jobId>`.
 *
 * Does nothing when the Socket.IO server is not initialized.
 *
 * @param jobId - Job whose subscribers are notified; also included in the payload.
 * @param data - Result forwarded to subscribers as `data`.
 * @param source - Origin of the result, forwarded as `source`.
 */
export function emitJobCompleted(
  jobId: string,
  data: GetInfoResponse,
  source: DataSource
): void {
  if (!io) {
    return;
  }
  const payload = { jobId, data, source };
  io.to(`job:${jobId}`).emit('job:completed', payload);
}
/**
 * Broadcast a content mutation to all connected clients as `content:event`.
 *
 * Does nothing when the Socket.IO server is not initialized.
 *
 * @param event - Payload sent unchanged to every client.
 */
export function emitContentEvent(event: ContentRealtimeEvent): void {
  io?.emit('content:event', event);
}
/**
 * Broadcast a cache event to all connected clients as `cache:event`.
 *
 * Does nothing when the Socket.IO server is not initialized.
 *
 * @param event - Payload sent unchanged to every client.
 */
export function emitCacheEvent(event: CacheRealtimeEvent): void {
  io?.emit('cache:event', event);
}
/**
 * Broadcast a metrics snapshot to all connected clients as `metrics:updated`.
 *
 * Does nothing when the Socket.IO server is not initialized.
 *
 * @param event - Payload sent unchanged to every client.
 */
export function emitMetricsEvent(event: MetricsRealtimeEvent): void {
  io?.emit('metrics:updated', event);
}
/**
 * Send a `job:error` event to every socket in the room `job:<jobId>`.
 *
 * Does nothing when the Socket.IO server is not initialized.
 *
 * @param jobId - Job whose subscribers are notified; also included in the payload.
 * @param error - Error code and message forwarded to subscribers as `error`.
 */
export function emitJobError(
  jobId: string,
  error: { code: string; message: string }
): void {
  if (!io) {
    return;
  }
  const payload = { jobId, error };
  io.to(`job:${jobId}`).emit('job:error', payload);
}
/**
 * Shut down the Socket.IO server, if one is initialized, and clear the
 * module-level singleton.
 *
 * Resolves after Socket.IO invokes the close callback, or immediately when no
 * server is initialized. Any error passed to the close callback is ignored,
 * so shutdown stays best-effort.
 */
export async function closeSocket(): Promise<void> {
  // Capture the instance so the callback needs no non-null assertion and we
  // can tell afterwards whether the singleton was replaced in the meantime.
  const server = io;
  if (!server) {
    return;
  }

  await new Promise<void>((resolve) => {
    server.close(() => {
      logger.info('Socket.IO server closed');
      resolve();
    });
  });

  // Only clear the singleton if it still refers to the instance we closed;
  // initializeSocket may have installed a new server while we were waiting,
  // and that one must not be discarded.
  if (io === server) {
    io = null;
  }
}
// NOTE(review): `export default <expression>` exports the value `io` holds
// when this module is evaluated, which is `null`. It is not a live binding,
// so importers of the default export never see the server created later by
// `initializeSocket`. Use `getSocketIO()` to obtain the active server.
export default io;