import asyncio
import subprocess
from pathlib import Path
from typing import Any

from app.security import evaluate_terminal_command
from app.tools.base import Tool

# Foreground commands are abandoned (and killed) after this many seconds.
_TIMEOUT_SECONDS = 15.0
# Upper bound on how long we wait for a killed process to be reaped.
_KILL_GRACE_SECONDS = 2.0
# Maximum number of decoded characters of stdout/stderr returned to the caller.
_MAX_OUTPUT_CHARS = 12000


class TerminalTool(Tool):
    """Run shell commands inside the workspace, gated by a policy check.

    Every command is first passed to ``evaluate_terminal_command`` together
    with ``terminal_mode``; only a decision of ``"allow"`` is executed.
    Commands run either in the foreground (output captured, bounded by a
    timeout) or in the background (detached, output appended to a log file).
    """

    name = "terminal"
    description = "Run terminal commands under WiseClaw policy."

    def __init__(self, terminal_mode: int, workspace_root: Path) -> None:
        """Store the policy mode and the resolved workspace root.

        Args:
            terminal_mode: Value forwarded unchanged to
                ``evaluate_terminal_command``.
            workspace_root: Directory commands run in by default; it is
                resolved once here so later containment checks compare
                resolved paths.
        """
        self.terminal_mode = terminal_mode
        self.workspace_root = workspace_root.resolve()

    def parameters_schema(self) -> dict[str, Any]:
        """Return the JSON-schema-style description of the tool payload."""
        return {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "A single shell command. Only safe approved prefixes run automatically.",
                },
                "background": {
                    "type": "boolean",
                    "description": "Run the command in the background for long-lived local servers.",
                },
                "workdir": {
                    "type": "string",
                    "description": "Optional relative workspace directory for the command.",
                },
            },
            "required": ["command"],
            "additionalProperties": False,
        }

    async def run(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Evaluate and, if allowed, execute the command in ``payload``.

        Args:
            payload: Mapping with ``command`` and optional ``background``
                and ``workdir`` entries (see ``parameters_schema``).

        Returns:
            A result dict. ``status`` is ``"approval_required"`` or
            ``"blocked"`` when the policy does not allow the command,
            ``"error"`` on timeout or non-zero exit, otherwise ``"ok"``.

        Raises:
            ValueError: If ``workdir`` is given but is outside the workspace
                or is not a directory (raised by ``_resolve_workdir``).
        """
        command = str(payload.get("command", "")).strip()
        background = bool(payload.get("background", False))
        if payload.get("workdir"):
            workdir = self._resolve_workdir(str(payload.get("workdir", "")).strip())
        else:
            workdir = self.workspace_root

        decision = evaluate_terminal_command(command, self.terminal_mode)
        if decision.decision != "allow":
            return {
                "tool": self.name,
                "status": "approval_required" if decision.decision == "approval" else "blocked",
                "command": command,
                "decision": decision.decision,
                "reason": decision.reason,
            }

        if background:
            return self._run_background(command, decision.reason, workdir)

        process = await asyncio.create_subprocess_shell(
            command,
            cwd=str(workdir),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(), timeout=_TIMEOUT_SECONDS
            )
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError
        # from Python 3.11 on; catch both so older interpreters behave the same.
        except (asyncio.TimeoutError, TimeoutError):
            # wait_for() only cancels communicate(); the child itself is
            # still running and must be stopped explicitly.
            await self._kill_process(process)
            return {
                "tool": self.name,
                "status": "error",
                "command": command,
                "decision": decision.decision,
                "reason": f"Command timed out after {_TIMEOUT_SECONDS:g} seconds.",
            }

        stdout_text = stdout.decode("utf-8", errors="replace")
        stderr_text = stderr.decode("utf-8", errors="replace")
        return {
            "tool": self.name,
            "status": "ok" if process.returncode == 0 else "error",
            "command": command,
            "decision": decision.decision,
            "reason": decision.reason,
            "workdir": str(workdir),
            "exit_code": process.returncode,
            "stdout": stdout_text[:_MAX_OUTPUT_CHARS],
            "stderr": stderr_text[:_MAX_OUTPUT_CHARS],
            "stdout_truncated": len(stdout_text) > _MAX_OUTPUT_CHARS,
            "stderr_truncated": len(stderr_text) > _MAX_OUTPUT_CHARS,
        }

    @staticmethod
    async def _kill_process(process: Any) -> None:
        """Kill a timed-out subprocess and reap it on a best-effort basis."""
        try:
            process.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            return
        try:
            await asyncio.wait_for(process.wait(), timeout=_KILL_GRACE_SECONDS)
        except (asyncio.TimeoutError, TimeoutError):
            # Deliberate best effort: never let cleanup block the tool result.
            pass

    def _run_background(self, command: str, reason: str, workdir: Path) -> dict[str, Any]:
        """Start ``command`` detached and return its pid and log file path.

        Combined stdout/stderr is appended to a file under
        ``<workspace_root>/.wiseclaw/logs``. The method does not wait for the
        process to finish.

        Args:
            command: Shell command line to execute.
            reason: Policy reason echoed back in the result.
            workdir: Working directory for the process.

        Returns:
            A result dict with ``status`` ``"ok"``, the child ``pid`` and the
            ``log_path``.
        """
        logs_dir = self.workspace_root / ".wiseclaw" / "logs"
        logs_dir.mkdir(parents=True, exist_ok=True)
        # NOTE: hash() of str values is salted per interpreter run unless
        # PYTHONHASHSEED is fixed, so this name is only stable within one run.
        log_path = logs_dir / f"terminal-{abs(hash((command, str(workdir))))}.log"

        # The child gets its own copy of the descriptor, so the parent's
        # handle can be closed right after spawning; the context manager also
        # closes it if Popen raises.
        with log_path.open("ab") as log_handle:
            process = subprocess.Popen(
                command,
                cwd=str(workdir),
                shell=True,
                stdout=log_handle,
                stderr=subprocess.STDOUT,
                start_new_session=True,
            )

        return {
            "tool": self.name,
            "status": "ok",
            "command": command,
            "decision": "allow",
            "reason": reason,
            "workdir": str(workdir),
            "background": True,
            "pid": process.pid,
            "log_path": str(log_path),
        }

    def _resolve_workdir(self, raw_path: str) -> Path:
        """Resolve ``raw_path`` to an existing directory inside the workspace.

        Relative paths are taken relative to ``workspace_root``; ``~`` is
        expanded first. The resolved path must be the workspace root itself
        or one of its descendants.

        Raises:
            ValueError: If the resolved path lies outside the workspace or is
                not an existing directory.
        """
        candidate = Path(raw_path).expanduser()
        if not candidate.is_absolute():
            candidate = self.workspace_root / candidate
        # Resolve before the containment check so ".." segments and symlinks
        # cannot be used to step outside the workspace.
        candidate = candidate.resolve()

        inside_workspace = (
            candidate == self.workspace_root or self.workspace_root in candidate.parents
        )
        if not inside_workspace:
            raise ValueError(f"Workdir is outside the workspace: {candidate}")
        # is_dir() is already False for paths that do not exist.
        if not candidate.is_dir():
            raise ValueError(f"Workdir is not a directory: {candidate}")
        return candidate