#!/usr/bin/env python3
"""Scrape HappyFappy torrent bookmarks using an authenticated cookie.

Accepts either a raw ``key=value; ...`` cookie header or a Netscape
cookie-file dump, paginates through ``/bookmarks.php``, and writes the
extracted torrent cards to a JSON file.
"""
from __future__ import annotations

import argparse
import json
import random
import re
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

# Sentinel text the site renders once pagination runs past the last bookmark.
STOP_TEXT = "You have not bookmarked any torrents."

# Pulls the URL out of a CSS `background-image: url(...)` declaration;
# tolerates single quotes, double quotes, or no quotes at all.
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)")


def _domain_matches(target_host: str, cookie_domain: str) -> bool:
    """Return True if *cookie_domain* covers *target_host*.

    A cookie domain matches when it equals the host or is a parent domain
    (leading dots on the cookie domain are ignored, per cookie-file convention).
    """
    cd = cookie_domain.lstrip(".").lower()
    th = target_host.lower()
    return th == cd or th.endswith("." + cd)


def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
    """Parse cookie input into a simple ``{name: value}`` mapping.

    Supports:
      1) ``"key=value; key2=value2"`` cookie header style
      2) Netscape cookie file format (tab-separated 7 columns)

    In Netscape mode, only cookies whose domain matches *target_host* are kept.
    """
    cookies: dict[str, str] = {}
    lines = cookie_string.splitlines()
    # Heuristic: a multi-line input containing tabs is a Netscape cookie file.
    looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)

    if looks_like_netscape:
        for raw_line in lines:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue  # skip blanks and comment lines
            parts = line.split("\t")
            if len(parts) < 7:
                continue  # malformed row
            domain, _flag, _path, _secure, _expires, name, value = parts[:7]
            if not _domain_matches(target_host, domain):
                continue
            if name:
                cookies[name] = value
        return cookies

    # Header style: "a=1; b=2"
    for chunk in cookie_string.split(";"):
        piece = chunk.strip()
        if not piece or "=" not in piece:
            continue
        key, value = piece.split("=", 1)
        key = key.strip()
        value = value.strip()
        if key:
            cookies[key] = value
    return cookies


def parse_cookies_for_playwright(
    cookie_string: str, target_host: str, base_url: str
) -> list[dict[str, Any]]:
    """Convert cookie input into Playwright-compatible cookie objects.

    Netscape rows become fully-specified cookies (domain/path/secure/expires);
    header-style pairs fall back to URL-scoped cookies tied to *base_url*.
    """
    lines = cookie_string.splitlines()
    cookies: list[dict[str, Any]] = []
    looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)

    if looks_like_netscape:
        for raw_line in lines:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split("\t")
            if len(parts) < 7:
                continue
            domain, _flag, path, secure, expires, name, value = parts[:7]
            if not _domain_matches(target_host, domain):
                continue
            if not name:
                continue
            cookie_obj: dict[str, Any] = {
                "name": name,
                "value": value,
                "domain": domain.lstrip("."),
                "path": path or "/",
                "secure": (secure.upper() == "TRUE"),
            }
            # Only forward a positive, numeric expiry; 0 / non-numeric means
            # a session cookie and the key is omitted entirely.
            if expires.isdigit():
                exp_num = int(expires)
                if exp_num > 0:
                    cookie_obj["expires"] = float(exp_num)
            cookies.append(cookie_obj)
        return cookies

    kv = parse_cookie_string(cookie_string, target_host)
    for name, value in kv.items():
        cookies.append({"name": name, "value": value, "url": base_url})
    return cookies


def extract_background_image(style: str) -> str | None:
    """Return the URL inside a `background-image: url(...)` style, or None."""
    if not style:
        return None
    match = BG_URL_RE.search(style)
    if not match:
        return None
    value = match.group(1).strip()
    return value or None


def extract_torrent_cards(response: Any, base_url: str) -> list[dict[str, Any]]:
    """Extract torrent card records from a bookmarks page response.

    Each record carries the absolute page URL, a VR flag, the title, and the
    cover background-image URL (if any).
    """
    records: list[dict[str, Any]] = []
    cards = response.css("div.torrent_grid div.torrent_grid__torrent")
    for card in cards:
        page_url = (card.css('a[href^="/torrents.php?id="]::attr(href)').get("") or "").strip()
        # Site links are relative; absolutize against base_url.
        if page_url and not page_url.startswith("http"):
            page_url = f"{base_url.rstrip('/')}{page_url}"
        category = (card.css("span.torrent_grid__torrent__cat::text").get("") or "").strip()
        # Prefer the full title attribute; the visible text may be trimmed.
        title = (
            card.css("h3.trim::attr(title)").get("")
            or card.css("h3.trim::text").get("")
            or ""
        ).strip()
        style = (card.css("div.torrent__cover::attr(style)").get("") or "").strip()
        background_image = extract_background_image(style)
        records.append(
            {
                "pageURL": page_url,
                "isVR": category == "VR",
                "title": title,
                "backgroundImage": background_image,
            }
        )
    return records


def should_stop(response: Any) -> bool:
    """Return True when the page shows the empty-bookmarks message."""
    body_text = response.body.decode(response.encoding or "utf-8", errors="ignore")
    return STOP_TEXT in body_text


def fetch_page(session: Any, url: str, retries: int, backoff_base: float) -> Any:
    """Fetch *url* with exponential backoff plus jitter.

    Treats 403/429 and any 5xx status as retryable failures. Raises
    RuntimeError (chained to the last error) once *retries* attempts fail.
    """
    last_error: Exception | None = None
    for attempt in range(retries):
        try:
            response = session.fetch(
                url,
                timeout=45_000,  # presumably milliseconds — TODO confirm against scrapling docs
                load_dom=True,
                network_idle=False,
            )
            status = response.status
            if status in (403, 429) or status >= 500:
                raise RuntimeError(f"HTTP {status}")
            return response
        except Exception as err:  # noqa: BLE001
            last_error = err
            if attempt == retries - 1:
                break
            # Exponential backoff with random jitter to avoid a thundering herd.
            sleep_seconds = backoff_base * (2**attempt) + random.uniform(0.0, 0.7)
            time.sleep(sleep_seconds)
    raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error


def build_bookmarks_url(base_url: str, page: int) -> str:
    """Build the bookmarks listing URL for *page* (page 1 has no page param)."""
    if page == 1:
        return f"{base_url}/bookmarks.php?type=torrents"
    return f"{base_url}/bookmarks.php?page={page}&type=torrents#torrent_table"


def run(args: argparse.Namespace) -> None:
    """Drive the scrape: validate cookies, paginate, and write JSON output.

    Raises ValueError when no usable cookie input is provided.
    """
    # Imported lazily so the cookie/URL helpers in this module remain usable
    # (and importable) without the third-party scrapling dependency installed.
    from scrapling.fetchers import DynamicSession

    target_host = urlparse(args.base_url).hostname or "www.happyfappy.net"
    cookie_value = args.cookie or ""
    if not cookie_value and args.cookie_file:
        cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
    if not cookie_value:
        raise ValueError("Cookie is required. Use --cookie or --cookie-file.")

    # Parsed only to validate that at least one cookie targets the host.
    cookies = parse_cookie_string(cookie_value, target_host=target_host)
    if not cookies:
        raise ValueError("No valid cookies parsed for target host. Check cookie content.")

    pw_cookies = parse_cookies_for_playwright(
        cookie_value, target_host=target_host, base_url=args.base_url.rstrip("/")
    )
    if not pw_cookies:
        raise ValueError("No Playwright-compatible cookies generated for target host.")

    all_records: list[dict[str, Any]] = []
    with DynamicSession(
        headless=True,
        disable_resources=True,
        cookies=pw_cookies,
        google_search=False,
        retries=1,
        retry_delay=1,
    ) as session:
        page = 1
        while page <= args.max_pages:
            if page > 1:
                # Polite randomized delay between page requests.
                time.sleep(random.uniform(args.delay_min, args.delay_max))
            url = build_bookmarks_url(args.base_url.rstrip("/"), page)
            response = fetch_page(session, url, retries=args.retries, backoff_base=args.backoff_base)
            if should_stop(response):
                break
            page_records = extract_torrent_cards(response, args.base_url)
            all_records.extend(page_records)
            print(f"[page={page}] extracted={len(page_records)} total={len(all_records)}")
            page += 1

    output_path = Path(args.output).resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(all_records, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Saved {len(all_records)} records to {output_path}")


def make_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser."""
    parser = argparse.ArgumentParser(
        description="Scrape HappyFappy torrent bookmarks using an authenticated cookie.",
    )
    parser.add_argument("--base-url", default="https://www.happyfappy.net")
    parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
    parser.add_argument("--cookie-file", help="Path to a text file containing raw cookie string")
    parser.add_argument("--output", default="bookmarks.json")
    parser.add_argument("--delay-min", type=float, default=1.8, help="Minimum delay between page requests")
    parser.add_argument("--delay-max", type=float, default=3.2, help="Maximum delay between page requests")
    parser.add_argument("--retries", type=int, default=3, help="Retries per page request")
    parser.add_argument("--backoff-base", type=float, default=5.0, help="Backoff base seconds")
    parser.add_argument("--max-pages", type=int, default=200, help="Safety cap for pagination loop")
    return parser


def main() -> None:
    """CLI entry point: parse args, validate them, and run the scraper."""
    parser = make_parser()
    args = parser.parse_args()
    if args.delay_min < 0 or args.delay_max < 0:
        raise ValueError("Delay values must be non-negative.")
    if args.delay_min > args.delay_max:
        raise ValueError("--delay-min cannot be greater than --delay-max.")
    if args.retries < 1:
        raise ValueError("--retries must be at least 1.")
    run(args)


if __name__ == "__main__":
    main()