Files
wiseclaw/backend/app/tools/google_drive.py

168 lines
6.4 KiB
Python

import asyncio
from pathlib import Path
from typing import Any
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from app.google.auth import GoogleAuthError, GoogleAuthManager
from app.tools.base import Tool
class GoogleDriveTool(Tool):
name = "google_drive"
description = "List, search, and upload files to the connected Google Drive account."
def __init__(self, auth_manager: GoogleAuthManager) -> None:
self.auth_manager = auth_manager
def parameters_schema(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "upload"],
"description": "Drive action to perform. Defaults to list.",
},
"query": {
"type": "string",
"description": "Optional Drive API query or free-text filename search.",
},
"max_results": {
"type": "integer",
"description": "Maximum number of files to return, from 1 to 20.",
"minimum": 1,
"maximum": 20,
},
"local_path": {
"type": "string",
"description": "Absolute local file path to upload when action is upload.",
},
"filename": {
"type": "string",
"description": "Optional destination filename for uploads.",
},
"mime_type": {
"type": "string",
"description": "Optional MIME type for uploads.",
},
},
"additionalProperties": False,
}
async def run(self, payload: dict[str, Any]) -> dict[str, Any]:
action = str(payload.get("action", "list") or "list").strip().lower()
query = str(payload.get("query", "")).strip()
max_results = max(1, min(20, int(payload.get("max_results", 10) or 10)))
local_path = str(payload.get("local_path", "")).strip()
filename = str(payload.get("filename", "")).strip()
mime_type = str(payload.get("mime_type", "")).strip()
try:
creds = await self.auth_manager.get_credentials()
except GoogleAuthError as exc:
return {"tool": self.name, "status": "error", "message": str(exc)}
if action == "upload":
if not local_path:
return {"tool": self.name, "status": "error", "message": "local_path is required for uploads."}
try:
return await asyncio.to_thread(self._upload_file, creds, local_path, filename, mime_type)
except HttpError as exc:
return {"tool": self.name, "status": "error", "message": self._format_http_error(exc)}
return await asyncio.to_thread(self._list_files, creds, query, max_results)
def _list_files(self, credentials: Any, query: str, max_results: int) -> dict[str, Any]:
service = build("drive", "v3", credentials=credentials, cache_discovery=False)
api_query = ""
if query:
if any(token in query for token in ("name contains", "mimeType", "trashed", "modifiedTime")):
api_query = query
else:
escaped = query.replace("'", "\\'")
api_query = f"name contains '{escaped}' and trashed = false"
else:
api_query = "trashed = false"
response = (
service.files()
.list(
q=api_query,
pageSize=max_results,
orderBy="modifiedTime desc",
fields="files(id,name,mimeType,modifiedTime,webViewLink,owners(displayName))",
)
.execute()
)
files = []
for item in response.get("files", []):
owners = item.get("owners", [])
files.append(
{
"id": item.get("id", ""),
"name": item.get("name", ""),
"mime_type": item.get("mimeType", ""),
"modified_time": item.get("modifiedTime", ""),
"web_view_link": item.get("webViewLink", ""),
"owners": [owner.get("displayName", "") for owner in owners],
}
)
return {
"tool": self.name,
"status": "ok",
"query": query,
"count": len(files),
"files": files,
}
def _upload_file(self, credentials: Any, local_path: str, filename: str, mime_type: str) -> dict[str, Any]:
path = Path(local_path)
if not path.exists() or not path.is_file():
return {
"tool": self.name,
"status": "error",
"message": f"Upload file was not found: {path}",
}
service = build("drive", "v3", credentials=credentials, cache_discovery=False)
final_name = filename or path.name
media = MediaFileUpload(str(path), mimetype=mime_type or None, resumable=False)
created = (
service.files()
.create(
body={"name": final_name},
media_body=media,
fields="id,name,mimeType,webViewLink,webContentLink",
)
.execute()
)
return {
"tool": self.name,
"status": "ok",
"action": "upload",
"file": {
"id": created.get("id", ""),
"name": created.get("name", final_name),
"mime_type": created.get("mimeType", mime_type),
"web_view_link": created.get("webViewLink", ""),
"web_content_link": created.get("webContentLink", ""),
},
}
def _format_http_error(self, exc: HttpError) -> str:
content = getattr(exc, "content", b"")
if isinstance(content, bytes):
text = content.decode("utf-8", errors="ignore").strip()
else:
text = str(content).strip()
if "insufficientPermissions" in text or "Insufficient Permission" in text:
return (
"Google Drive upload izni yok. Google'i yeniden baglayip Drive yukleme iznini onaylaman gerekiyor."
)
return text or str(exc)