diff --git a/README.md b/README.md index 74d01a8..b55c873 100644 --- a/README.md +++ b/README.md @@ -13,5 +13,14 @@ scrapling install ## Run ```bash -python scrape_happyfappy_bookmarks.py --cookie-file cookies.txt --output bookmarks.json +.venv/bin/python scrape_happyfappy_bookmarks.py --cookie-file cookies.txt --output bookmarks.json +``` + +## Download a Single Torrent + +```bash +.venv/bin/python download_happyfappy_torrent.py \ + --url "https://www.happyfappy.net/torrents.php?id=110178" \ + --cookie-file cookies.txt \ + --output-dir torrent ``` diff --git a/download_happyfappy_torrent.py b/download_happyfappy_torrent.py new file mode 100644 index 0000000..684caba --- /dev/null +++ b/download_happyfappy_torrent.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import time +from pathlib import Path +from typing import Any +from urllib.parse import urlparse + +from scrapling.fetchers import DynamicSession + + +def _domain_matches(target_host: str, cookie_domain: str) -> bool: + cd = cookie_domain.lstrip(".").lower() + th = target_host.lower() + return th == cd or th.endswith("." 
+ cd) + + +def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]: + cookies: dict[str, str] = {} + lines = cookie_string.splitlines() + looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines) + + if looks_like_netscape: + for raw_line in lines: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("\t") + if len(parts) < 7: + continue + domain, _flag, _path, _secure, _expires, name, value = parts[:7] + if not _domain_matches(target_host, domain): + continue + if name: + cookies[name] = value + return cookies + + for chunk in cookie_string.split(";"): + piece = chunk.strip() + if not piece or "=" not in piece: + continue + key, value = piece.split("=", 1) + key = key.strip() + value = value.strip() + if key: + cookies[key] = value + return cookies + + +def parse_cookies_for_playwright( + cookie_string: str, target_host: str, base_url: str +) -> list[dict[str, Any]]: + lines = cookie_string.splitlines() + cookies: list[dict[str, Any]] = [] + looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines) + + if looks_like_netscape: + for raw_line in lines: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("\t") + if len(parts) < 7: + continue + domain, _flag, path, secure, expires, name, value = parts[:7] + if not _domain_matches(target_host, domain): + continue + if not name: + continue + + cookie_obj: dict[str, Any] = { + "name": name, + "value": value, + "domain": domain.lstrip("."), + "path": path or "/", + "secure": (secure.upper() == "TRUE"), + } + if expires.isdigit(): + exp_num = int(expires) + if exp_num > 0: + cookie_obj["expires"] = float(exp_num) + cookies.append(cookie_obj) + return cookies + + kv = parse_cookie_string(cookie_string, target_host) + for name, value in kv.items(): + cookies.append({"name": name, "value": value, "url": base_url}) + return cookies + + +def absolute_url(base_url: str, href: str) 
-> str: + href = href.strip() + if href.startswith("http://") or href.startswith("https://"): + return href + if href.startswith("/"): + return f"{base_url.rstrip('/')}{href}" + return f"{base_url.rstrip('/')}/{href}" + + +def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any: + last_error: Exception | None = None + for attempt in range(retries): + try: + response = session.fetch( + url, + timeout=45_000, + load_dom=True, + network_idle=False, + google_search=False, + ) + status = response.status + if status in (403, 404, 429) or status >= 500: + raise RuntimeError(f"HTTP {status}") + return response + except Exception as err: # noqa: BLE001 + last_error = err + if attempt == retries - 1: + break + time.sleep(backoff_base * (2**attempt)) + raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error + + +def download_via_browser_with_retry( + session: DynamicSession, detail_url: str, retries: int, backoff_base: float +) -> tuple[str, bytes]: + last_error: Exception | None = None + for attempt in range(retries): + page = session.context.new_page() + try: + page.goto(detail_url, wait_until="domcontentloaded", timeout=45_000) + with page.expect_download(timeout=45_000) as download_info: + clicked = False + selectors = [ + 'span.torrent_buttons a[title*="Download"][href*="action=download"]', + 'a.button.blueButton[href*="action=download"]', + 'a[href*="action=download"][href*="torrent_pass"]', + ] + for selector in selectors: + locator = page.locator(selector) + if locator.count() > 0: + locator.first.click() + clicked = True + break + + if not clicked: + locator = page.locator( + "xpath=//a[contains(translate(normalize-space(string(.)),'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]" + ) + if locator.count() > 0: + locator.first.click() + clicked = True + + if not clicked: + raise RuntimeError("Download button not found in interactive page.") + + 
download = download_info.value + temp_path = download.path() + if not temp_path: + raise RuntimeError("Downloaded file path is empty.") + data = Path(temp_path).read_bytes() + filename = (download.suggested_filename or "downloaded.torrent").strip() + if not filename: + filename = "downloaded.torrent" + return filename, data + except Exception as err: # noqa: BLE001 + last_error = err + if attempt == retries - 1: + break + time.sleep(backoff_base * (2**attempt)) + finally: + page.close() + raise RuntimeError(f"Request failed for {detail_url}: {last_error}") from last_error + + +def find_download_link(response: Any) -> str: + selectors = [ + 'span.torrent_buttons a[title*="Download"][href*="action=download"]::attr(href)', + 'a.button.blueButton[href*="action=download"]::attr(href)', + 'a[href*="action=download"][href*="torrent_pass"]::attr(href)', + ] + for sel in selectors: + href = (response.css(sel).get("") or "").strip() + if href: + return href + + # Fallback using text match if classes/attributes drift + href = ( + response.xpath( + "//a[contains(translate(normalize-space(string(.))," + "'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]/@href" + ).get("") + or "" + ).strip() + return href + + +def normalize_filename(filename: str, download_url: str) -> str: + safe_name = Path(filename).name.strip() + if safe_name: + return safe_name if safe_name.lower().endswith(".torrent") else f"{safe_name}.torrent" + from_url = Path(urlparse(download_url).path).name.strip() + if from_url: + return from_url if from_url.lower().endswith(".torrent") else f"{from_url}.torrent" + return "downloaded.torrent" + + +def looks_like_torrent_bytes(data: bytes) -> bool: + # Basic bencode sanity check for torrent files + return bool(data) and data.startswith(b"d") and (b"4:info" in data[:4096]) + + +def validate_torrent_response(download_url: str, filename: str, data: bytes) -> None: + good_ext = 
filename.lower().endswith(".torrent") or urlparse(download_url).path.lower().endswith(".torrent") + if not good_ext: + raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.") + if not looks_like_torrent_bytes(data): + raise RuntimeError("Downloaded file failed torrent bencode check.") + + +def run(args: argparse.Namespace) -> None: + base_url = args.base_url.rstrip("/") + target_host = urlparse(base_url).hostname or "www.happyfappy.net" + + cookie_value = args.cookie or "" + if not cookie_value and args.cookie_file: + cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip() + if not cookie_value: + raise ValueError("Cookie is required. Use --cookie or --cookie-file.") + + cookies = parse_cookie_string(cookie_value, target_host=target_host) + if not cookies: + raise ValueError("No valid cookies parsed for target host.") + pw_cookies = parse_cookies_for_playwright(cookie_value, target_host=target_host, base_url=base_url) + if not pw_cookies: + raise ValueError("No Playwright-compatible cookies generated for target host.") + + output_dir = Path(args.output_dir).resolve() + output_dir.mkdir(parents=True, exist_ok=True) + + with DynamicSession( + headless=True, + disable_resources=True, + cookies=pw_cookies, + google_search=False, + retries=1, + retry_delay=1, + ) as session: + detail_response = fetch_dynamic_with_retry( + session, args.url, retries=args.retries, backoff_base=args.backoff_base + ) + href = find_download_link(detail_response) + if not href: + raise RuntimeError("Download link not found on page.") + + download_url = absolute_url(base_url, href) + suggested_filename, data = download_via_browser_with_retry( + session, args.url, retries=args.retries, backoff_base=args.backoff_base + ) + filename = normalize_filename(suggested_filename, download_url) + validate_torrent_response(download_url, filename, data) + output_path = output_dir / filename + output_path.write_bytes(data) # overwrite behavior by design + 
print(f"Saved torrent to {output_path}") + + +def make_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Download a torrent file from a single HappyFappy torrent detail page URL.", + ) + parser.add_argument("--url", required=True, help="Torrent detail page URL") + parser.add_argument("--base-url", default="https://www.happyfappy.net") + parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"') + parser.add_argument("--cookie-file", help="Path to cookie file") + parser.add_argument("--output-dir", default="torrent") + parser.add_argument("--retries", type=int, default=3) + parser.add_argument("--backoff-base", type=float, default=5.0) + return parser + + +def main() -> None: + parser = make_parser() + args = parser.parse_args() + if args.retries < 1: + raise ValueError("--retries must be at least 1.") + if args.backoff_base < 0: + raise ValueError("--backoff-base must be >= 0.") + run(args) + + +if __name__ == "__main__": + main() diff --git a/scrape_happyfappy_bookmarks.py b/scrape_happyfappy_bookmarks.py index 5ac9f06..b79e549 100644 --- a/scrape_happyfappy_bookmarks.py +++ b/scrape_happyfappy_bookmarks.py @@ -5,21 +5,12 @@ import argparse import json import random import re -import sys import time from pathlib import Path from typing import Any from urllib.parse import urlparse -try: - from scrapling.fetchers import DynamicSession -except ModuleNotFoundError: - local_repo = Path(__file__).resolve().parent / "Scrapling" - if local_repo.exists(): - sys.path.insert(0, str(local_repo)) - from scrapling.fetchers import DynamicSession - else: - raise +from scrapling.fetchers import DynamicSession STOP_TEXT = "You have not bookmarked any torrents." 
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)") @@ -122,11 +113,13 @@ def extract_background_image(style: str) -> str | None: return value or None -def extract_torrent_cards(response: Any) -> list[dict[str, Any]]: +def extract_torrent_cards(response: Any, base_url: str) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] cards = response.css("div.torrent_grid div.torrent_grid__torrent") for card in cards: page_url = (card.css('a[href^="/torrents.php?id="]::attr(href)').get("") or "").strip() + if page_url and not page_url.startswith("http"): + page_url = f"{base_url.rstrip('/')}{page_url}" category = (card.css("span.torrent_grid__torrent__cat::text").get("") or "").strip() title = ( card.css("h3.trim::attr(title)").get("") @@ -220,7 +213,7 @@ def run(args: argparse.Namespace) -> None: if should_stop(response): break - page_records = extract_torrent_cards(response) + page_records = extract_torrent_cards(response, args.base_url) all_records.extend(page_records) print(f"[page={page}] extracted={len(page_records)} total={len(all_records)}") page += 1