feat: add browser-based torrent download and package-only Scrapling usage

This commit is contained in:
2026-03-07 01:04:36 +03:00
parent 690733a224
commit bea3010839
3 changed files with 307 additions and 13 deletions

View File

@@ -13,5 +13,14 @@ scrapling install
## Run ## Run
```bash ```bash
python scrape_happyfappy_bookmarks.py --cookie-file cookies.txt --output bookmarks.json .venv/bin/python scrape_happyfappy_bookmarks.py --cookie-file cookies.txt --output bookmarks.json
```
## Download Single Torrent
```bash
.venv/bin/python download_happyfappy_torrent.py \
--url "https://www.happyfappy.net/torrents.php?id=110178" \
--cookie-file cookies.txt \
--output-dir torrent
``` ```

View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import time
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse

from scrapling.fetchers import DynamicSession
def _domain_matches(target_host: str, cookie_domain: str) -> bool:
cd = cookie_domain.lstrip(".").lower()
th = target_host.lower()
return th == cd or th.endswith("." + cd)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
    """Parse cookies from either a Netscape cookies.txt blob or a raw header string.

    Netscape rows (multi-line, tab-separated) are filtered to domains that
    cover *target_host*; otherwise the input is treated as a "k=v; k2=v2"
    Cookie-header string. Returns a name -> value mapping.
    """
    rows = cookie_string.splitlines()
    is_netscape = len(rows) > 1 and any("\t" in row for row in rows)
    if is_netscape:
        netscape_cookies: dict[str, str] = {}
        for row in rows:
            entry = row.strip()
            # Skip blanks and "# ..." comment lines.
            if not entry or entry.startswith("#"):
                continue
            fields = entry.split("\t")
            # Netscape format has 7 tab-separated fields per cookie.
            if len(fields) < 7:
                continue
            domain, name, value = fields[0], fields[5], fields[6]
            if name and _domain_matches(target_host, domain):
                netscape_cookies[name] = value
        return netscape_cookies
    header_cookies: dict[str, str] = {}
    for fragment in cookie_string.split(";"):
        fragment = fragment.strip()
        if "=" not in fragment:
            continue
        name, _, value = fragment.partition("=")
        name = name.strip()
        if name:
            header_cookies[name] = value.strip()
    return header_cookies
def parse_cookies_for_playwright(
    cookie_string: str, target_host: str, base_url: str
) -> list[dict[str, Any]]:
    """Convert a cookie blob into Playwright ``add_cookies()``-style dicts.

    Netscape rows keep their domain/path/secure/expires metadata; a raw
    header string falls back to simple name/value cookies scoped to
    *base_url*. Only cookies whose domain covers *target_host* are kept.
    """
    rows = cookie_string.splitlines()
    is_netscape = len(rows) > 1 and any("\t" in row for row in rows)
    if not is_netscape:
        # Header-style input: reuse the simple parser and scope by URL.
        simple = parse_cookie_string(cookie_string, target_host)
        return [
            {"name": name, "value": value, "url": base_url}
            for name, value in simple.items()
        ]
    result: list[dict[str, Any]] = []
    for row in rows:
        entry = row.strip()
        if not entry or entry.startswith("#"):
            continue
        fields = entry.split("\t")
        if len(fields) < 7:
            continue
        domain, _flag, path, secure, expires, name, value = fields[:7]
        if not name or not _domain_matches(target_host, domain):
            continue
        cookie: dict[str, Any] = {
            "name": name,
            "value": value,
            "domain": domain.lstrip("."),
            "path": path or "/",
            "secure": secure.upper() == "TRUE",
        }
        # Session cookies (expires 0 or non-numeric) carry no expiry field.
        if expires.isdigit() and int(expires) > 0:
            cookie["expires"] = float(int(expires))
        result.append(cookie)
    return result
def absolute_url(base_url: str, href: str) -> str:
    """Resolve *href* against *base_url* and return an absolute URL.

    Uses urllib.parse.urljoin instead of naive string concatenation, so
    scheme-relative ("//host/x"), query-only ("?id=1") and dot-relative
    ("../x") hrefs resolve correctly; concatenation would mangle them.
    Fully-qualified hrefs pass through unchanged.
    """
    href = href.strip()
    if href.startswith(("http://", "https://")):
        return href
    # Ensure a trailing slash so bare relative paths resolve under the root.
    return urljoin(base_url.rstrip("/") + "/", href)
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
    """Fetch *url* through the dynamic session with exponential-backoff retries.

    Statuses 403/404/429 and any 5xx count as failures. After *retries*
    attempts, raises RuntimeError chained from the last error.
    """
    failure: Exception | None = None
    for attempt in range(retries):
        # Back off before every attempt after the first.
        if attempt:
            time.sleep(backoff_base * (2 ** (attempt - 1)))
        try:
            result = session.fetch(
                url,
                timeout=45_000,
                load_dom=True,
                network_idle=False,
                google_search=False,
            )
            code = result.status
            if code in (403, 404, 429) or code >= 500:
                raise RuntimeError(f"HTTP {code}")
            return result
        except Exception as err:  # noqa: BLE001
            failure = err
    raise RuntimeError(f"Request failed for {url}: {failure}") from failure
def download_via_browser_with_retry(
    session: DynamicSession, detail_url: str, retries: int, backoff_base: float
) -> tuple[str, bytes]:
    """Download the torrent by clicking the page's download button in a real browser.

    Opens *detail_url* in a fresh tab of the session's browser context,
    clicks the first matching download anchor, and captures the file that
    Playwright downloads as a result.

    Args:
        session: Active Scrapling DynamicSession exposing a Playwright context.
        detail_url: Torrent detail page URL to open.
        retries: Total number of attempts (must be >= 1).
        backoff_base: Base seconds for exponential backoff between attempts.

    Returns:
        Tuple of (suggested filename, raw file bytes).

    Raises:
        RuntimeError: if no download button is found, the captured file is
            unusable, or every attempt fails (last error is chained).
    """
    last_error: Exception | None = None
    for attempt in range(retries):
        # Fresh page per attempt so a failed navigation can't poison the next try.
        page = session.context.new_page()
        try:
            page.goto(detail_url, wait_until="domcontentloaded", timeout=45_000)
            # expect_download must wrap the click, or the download event is missed.
            with page.expect_download(timeout=45_000) as download_info:
                clicked = False
                # Most specific selectors first; the site's markup may drift.
                selectors = [
                    'span.torrent_buttons a[title*="Download"][href*="action=download"]',
                    'a.button.blueButton[href*="action=download"]',
                    'a[href*="action=download"][href*="torrent_pass"]',
                ]
                for selector in selectors:
                    locator = page.locator(selector)
                    if locator.count() > 0:
                        locator.first.click()
                        clicked = True
                        break
                if not clicked:
                    # Fallback: any anchor whose visible text contains "download"
                    # (case-insensitive via XPath translate()).
                    locator = page.locator(
                        "xpath=//a[contains(translate(normalize-space(string(.)),'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]"
                    )
                    if locator.count() > 0:
                        locator.first.click()
                        clicked = True
                if not clicked:
                    raise RuntimeError("Download button not found in interactive page.")
            # .value blocks until the download event fires (or times out).
            download = download_info.value
            temp_path = download.path()
            if not temp_path:
                raise RuntimeError("Downloaded file path is empty.")
            data = Path(temp_path).read_bytes()
            filename = (download.suggested_filename or "downloaded.torrent").strip()
            if not filename:
                filename = "downloaded.torrent"
            return filename, data
        except Exception as err:  # noqa: BLE001
            last_error = err
            if attempt == retries - 1:
                break
            # Exponential backoff before the next attempt.
            time.sleep(backoff_base * (2**attempt))
        finally:
            # Always close the tab — on success, failure, and after backoff.
            page.close()
    raise RuntimeError(f"Request failed for {detail_url}: {last_error}") from last_error
def find_download_link(response: Any) -> str:
    """Extract the torrent download href from a parsed detail-page response.

    Tries progressively looser CSS selectors, then falls back to an XPath
    match on anchors whose visible text contains "download" (any case).
    Returns "" when nothing matches.
    """
    css_selectors = (
        'span.torrent_buttons a[title*="Download"][href*="action=download"]::attr(href)',
        'a.button.blueButton[href*="action=download"]::attr(href)',
        'a[href*="action=download"][href*="torrent_pass"]::attr(href)',
    )
    for selector in css_selectors:
        candidate = (response.css(selector).get("") or "").strip()
        if candidate:
            return candidate
    # Last resort in case classes/attributes drift: text-based XPath match.
    xpath_query = (
        "//a[contains(translate(normalize-space(string(.)),"
        "'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]/@href"
    )
    fallback = response.xpath(xpath_query).get("") or ""
    return fallback.strip()
def normalize_filename(filename: str, download_url: str) -> str:
    """Produce a safe basename that ends in ".torrent".

    Prefers the suggested *filename*, falls back to the last path segment of
    *download_url*, and finally to "downloaded.torrent". Directory components
    are stripped so a hostile suggested name cannot escape the output dir.
    """
    def _with_suffix(name: str) -> str:
        return name if name.lower().endswith(".torrent") else f"{name}.torrent"

    for candidate in (filename, urlparse(download_url).path):
        base = Path(candidate).name.strip()
        if base:
            return _with_suffix(base)
    return "downloaded.torrent"
def looks_like_torrent_bytes(data: bytes) -> bool:
    """Cheap sanity check that *data* resembles a bencoded torrent file.

    A torrent is a bencoded dictionary (leading b"d") containing an "info"
    key, whose encoding b"4:info" should appear within the first 4 KiB.
    """
    if not data:
        return False
    return data[:1] == b"d" and b"4:info" in data[:4096]
def validate_torrent_response(download_url: str, filename: str, data: bytes) -> None:
    """Raise RuntimeError unless the download plausibly is a torrent file.

    Requires a ".torrent" extension on either the filename or the URL path,
    plus a passing bencode sanity check on the payload bytes.
    """
    has_torrent_ext = (
        filename.lower().endswith(".torrent")
        or urlparse(download_url).path.lower().endswith(".torrent")
    )
    if not has_torrent_ext:
        raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.")
    if not looks_like_torrent_bytes(data):
        raise RuntimeError("Downloaded file failed torrent bencode check.")
def run(args: argparse.Namespace) -> None:
    """Orchestrate the download: parse cookies, drive a browser session, save the file.

    Raises:
        ValueError: when no cookie is supplied or none parses for the host.
        RuntimeError: when the download link is missing or the download fails.
    """
    base_url = args.base_url.rstrip("/")
    target_host = urlparse(base_url).hostname or "www.happyfappy.net"
    # --cookie wins over --cookie-file when both are given.
    cookie_value = args.cookie or ""
    if not cookie_value and args.cookie_file:
        cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
    if not cookie_value:
        raise ValueError("Cookie is required. Use --cookie or --cookie-file.")
    # Parsed twice on purpose: once as a name/value sanity check, once in the
    # richer Playwright cookie format actually handed to the browser.
    cookies = parse_cookie_string(cookie_value, target_host=target_host)
    if not cookies:
        raise ValueError("No valid cookies parsed for target host.")
    pw_cookies = parse_cookies_for_playwright(cookie_value, target_host=target_host, base_url=base_url)
    if not pw_cookies:
        raise ValueError("No Playwright-compatible cookies generated for target host.")
    output_dir = Path(args.output_dir).resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    with DynamicSession(
        headless=True,
        disable_resources=True,
        cookies=pw_cookies,
        google_search=False,
        retries=1,  # outer retry helpers below handle backoff themselves
        retry_delay=1,
    ) as session:
        # First load the detail page to confirm a download link exists at all.
        detail_response = fetch_dynamic_with_retry(
            session, args.url, retries=args.retries, backoff_base=args.backoff_base
        )
        href = find_download_link(detail_response)
        if not href:
            raise RuntimeError("Download link not found on page.")
        download_url = absolute_url(base_url, href)
        # The actual download is driven by clicking on args.url in the browser;
        # download_url is kept only for filename fallback and extension checks.
        suggested_filename, data = download_via_browser_with_retry(
            session, args.url, retries=args.retries, backoff_base=args.backoff_base
        )
        filename = normalize_filename(suggested_filename, download_url)
        validate_torrent_response(download_url, filename, data)
        output_path = output_dir / filename
        output_path.write_bytes(data)  # overwrite behavior by design
        print(f"Saved torrent to {output_path}")
def make_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the torrent downloader."""
    parser = argparse.ArgumentParser(
        description="Download a torrent file from a single HappyFappy torrent detail page URL.",
    )
    # Table-driven registration keeps flag definitions in one place.
    option_specs: list[tuple[str, dict[str, Any]]] = [
        ("--url", {"required": True, "help": "Torrent detail page URL"}),
        ("--base-url", {"default": "https://www.happyfappy.net"}),
        ("--cookie", {"help": 'Raw cookie string, e.g. "a=1; b=2"'}),
        ("--cookie-file", {"help": "Path to cookie file"}),
        ("--output-dir", {"default": "torrent"}),
        ("--retries", {"type": int, "default": 3}),
        ("--backoff-base", {"type": float, "default": 5.0}),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser
def main() -> None:
    """CLI entry point: parse and validate arguments, then run the download."""
    args = make_parser().parse_args()
    # Reject nonsensical retry settings before any network work starts.
    if args.retries < 1:
        raise ValueError("--retries must be at least 1.")
    if args.backoff_base < 0:
        raise ValueError("--backoff-base must be >= 0.")
    run(args)
if __name__ == "__main__":
main()

View File

@@ -5,21 +5,12 @@ import argparse
import json import json
import random import random
import re import re
import sys
import time import time
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from urllib.parse import urlparse from urllib.parse import urlparse
try: from scrapling.fetchers import DynamicSession
from scrapling.fetchers import DynamicSession
except ModuleNotFoundError:
local_repo = Path(__file__).resolve().parent / "Scrapling"
if local_repo.exists():
sys.path.insert(0, str(local_repo))
from scrapling.fetchers import DynamicSession
else:
raise
STOP_TEXT = "You have not bookmarked any torrents." STOP_TEXT = "You have not bookmarked any torrents."
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)") BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)")
@@ -122,11 +113,13 @@ def extract_background_image(style: str) -> str | None:
return value or None return value or None
def extract_torrent_cards(response: Any) -> list[dict[str, Any]]: def extract_torrent_cards(response: Any, base_url: str) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = [] records: list[dict[str, Any]] = []
cards = response.css("div.torrent_grid div.torrent_grid__torrent") cards = response.css("div.torrent_grid div.torrent_grid__torrent")
for card in cards: for card in cards:
page_url = (card.css('a[href^="/torrents.php?id="]::attr(href)').get("") or "").strip() page_url = (card.css('a[href^="/torrents.php?id="]::attr(href)').get("") or "").strip()
if page_url and not page_url.startswith("http"):
page_url = f"{base_url.rstrip('/')}{page_url}"
category = (card.css("span.torrent_grid__torrent__cat::text").get("") or "").strip() category = (card.css("span.torrent_grid__torrent__cat::text").get("") or "").strip()
title = ( title = (
card.css("h3.trim::attr(title)").get("") card.css("h3.trim::attr(title)").get("")
@@ -220,7 +213,7 @@ def run(args: argparse.Namespace) -> None:
if should_stop(response): if should_stop(response):
break break
page_records = extract_torrent_cards(response) page_records = extract_torrent_cards(response, args.base_url)
all_records.extend(page_records) all_records.extend(page_records)
print(f"[page={page}] extracted={len(page_records)} total={len(all_records)}") print(f"[page={page}] extracted={len(page_records)} total={len(all_records)}")
page += 1 page += 1