151 lines
5.0 KiB
Python
151 lines
5.0 KiB
Python
import asyncio
|
|
from typing import Any
|
|
|
|
from app.tools.base import Tool
|
|
|
|
|
|
def _escape_applescript(value: str) -> str:
|
|
return value.replace("\\", "\\\\").replace('"', '\\"')
|
|
|
|
|
|
def _body_to_notes_html(title: str, body: str) -> str:
|
|
if not body:
|
|
return title
|
|
html_body = body.replace("\n", "<br>")
|
|
return f"{title}<br><br>{html_body}"
|
|
|
|
|
|
class AppleNotesTool(Tool):
|
|
name = "apple_notes"
|
|
description = "Create notes in Apple Notes through AppleScript."
|
|
|
|
def parameters_schema(self) -> dict[str, Any]:
|
|
return {
|
|
"type": "object",
|
|
"properties": {
|
|
"action": {
|
|
"type": "string",
|
|
"enum": ["create_note"],
|
|
"description": "The Apple Notes action to perform.",
|
|
},
|
|
"title": {
|
|
"type": "string",
|
|
"description": "Title for the new note.",
|
|
},
|
|
"body": {
|
|
"type": "string",
|
|
"description": "Optional body content for the note.",
|
|
},
|
|
"folder": {
|
|
"type": "string",
|
|
"description": "Optional Notes folder name. Defaults to Notes.",
|
|
},
|
|
},
|
|
"required": ["action", "title"],
|
|
"additionalProperties": False,
|
|
}
|
|
|
|
async def run(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
action = str(payload.get("action", "create_note")).strip()
|
|
title = str(payload.get("title", "")).strip()
|
|
body = str(payload.get("body", "")).strip()
|
|
folder = str(payload.get("folder", "Notes")).strip() or "Notes"
|
|
|
|
if action != "create_note":
|
|
return {
|
|
"tool": self.name,
|
|
"status": "error",
|
|
"message": f"Unsupported action: {action}",
|
|
}
|
|
if not title:
|
|
return {
|
|
"tool": self.name,
|
|
"status": "error",
|
|
"message": "title is required.",
|
|
}
|
|
|
|
note_html = _body_to_notes_html(title, body)
|
|
script = f'''
|
|
tell application "Notes"
|
|
activate
|
|
if not (exists folder "{_escape_applescript(folder)}") then
|
|
make new folder with properties {{name:"{_escape_applescript(folder)}"}}
|
|
end if
|
|
set targetFolder to folder "{_escape_applescript(folder)}"
|
|
set newNote to make new note at targetFolder with properties {{body:"{_escape_applescript(note_html)}"}}
|
|
return id of newNote
|
|
end tell
|
|
'''.strip()
|
|
|
|
created = await self._run_osascript(script)
|
|
if created["status"] != "ok":
|
|
return {
|
|
"tool": self.name,
|
|
"status": "error",
|
|
"action": action,
|
|
"title": title,
|
|
"folder": folder,
|
|
"message": created["message"],
|
|
}
|
|
|
|
note_id = created["stdout"]
|
|
verify_script = f'''
|
|
tell application "Notes"
|
|
set matchedNotes to every note of folder "{_escape_applescript(folder)}" whose id is "{_escape_applescript(note_id)}"
|
|
if (count of matchedNotes) is 0 then
|
|
return "NOT_FOUND"
|
|
end if
|
|
set matchedNote to item 1 of matchedNotes
|
|
return name of matchedNote
|
|
end tell
|
|
'''.strip()
|
|
verified = await self._run_osascript(verify_script)
|
|
if verified["status"] != "ok":
|
|
return {
|
|
"tool": self.name,
|
|
"status": "error",
|
|
"action": action,
|
|
"title": title,
|
|
"folder": folder,
|
|
"note_id": note_id,
|
|
"message": f'Note was created but could not be verified: {verified["message"]}',
|
|
}
|
|
|
|
verified_title = verified["stdout"]
|
|
if verified_title == "NOT_FOUND":
|
|
return {
|
|
"tool": self.name,
|
|
"status": "error",
|
|
"action": action,
|
|
"title": title,
|
|
"folder": folder,
|
|
"note_id": note_id,
|
|
"message": "Note was created but could not be found during verification.",
|
|
}
|
|
|
|
return {
|
|
"tool": self.name,
|
|
"status": "ok",
|
|
"action": action,
|
|
"title": title,
|
|
"body": body,
|
|
"folder": folder,
|
|
"note_id": note_id,
|
|
"verified_title": verified_title,
|
|
}
|
|
|
|
async def _run_osascript(self, script: str) -> dict[str, str]:
|
|
process = await asyncio.create_subprocess_exec(
|
|
"osascript",
|
|
"-e",
|
|
script,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await process.communicate()
|
|
stdout_text = stdout.decode("utf-8", errors="replace").strip()
|
|
stderr_text = stderr.decode("utf-8", errors="replace").strip()
|
|
if process.returncode != 0:
|
|
return {"status": "error", "message": stderr_text or "AppleScript command failed.", "stdout": stdout_text}
|
|
return {"status": "ok", "message": "", "stdout": stdout_text}
|