Files
wiseclaw/backend/app/admin/routes.py

79 lines
2.5 KiB
Python

from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.admin.services import AdminService
from app.db import get_session
from app.llm.ollama_client import OllamaClient
from app.models import MemoryRecord, OllamaStatus, RuntimeSettings, TelegramStatus, UserRecord
router = APIRouter(prefix="/admin", tags=["admin"])
class SecretPayload(BaseModel):
key: str
value: str
def get_admin_service(session: Session = Depends(get_session)) -> AdminService:
return AdminService(session)
@router.get("/dashboard")
def get_dashboard(service: AdminService = Depends(get_admin_service)):
return service.dashboard()
@router.get("/settings", response_model=RuntimeSettings)
def get_settings(service: AdminService = Depends(get_admin_service)):
return service.get_runtime_settings()
@router.put("/settings", response_model=RuntimeSettings)
def put_settings(payload: RuntimeSettings, service: AdminService = Depends(get_admin_service)):
return service.update_runtime_settings(payload)
@router.get("/users", response_model=list[UserRecord])
def get_users(service: AdminService = Depends(get_admin_service)):
return service.list_users()
@router.post("/users", response_model=UserRecord)
def post_user(payload: UserRecord, service: AdminService = Depends(get_admin_service)):
return service.save_user(payload)
@router.get("/memory", response_model=list[MemoryRecord])
def get_memory(service: AdminService = Depends(get_admin_service)):
return service.list_memory()
@router.delete("/memory")
def delete_memory(service: AdminService = Depends(get_admin_service)):
service.clear_memory()
return {"status": "ok"}
@router.get("/secrets/{key}")
def get_secret(key: str, service: AdminService = Depends(get_admin_service)):
return {"key": key, "masked": service.get_secret_mask(key)}
@router.post("/secrets")
def post_secret(payload: SecretPayload, service: AdminService = Depends(get_admin_service)):
service.save_secret(payload.key, payload.value)
return {"status": "ok"}
@router.get("/integrations/ollama", response_model=OllamaStatus)
async def get_ollama_status(service: AdminService = Depends(get_admin_service)):
runtime = service.get_runtime_settings()
client = OllamaClient(runtime.ollama_base_url)
return await client.status(runtime.default_model)
@router.get("/integrations/telegram", response_model=TelegramStatus)
def get_telegram_status(service: AdminService = Depends(get_admin_service)):
return service.telegram_status()