"""Google OAuth helpers: client-secrets/token file handling and the web consent flow."""

import asyncio
import json
import os
import time
from pathlib import Path
from urllib.parse import urlsplit

from google.auth import exceptions as google_auth_exceptions
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow

from app.config import Settings

# Scopes requested during consent and expected when loading the stored token.
GOOGLE_SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/drive.metadata.readonly",
    "https://www.googleapis.com/auth/drive.file",
]


class GoogleAuthError(RuntimeError):
    """Raised when Google OAuth files, state, or credentials are missing or invalid."""


# Pending OAuth "state" values mapped to the time.time() at which they were issued.
# Module-level, so the entries are shared by every GoogleAuthManager in the process.
_OAUTH_STATES: dict[str, float] = {}
_OAUTH_STATE_TTL_SECONDS = 900

# Hosts for which plain-HTTP redirect URIs are tolerated (local development).
_LOOPBACK_HOSTS = frozenset({"127.0.0.1", "localhost"})


class GoogleAuthManager:
    """Manage the Google OAuth client-secrets file, the token file, and the web flow."""

    def __init__(self, settings: Settings, workspace_root: Path) -> None:
        """Store *settings* and the resolved *workspace_root* used for relative paths."""
        self.settings = settings
        self.workspace_root = workspace_root.resolve()

    async def get_credentials(self) -> Credentials:
        """Load credentials in a worker thread so file and network I/O stay off the loop.

        Raises:
            GoogleAuthError: see ``_load_credentials``.
        """
        return await asyncio.to_thread(self._load_credentials)

    def _load_credentials(self) -> Credentials:
        """Read the token file, refresh it when expired, and return valid credentials.

        A successful refresh is written back to the token file.

        Raises:
            GoogleAuthError: if the client secrets file or the token file is missing,
                or if the credentials are still not valid after the optional refresh.
        """
        token_path = self.token_path
        client_path = self.client_path
        if not client_path.exists():
            raise GoogleAuthError(
                f"Google client secrets file is missing: {client_path}. "
                "Create a Google OAuth Web Application and place its JSON here."
            )
        if not token_path.exists():
            raise GoogleAuthError(
                f"Google token file is missing: {token_path}. "
                "Run the Google OAuth bootstrap step first."
            )

        credentials = Credentials.from_authorized_user_file(str(token_path), GOOGLE_SCOPES)
        if credentials.expired and credentials.refresh_token:
            credentials.refresh(Request())
            # Persist the refreshed token so the next load does not refresh again.
            token_path.write_text(credentials.to_json(), encoding="utf-8")
        if not credentials.valid:
            raise GoogleAuthError(
                "Google credentials are invalid. Re-run the Google OAuth bootstrap step."
            )
        return credentials

    @property
    def client_path(self) -> Path:
        """Resolved path of the OAuth client secrets file named in the settings."""
        return self._resolve_path(self.settings.google_client_secrets_file)

    @property
    def token_path(self) -> Path:
        """Resolved path of the stored user token file named in the settings."""
        return self._resolve_path(self.settings.google_token_file)

    def write_client_secrets_file(self, client_id: str, client_secret: str) -> Path:
        """Write a "web" client secrets JSON built from the given ID and secret.

        Both values are stripped of surrounding whitespace first. The redirect URIs
        point at the admin callback on 127.0.0.1 and localhost using
        ``settings.admin_port``.

        Returns:
            The path the file was written to.

        Raises:
            GoogleAuthError: if either value is empty after stripping.
        """
        client_id = client_id.strip()
        client_secret = client_secret.strip()
        if not client_id or not client_secret:
            raise GoogleAuthError("Google client ID and client secret are required.")

        redirect_uris = [
            f"http://127.0.0.1:{self.settings.admin_port}/admin/integrations/google/callback",
            f"http://localhost:{self.settings.admin_port}/admin/integrations/google/callback",
        ]
        payload = {
            "web": {
                "client_id": client_id,
                "project_id": "wiseclaw-local",
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "client_secret": client_secret,
                "redirect_uris": redirect_uris,
            }
        }
        self.client_path.parent.mkdir(parents=True, exist_ok=True)
        self.client_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        return self.client_path

    def begin_web_oauth(self, redirect_uri: str) -> str:
        """Start the consent flow and return the authorization URL to send the user to.

        The generated ``state`` is remembered so ``complete_web_oauth`` can verify it.

        Raises:
            GoogleAuthError: if the client secrets file does not exist.
        """
        if not self.client_path.exists():
            raise GoogleAuthError(
                f"Google client secrets file is missing: {self.client_path}. "
                "Place your Google OAuth client JSON there first."
            )
        self._configure_local_oauth_transport(redirect_uri)
        flow = Flow.from_client_secrets_file(str(self.client_path), GOOGLE_SCOPES)
        flow.redirect_uri = redirect_uri
        authorization_url, state = flow.authorization_url(
            access_type="offline",
            include_granted_scopes="true",
            prompt="consent",
        )
        self._store_state(state)
        return authorization_url

    def complete_web_oauth(self, redirect_uri: str, state: str, authorization_response: str) -> Credentials:
        """Exchange the callback response for credentials and write them to the token file.

        Raises:
            GoogleAuthError: if *state* was never issued, was already used, or is older
                than the state TTL.
        """
        if not self._consume_state(state):
            raise GoogleAuthError("Google OAuth state is missing or expired. Start the connect flow again.")
        self._configure_local_oauth_transport(redirect_uri)
        flow = Flow.from_client_secrets_file(str(self.client_path), GOOGLE_SCOPES, state=state)
        flow.redirect_uri = redirect_uri
        flow.fetch_token(authorization_response=authorization_response)
        credentials = flow.credentials
        self.token_path.parent.mkdir(parents=True, exist_ok=True)
        self.token_path.write_text(credentials.to_json(), encoding="utf-8")
        return credentials

    def oauth_status(self) -> tuple[bool, bool, str]:
        """Summarize the integration state as ``(flag, flag, message)``.

        The first flag is False only when the client secrets file is missing. The
        second flag is True only when the stored credentials load and are valid.
        Credential-loading failures are reported through the flags and message
        rather than raised.
        """
        if not self.client_path.exists():
            return False, False, (
                f"Missing Google OAuth client file at {self.client_path}. "
                "Add client_secret.json first."
            )
        if not self.token_path.exists():
            return False, False, "Google account is not connected yet."
        try:
            self._load_credentials()
        except GoogleAuthError as exc:
            return True, False, str(exc)
        except (google_auth_exceptions.GoogleAuthError, ValueError) as exc:
            # A failed refresh (revoked token, transport failure) or a malformed
            # token file is a "not connected" state, not a reason for the status
            # query to blow up.
            return True, False, f"Google credentials could not be loaded: {exc}"
        return True, True, "Google account is connected."

    def _resolve_path(self, raw_path: str) -> Path:
        """Expand ``~`` and resolve *raw_path*; relative paths are anchored at the workspace root."""
        path = Path(raw_path).expanduser()
        if not path.is_absolute():
            path = self.workspace_root / path
        return path.resolve()

    def _store_state(self, state: str) -> None:
        """Record *state* as pending, first dropping entries older than the TTL."""
        now = time.time()
        expired = [
            item
            for item, created_at in _OAUTH_STATES.items()
            if now - created_at > _OAUTH_STATE_TTL_SECONDS
        ]
        for item in expired:
            _OAUTH_STATES.pop(item, None)
        _OAUTH_STATES[state] = now

    def _consume_state(self, state: str) -> bool:
        """Remove *state* from the pending set; return True only if it existed and is within the TTL."""
        created_at = _OAUTH_STATES.pop(state, None)
        if created_at is None:
            return False
        return time.time() - created_at <= _OAUTH_STATE_TTL_SECONDS

    def _configure_local_oauth_transport(self, redirect_uri: str) -> None:
        """Set ``OAUTHLIB_INSECURE_TRANSPORT=1`` when the redirect is plain HTTP on a loopback host.

        The host is compared exactly after URL parsing. A prefix test would also
        accept hosts such as ``localhost.example.com`` or URLs that carry
        ``localhost`` only in the userinfo part.
        """
        try:
            parts = urlsplit(redirect_uri)
            host = parts.hostname
        except ValueError:
            # Unparseable URI: leave the transport setting untouched.
            return
        if parts.scheme == "http" and host in _LOOPBACK_HOSTS:
            os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"