feat: ortak tracker adapter yapisi ve PrivateHD destegini ekle

This commit is contained in:
2026-03-13 02:08:17 +03:00
parent daf75166db
commit fe429b6cef
8 changed files with 815 additions and 229 deletions

View File

@@ -1,13 +1,10 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from wscraper.sites.happyfappy import run_download_torrent_files, run_get_bookmarks
# Maps user-facing site aliases to canonical site keys (legacy CLI path).
SITE_ALIASES = {
"happyfappy": "happyfappy",
"hf": "happyfappy",
}
from wscraper.registry import get_tracker, list_trackers, normalize_tracker
# Maps CLI action aliases (short and long forms) to canonical action names.
# NOTE(review): a diff hunk marker was embedded mid-literal in the pasted
# source; entries elided by that hunk cannot be recovered here.
ACTION_ALIASES = {
    "get-bookmarks": "get-bookmarks",
    "download-torrent-files": "download-torrent-files",
    "dtf": "download-torrent-files",
    "download": "download-torrent-files",
    "remove-bookmark": "remove-bookmark",
    "remove": "remove-bookmark",
    "rb": "remove-bookmark",
}
def normalize_site(value: str) -> str:
    """Resolve a user-supplied site alias to its canonical key.

    Raises ValueError when the alias is not present in SITE_ALIASES.
    """
    canonical = SITE_ALIASES.get(value.strip().lower())
    if canonical is None:
        supported = ", ".join(sorted(SITE_ALIASES))
        raise ValueError(f"Unsupported site: {value!r}. Supported values: {supported}")
    return canonical
def normalize_action(value: str) -> str:
key = value.strip().lower()
if key not in ACTION_ALIASES:
@@ -36,98 +28,96 @@ def normalize_action(value: str) -> str:
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser shared by all tracker adapters.

    The merged diff had two `parser` assignments, two `site` positionals and
    a duplicated `-u/--url` option (which makes argparse raise ArgumentError
    at definition time); only the tracker-registry variants are kept.
    """
    # Advertise registered trackers directly in the parser description.
    supported_sites = ", ".join(sorted({tracker.key for tracker in list_trackers()}))
    parser = argparse.ArgumentParser(description=f"wscraper: multi-site scraping entrypoint ({supported_sites})")
    parser.add_argument("site", help="Site key, e.g. happyfappy, hf, privatehd or phd")
    parser.add_argument("-a", "--action", required=True, help="Action to run")
    parser.add_argument("--base-url", help="Override site base URL")
    parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
    parser.add_argument("-c", "--cookie-file", help="Path to cookie file")
    parser.add_argument("--wishlist-url", help="Tracker-specific wishlist URL override")
    parser.add_argument("--download-url", help="Direct torrent download URL")
    parser.add_argument("--remove-token", help="Tracker-specific remove token")
    parser.add_argument("--title", help="Item title")
    parser.add_argument("--image-url", help="Background image URL")
    parser.add_argument("--size", help="Torrent size text")
    parser.add_argument("--seeders", type=int, help="Seeders count")
    parser.add_argument("--leechers", type=int, help="Leechers count")
    parser.add_argument("-u", "--url", help="Detail page URL (required for download action)")
    parser.add_argument(
        "-o",
        "--output",
        help="Output target: file path for get-bookmarks, directory path for download-torrent-files",
    )
    parser.add_argument(
        "-rmb",
        "--rm-bookmark",
        action="store_true",
        help="When used with download-torrent-files, remove bookmark after successful torrent download",
    )
    # Retry/pacing knobs shared by every tracker.
    parser.add_argument("-r", "--retries", type=int, default=3)
    parser.add_argument("--backoff-base", type=float, default=5.0)
    parser.add_argument("--delay-min", type=float, default=1.8)
    parser.add_argument("--delay-max", type=float, default=3.2)
    parser.add_argument("--max-pages", type=int, default=200)
    return parser
def read_cookie(args: argparse.Namespace) -> str:
    """Return the raw cookie string from --cookie or --cookie-file.

    Raises ValueError when neither source yields a non-empty value.
    """
    cookie_value = args.cookie or ""
    if not cookie_value and args.cookie_file:
        cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
    if not cookie_value:
        raise ValueError("Cookie is required. Use --cookie or --cookie-file/-c.")
    return cookie_value


def run_happyfappy(args: argparse.Namespace, action: str) -> None:
    """Dispatch a CLI action to the legacy HappyFappy entry points.

    The diff interleaving had split this function's header from its body by
    splicing read_cookie between them; both are reconstructed here.
    Raises ValueError for unknown actions or missing required arguments.
    """
    base_url = args.base_url or "https://www.happyfappy.net"
    if action == "get-bookmarks":
        run_get_bookmarks(
            argparse.Namespace(
                base_url=base_url,
                cookie=args.cookie,
                cookie_file=args.cookie_file,
                output=args.output or "bookmarks.json",
                delay_min=args.delay_min,
                delay_max=args.delay_max,
                retries=args.retries,
                backoff_base=args.backoff_base,
                max_pages=args.max_pages,
            )
        )
        return
    if action == "download-torrent-files":
        if not args.url:
            raise ValueError("--url is required for action=download-torrent-files.")
        run_download_torrent_files(
            argparse.Namespace(
                url=args.url,
                base_url=base_url,
                cookie=args.cookie,
                cookie_file=args.cookie_file,
                output_dir=args.output or "torrent",
                rm_bookmark=args.rm_bookmark,
                retries=args.retries,
                backoff_base=args.backoff_base,
            )
        )
        return
    raise ValueError(f"Unsupported action for happyfappy: {action}")
def build_item(args: argparse.Namespace) -> dict[str, object]:
    """Assemble a bookmark-item dict from the item-related CLI options.

    Raises ValueError when --url is missing. String-valued options are only
    included when non-empty; seeders/leechers are included when supplied at
    all (so an explicit 0 is preserved).
    """
    if not args.url:
        raise ValueError("--url is required for item-based actions.")
    item: dict[str, object] = {"pageURL": args.url, "title": args.title or ""}
    optional_strings = (
        ("downloadURL", args.download_url),
        ("removeToken", args.remove_token),
        ("backgroundImage", args.image_url),
        ("size", args.size),
    )
    for field_name, field_value in optional_strings:
        if field_value:
            item[field_name] = field_value
    if args.seeders is not None:
        item["seeders"] = args.seeders
    if args.leechers is not None:
        item["leechers"] = args.leechers
    return item
def main() -> None:
    """CLI entrypoint: validate options, resolve the tracker, run the action.

    The merged diff contained a duplicated cookie check, the dead
    normalize_site/run_happyfappy branch and a truncated __main__ guard;
    only the tracker-registry code path is kept, and the guard is completed.
    """
    parser = build_parser()
    args = parser.parse_args()
    # Fail fast on nonsensical retry/delay configuration.
    if args.retries < 1:
        raise ValueError("--retries must be at least 1.")
    if args.backoff_base < 0:
        raise ValueError("--backoff-base must be >= 0.")
    if args.delay_min < 0 or args.delay_max < 0:
        raise ValueError("Delay values must be non-negative.")
    if args.delay_min > args.delay_max:
        raise ValueError("--delay-min cannot be greater than --delay-max.")
    tracker = get_tracker(normalize_tracker(args.site))
    action = normalize_action(args.action)
    cookie = read_cookie(args)
    if action == "get-bookmarks":
        items = tracker.get_bookmarks(cookie, wishlist_url=args.wishlist_url)
        output_path = Path(args.output or "bookmarks.json").resolve()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"Saved {len(items)} records to {output_path}")
        return
    # Remaining actions operate on a single bookmark item built from CLI flags.
    item = build_item(args)
    if action == "download-torrent-files":
        result = tracker.download_torrent(cookie, item, wishlist_url=args.wishlist_url)
        output_dir = Path(args.output or "torrent").resolve()
        output_dir.mkdir(parents=True, exist_ok=True)
        output_path = output_dir / result["filename"]
        output_path.write_bytes(result["data"])
        print(f"Saved torrent to {output_path}")
        return
    if action == "remove-bookmark":
        tracker.remove_bookmark(cookie, item, wishlist_url=args.wishlist_url)
        print("Bookmark removed successfully.")
        return
    raise ValueError(f"Unsupported action: {action}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1 @@
# Package marker: no public names are re-exported from this package root.
__all__ = []

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
from typing import Any
def domain_matches(target_host: str, cookie_domain: str) -> bool:
    """Return True when *cookie_domain* covers *target_host*.

    Strips curl's "#HttpOnly_" prefix and a leading dot from the cookie
    domain; a cookie domain matches the host itself and any subdomain.
    """
    domain = cookie_domain.replace("#HttpOnly_", "").lstrip(".").lower()
    host = target_host.lower()
    if host == domain:
        return True
    return host.endswith("." + domain)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
    """Parse a Netscape cookie file or a raw "k=v; k2=v2" header into a dict.

    Netscape rows whose domain does not cover *target_host* are dropped;
    comment lines (including #HttpOnly_ rows) are skipped.
    """
    lines = cookie_string.splitlines()
    non_blank = [line for line in lines if line.strip()]
    # Heuristic: multiple lines where any looks like a 7-field Netscape row.
    netscape_like = len(lines) > 1 and any(
        "\t" in line or len(line.split()) >= 7 for line in non_blank
    )
    parsed: dict[str, str] = {}
    if netscape_like:
        for raw in lines:
            stripped = raw.strip()
            if not stripped or stripped.startswith("#"):
                continue
            fields = stripped.split("\t") if "\t" in stripped else stripped.split()
            if len(fields) < 7:
                continue
            domain, name, value = fields[0], fields[5], fields[6]
            if name and domain_matches(target_host, domain):
                parsed[name] = value
        return parsed
    for fragment in cookie_string.split(";"):
        fragment = fragment.strip()
        if "=" not in fragment:
            continue
        name, _, value = fragment.partition("=")
        name = name.strip()
        if name:
            parsed[name] = value.strip()
    return parsed
def parse_cookies_for_playwright(
    cookie_string: str,
    target_host: str,
    base_url: str,
) -> list[dict[str, Any]]:
    """Convert a cookie string into Playwright-compatible cookie dicts.

    Netscape rows keep their domain/path/secure/expires metadata; a raw
    header string degrades to URL-scoped name/value cookies.
    """
    lines = cookie_string.splitlines()
    netscape_like = len(lines) > 1 and any(
        "\t" in line or len(line.split()) >= 7 for line in lines if line.strip()
    )
    if not netscape_like:
        pairs = parse_cookie_string(cookie_string, target_host)
        return [{"name": name, "value": value, "url": base_url} for name, value in pairs.items()]
    converted: list[dict[str, Any]] = []
    for raw in lines:
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue
        fields = stripped.split("\t") if "\t" in stripped else stripped.split()
        if len(fields) < 7:
            continue
        domain, _flag, path, secure, expires, name, value = fields[:7]
        if not name or not domain_matches(target_host, domain):
            continue
        entry: dict[str, Any] = {
            "name": name,
            "value": value,
            "domain": domain.replace("#HttpOnly_", "").lstrip("."),
            "path": path or "/",
            "secure": secure.upper() == "TRUE",
        }
        # Only a positive integer expiry is meaningful to Playwright.
        if expires.isdigit() and int(expires) > 0:
            entry["expires"] = float(int(expires))
        converted.append(entry)
    return converted

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
import re
import socket
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
def absolute_url(base_url: str, href: str) -> str:
    """Join *href* to *base_url*, passing absolute http(s) URLs through."""
    href = href.strip()
    if href.startswith(("http://", "https://")):
        return href
    root = base_url.rstrip("/")
    separator = "" if href.startswith("/") else "/"
    return f"{root}{separator}{href}"
def ensure_hosts_entry(host: str) -> None:
    """Best-effort: append *host* to /etc/hosts if it resolves and is absent.

    Every DNS or filesystem error is silently ignored — this is an optional
    optimization, never a hard requirement.
    """
    try:
        resolved_ip = socket.gethostbyname(host)
    except OSError:
        return
    hosts_path = Path("/etc/hosts")
    try:
        contents = hosts_path.read_text(encoding="utf-8")
    except OSError:
        return
    # Already present as a whole whitespace-delimited token? Nothing to do.
    if re.search(rf"(^|\s){re.escape(host)}(\s|$)", contents, flags=re.MULTILINE):
        return
    try:
        with hosts_path.open("a", encoding="utf-8") as hosts_file:
            hosts_file.write(f"\n{resolved_ip} {host}\n")
    except OSError:
        return
def ensure_tracker_hosts(base_url: str) -> None:
    """Pin both the bare and "www." variants of *base_url*'s host in /etc/hosts."""
    host = urlparse(base_url).hostname
    if not host:
        return
    bare = host[4:] if host.startswith("www.") else host
    for candidate in {bare, f"www.{bare}"}:
        ensure_hosts_entry(candidate)
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
    """Fetch *url* via a dynamic session, retrying with exponential backoff.

    Treats 403/404/429 and every 5xx status as a failure. Raises
    RuntimeError after the final attempt, chaining the last error.
    """
    failure: Exception | None = None
    for attempt in range(retries):
        try:
            response = session.fetch(
                url,
                timeout=45_000,
                load_dom=True,
                network_idle=False,
                google_search=False,
            )
        except Exception as err:  # noqa: BLE001
            failure = err
        else:
            status = response.status
            if status not in (403, 404, 429) and status < 500:
                return response
            failure = RuntimeError(f"HTTP {status}")
        # No sleep after the final attempt.
        if attempt < retries - 1:
            time.sleep(backoff_base * (2**attempt))
    raise RuntimeError(f"Request failed for {url}: {failure}") from failure

34
src/wscraper/registry.py Normal file
View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from wscraper.sites.happyfappy import adapter as happyfappy_adapter
from wscraper.sites.privatehd import adapter as privatehd_adapter
from wscraper.types import TrackerAdapter, TrackerInfo
# Registered tracker adapters, keyed by each adapter's canonical key.
TRACKERS: dict[str, TrackerAdapter] = {
happyfappy_adapter.key: happyfappy_adapter,
privatehd_adapter.key: privatehd_adapter,
}
# User-facing aliases (short and long forms) resolving to canonical keys.
TRACKER_ALIASES = {
"hf": "happyfappy",
"happyfappy": "happyfappy",
"phd": "privatehd",
"privatehd": "privatehd",
}
def normalize_tracker(value: str) -> str:
    """Resolve a tracker alias to its canonical key, or raise ValueError."""
    canonical = TRACKER_ALIASES.get(value.strip().lower())
    if canonical is None:
        supported = ", ".join(sorted(TRACKER_ALIASES))
        raise ValueError(f"Unsupported tracker: {value!r}. Supported values: {supported}")
    return canonical
def get_tracker(value: str) -> TrackerAdapter:
    """Return the adapter registered under the (normalized) tracker key."""
    return TRACKERS[normalize_tracker(value)]
def list_trackers() -> list[TrackerInfo]:
    """Return key/label info for every registered tracker, in registration order."""
    infos: list[TrackerInfo] = []
    for registered in TRACKERS.values():
        infos.append(TrackerInfo(key=registered.key, label=registered.label))
    return infos

View File

@@ -4,165 +4,21 @@ import argparse
import json
import random
import re
import socket
import tempfile
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from scrapling.fetchers import DynamicSession
from wscraper.common.cookies import parse_cookie_string, parse_cookies_for_playwright
from wscraper.common.net import absolute_url, ensure_tracker_hosts, fetch_dynamic_with_retry
from wscraper.types import BookmarkItem, DownloadResult
# Sentinel text shown by the site when the bookmarks list is empty.
STOP_TEXT = "You have not bookmarked any torrents."
# Extracts the URL from an inline CSS "background-image: url(...)" value.
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)")
def _domain_matches(target_host: str, cookie_domain: str) -> bool:
cd = cookie_domain.lstrip(".").lower()
th = target_host.lower()
return th == cd or th.endswith("." + cd)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
    """Parse a Netscape cookie file or a raw "k=v; k2=v2" header into a dict.

    Netscape rows (tab-separated) are filtered to those whose domain covers
    *target_host*; a raw header string is split on ";" instead.
    """
    lines = cookie_string.splitlines()
    # This legacy variant only recognizes tab-separated Netscape rows.
    netscape_like = len(lines) > 1 and any("\t" in line for line in lines)
    parsed: dict[str, str] = {}
    if netscape_like:
        for raw in lines:
            stripped = raw.strip()
            if not stripped or stripped.startswith("#"):
                continue
            fields = stripped.split("\t")
            if len(fields) < 7:
                continue
            if fields[5] and _domain_matches(target_host, fields[0]):
                parsed[fields[5]] = fields[6]
        return parsed
    for fragment in cookie_string.split(";"):
        fragment = fragment.strip()
        if "=" not in fragment:
            continue
        name, _, value = fragment.partition("=")
        name = name.strip()
        if name:
            parsed[name] = value.strip()
    return parsed
def parse_cookies_for_playwright(cookie_string: str, target_host: str, base_url: str) -> list[dict[str, Any]]:
    """Convert a cookie string into Playwright-compatible cookie dicts.

    Tab-separated Netscape rows retain domain/path/secure/expires metadata;
    a raw header string degrades to URL-scoped name/value cookies.
    """
    lines = cookie_string.splitlines()
    netscape_like = len(lines) > 1 and any("\t" in line for line in lines)
    if not netscape_like:
        pairs = parse_cookie_string(cookie_string, target_host)
        return [{"name": name, "value": value, "url": base_url} for name, value in pairs.items()]
    converted: list[dict[str, Any]] = []
    for raw in lines:
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue
        fields = stripped.split("\t")
        if len(fields) < 7:
            continue
        domain, _flag, path, secure, expires, name, value = fields[:7]
        if not name or not _domain_matches(target_host, domain):
            continue
        entry: dict[str, Any] = {
            "name": name,
            "value": value,
            "domain": domain.lstrip("."),
            "path": path or "/",
            "secure": secure.upper() == "TRUE",
        }
        # Only a positive integer expiry is meaningful to Playwright.
        if expires.isdigit() and int(expires) > 0:
            entry["expires"] = float(int(expires))
        converted.append(entry)
    return converted
def absolute_url(base_url: str, href: str) -> str:
    """Resolve *href* against *base_url*; absolute http(s) links pass through."""
    cleaned = href.strip()
    if cleaned.startswith(("http://", "https://")):
        return cleaned
    base = base_url.rstrip("/")
    return base + cleaned if cleaned.startswith("/") else f"{base}/{cleaned}"
def ensure_hosts_entry(host: str) -> None:
    """Best-effort: append *host* to /etc/hosts if it resolves and is absent.

    All DNS and filesystem errors are swallowed deliberately — this is an
    optional optimization, not a requirement.
    """
    try:
        address = socket.gethostbyname(host)
    except OSError:
        return
    hosts_file = Path("/etc/hosts")
    try:
        existing = hosts_file.read_text(encoding="utf-8")
    except OSError:
        return
    # Skip when the host already appears as a standalone token.
    if re.search(rf"(^|\s){re.escape(host)}(\s|$)", existing, flags=re.MULTILINE):
        return
    try:
        with hosts_file.open("a", encoding="utf-8") as appender:
            appender.write(f"\n{address} {host}\n")
    except OSError:
        return
def ensure_tracker_hosts(base_url: str) -> None:
    """Pin both the bare and "www." variants of *base_url*'s host in /etc/hosts."""
    hostname = urlparse(base_url).hostname
    if not hostname:
        return
    stripped = hostname[4:] if hostname.startswith("www.") else hostname
    for variant in {stripped, f"www.{stripped}"}:
        ensure_hosts_entry(variant)
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
    """Fetch *url* via a dynamic session, retrying with exponential backoff.

    403/404/429 and any 5xx status count as failures. Raises RuntimeError
    after the final attempt, chaining the last underlying error.
    """
    most_recent: Exception | None = None
    for attempt in range(retries):
        try:
            response = session.fetch(
                url,
                timeout=45_000,
                load_dom=True,
                network_idle=False,
                google_search=False,
            )
        except Exception as err:  # noqa: BLE001
            most_recent = err
        else:
            if response.status not in (403, 404, 429) and response.status < 500:
                return response
            most_recent = RuntimeError(f"HTTP {response.status}")
        if attempt < retries - 1:
            time.sleep(backoff_base * (2**attempt))
    raise RuntimeError(f"Request failed for {url}: {most_recent}") from most_recent
# bookmarks
def extract_background_image(style: str) -> str | None:
@@ -671,3 +527,126 @@ def remove_bookmark_with_retry(
page.close()
raise RuntimeError(f"Bookmark remove failed for {detail_url}: {last_error}") from last_error
def get_bookmarks(cookie: str, *, base_url: str = "https://www.happyfappy.net") -> list[BookmarkItem]:
    """Collect bookmarks via the legacy CLI runner and return parsed records.

    A temporary JSON file serves as the hand-off between the runner and us.
    """
    with tempfile.TemporaryDirectory(prefix="happyfappy-bookmarks-") as tmpdir:
        json_path = Path(tmpdir) / "bookmarks.json"
        runner_args = argparse.Namespace(
            base_url=base_url,
            cookie=cookie,
            cookie_file=None,
            output=str(json_path),
            delay_min=1.8,
            delay_max=3.2,
            retries=3,
            backoff_base=5.0,
            max_pages=200,
        )
        run_get_bookmarks(runner_args)
        return json.loads(json_path.read_text(encoding="utf-8"))
def download_torrent(
    cookie: str,
    item: BookmarkItem,
    *,
    base_url: str = "https://www.happyfappy.net",
) -> DownloadResult:
    """Download the torrent behind *item* and return its filename and bytes.

    Raises ValueError without a pageURL, and RuntimeError when the legacy
    runner produced no .torrent file.
    """
    detail_url = (item.get("pageURL") or "").strip()
    if not detail_url:
        raise ValueError("pageURL is required for HappyFappy download.")
    with tempfile.TemporaryDirectory(prefix="happyfappy-download-") as tmpdir:
        target_dir = Path(tmpdir) / "torrent"
        run_download_torrent_files(
            argparse.Namespace(
                url=detail_url,
                base_url=base_url,
                cookie=cookie,
                cookie_file=None,
                output_dir=str(target_dir),
                rm_bookmark=False,
                retries=3,
                backoff_base=5.0,
            )
        )
        produced = sorted(target_dir.glob("*.torrent"))
        if not produced:
            raise RuntimeError("No torrent file produced")
        first = produced[0]
        return {"filename": first.name, "data": first.read_bytes()}
def remove_bookmark(
    cookie: str,
    item: BookmarkItem,
    *,
    base_url: str = "https://www.happyfappy.net",
) -> None:
    """Remove *item*'s bookmark through a headless browser session.

    Raises ValueError when pageURL is missing or no usable cookies parse.
    """
    detail_url = (item.get("pageURL") or "").strip()
    if not detail_url:
        raise ValueError("pageURL is required for HappyFappy bookmark removal.")
    target_host = urlparse(base_url).hostname or "www.happyfappy.net"
    # Best-effort DNS pinning before the browser session starts.
    ensure_tracker_hosts(base_url)
    pw_cookies = parse_cookies_for_playwright(
        cookie,
        target_host=target_host,
        base_url=base_url.rstrip("/"),
    )
    if not pw_cookies:
        raise ValueError("No Playwright-compatible cookies generated for target host.")
    torrent_id = extract_torrent_id(detail_url)
    browser_session = DynamicSession(
        headless=True,
        disable_resources=True,
        cookies=pw_cookies,
        google_search=False,
        retries=1,
        retry_delay=1,
    )
    with browser_session as session:
        remove_bookmark_with_retry(
            session=session,
            detail_url=detail_url,
            torrent_id=torrent_id,
            retries=3,
            backoff_base=5.0,
        )
class HappyFappyAdapter:
    """TrackerAdapter facade over the module-level HappyFappy functions.

    The wishlist_url keyword is accepted for interface parity but ignored —
    HappyFappy has a fixed bookmarks page.
    """

    key = "happyfappy"
    label = "HappyFappy"

    def get_bookmarks(self, cookie: str, *, wishlist_url: str | None = None) -> list[BookmarkItem]:
        del wishlist_url  # interface parity only
        return get_bookmarks(cookie)

    def download_torrent(
        self,
        cookie: str,
        item: BookmarkItem,
        *,
        wishlist_url: str | None = None,
    ) -> DownloadResult:
        del wishlist_url  # interface parity only
        return download_torrent(cookie, item)

    def remove_bookmark(
        self,
        cookie: str,
        item: BookmarkItem,
        *,
        wishlist_url: str | None = None,
    ) -> None:
        del wishlist_url  # interface parity only
        remove_bookmark(cookie, item)


adapter = HappyFappyAdapter()

View File

@@ -0,0 +1,359 @@
from __future__ import annotations
import re
import time
from http.cookies import SimpleCookie
from typing import Any
from urllib.parse import unquote
from urllib.parse import urlparse
from curl_cffi import requests
from scrapling.fetchers import DynamicSession
from wscraper.sites.happyfappy import (
absolute_url,
fetch_dynamic_with_retry,
looks_like_torrent_bytes,
normalize_filename,
parse_cookie_string,
parse_cookies_for_playwright,
)
from wscraper.types import BookmarkItem, DownloadResult
# Canonical PrivateHD origin; individual calls may override via base_url.
DEFAULT_BASE_URL = "https://privatehd.to"
# Desktop Chrome UA sent on plain HTTP (curl_cffi) requests.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
)
def normalize_wishlist_url(base_url: str, wishlist_url: str | None) -> str:
    """Return an absolute wishlist URL, raising ValueError when it is missing."""
    candidate = (wishlist_url or "").strip()
    if not candidate:
        raise ValueError("PrivateHD icin wishlistUrl zorunlu.")
    if candidate.startswith(("http://", "https://")):
        return candidate
    return absolute_url(base_url, candidate)
def parse_int(value: str) -> int | None:
    """Extract the first integer from *value* (commas stripped), else None."""
    text = value.strip()
    if not text:
        return None
    digits = re.search(r"\d+", text.replace(",", ""))
    if digits is None:
        return None
    return int(digits.group(0))
def extract_rows(response: Any, base_url: str) -> list[BookmarkItem]:
    """Parse wishlist table rows into bookmark records.

    Rows without a detail link are skipped; size/seeders/leechers are read
    positionally from the cells when at least 7 cells are present.
    """
    items: list[BookmarkItem] = []
    for row in response.css("table.table tbody tr"):
        detail_href = (row.css("a.torrent-filename::attr(href)").get("") or "").strip()
        if not detail_href:
            continue
        title = " ".join(row.css("a.torrent-filename::text").getall()).strip()
        download_href = (
            row.css("a.torrent-download-icon::attr(href)").get("")
            or row.css("a[href*='/download/torrent/']::attr(href)").get("")
            or ""
        ).strip()
        remove_token = (
            row.css("button.btn-delete-wishlist::attr(data-id)").get("")
            or row.css("button[data-id]::attr(data-id)").get("")
            or ""
        ).strip() or None
        size = seeders = leechers = None
        cells = row.css("td")
        if len(cells) >= 7:
            # Positional layout assumed: cells[4]=size, [5]=seeders, [6]=leechers.
            size = " ".join(cells[4].css("::text").getall()).strip() or None
            seeders = parse_int(" ".join(cells[5].css("::text").getall()))
            leechers = parse_int(" ".join(cells[6].css("::text").getall()))
        items.append(
            {
                "pageURL": absolute_url(base_url, detail_href),
                "title": title,
                "downloadURL": absolute_url(base_url, download_href) if download_href else None,
                "removeToken": remove_token,
                "size": size,
                "seeders": seeders,
                "leechers": leechers,
            }
        )
    return items
def enrich_bookmark(response: Any, base_url: str, item: BookmarkItem) -> BookmarkItem:
    """Merge poster, display title and download link from a detail page into *item*."""
    poster = (
        response.css("img[src*='/images/posters/']::attr(src)").get("")
        or response.css("img.img-responsive::attr(src)").get("")
        or ""
    ).strip()
    title = (
        " ".join(response.css("a[href*='/movie/']::text").getall()).strip()
        or " ".join(response.css("a[href*='/tv/']::text").getall()).strip()
        or " ".join(response.css("h1::text").getall()).strip()
        or item.get("title")
        or ""
    )
    download_href = (
        response.css("a.btn.btn-xs.btn-primary[href*='/download/torrent/']::attr(href)").get("")
        or response.css("a[href*='/download/torrent/']::attr(href)").get("")
        or ""
    ).strip()
    # Start from a copy so the caller's record is never mutated.
    enriched: BookmarkItem = dict(item)
    enriched["title"] = title.strip()
    enriched["backgroundImage"] = absolute_url(base_url, poster) if poster else item.get("backgroundImage")
    enriched["downloadURL"] = absolute_url(base_url, download_href) if download_href else item.get("downloadURL")
    return enriched
def build_dynamic_session(
    cookie: str,
    *,
    base_url: str = DEFAULT_BASE_URL,
) -> DynamicSession:
    """Create a headless DynamicSession pre-loaded with the user's cookies.

    Raises ValueError when no cookie applies to the target host.
    """
    host = urlparse(base_url).hostname or "privatehd.to"
    playwright_cookies = parse_cookies_for_playwright(cookie, target_host=host, base_url=base_url.rstrip("/"))
    if not playwright_cookies:
        raise ValueError("No Playwright-compatible cookies generated for target host.")
    return DynamicSession(
        headless=True,
        disable_resources=True,
        cookies=playwright_cookies,
        google_search=False,
        retries=1,
        retry_delay=1,
    )
def get_bookmarks(
    cookie: str,
    *,
    wishlist_url: str,
    base_url: str = DEFAULT_BASE_URL,
) -> list[BookmarkItem]:
    """Fetch the wishlist page, then visit each detail page to enrich records.

    A short pause between detail fetches keeps the request rate polite.
    """
    listing_url = normalize_wishlist_url(base_url, wishlist_url)
    with build_dynamic_session(cookie, base_url=base_url) as session:
        listing = fetch_dynamic_with_retry(session, listing_url, retries=3, backoff_base=5.0)
        basic_items = extract_rows(listing, base_url)
        detailed: list[BookmarkItem] = []
        last_index = len(basic_items) - 1
        for index, entry in enumerate(basic_items):
            detail_page = fetch_dynamic_with_retry(session, entry["pageURL"], retries=3, backoff_base=5.0)
            detailed.append(enrich_bookmark(detail_page, base_url, entry))
            if index < last_index:
                time.sleep(1.2)
        return detailed
def build_http_cookies(cookie: str, target_url: str) -> dict[str, str]:
    """Parse *cookie* into name/value pairs scoped to *target_url*'s host.

    Raises ValueError when nothing parses for the host.
    """
    host = urlparse(target_url).hostname or "privatehd.to"
    parsed = parse_cookie_string(cookie, target_host=host)
    if not parsed:
        raise ValueError("No valid cookies parsed for target host.")
    return parsed
def build_http_session(cookie: str, target_url: str) -> requests.Session:
    """Return a curl_cffi session carrying the user's cookies and a browser UA."""
    http_session = requests.Session()
    http_session.cookies.update(build_http_cookies(cookie, target_url))
    http_session.headers.update({"User-Agent": USER_AGENT})
    return http_session
def fetch_wishlist_token(
    cookie: str,
    *,
    wishlist_url: str,
    base_url: str = DEFAULT_BASE_URL,
) -> str:
    """Obtain a CSRF token for wishlist mutations.

    Tries, in order: a hidden _token input or csrf meta tag on the wishlist
    page, the XSRF-TOKEN cookie supplied by the caller, then XSRF-TOKEN from
    the response's Set-Cookie headers. Raises RuntimeError when all fail.
    """
    listing_url = normalize_wishlist_url(base_url, wishlist_url)
    with build_dynamic_session(cookie, base_url=base_url) as session:
        response = fetch_dynamic_with_retry(session, listing_url, retries=3, backoff_base=5.0)
        token = (
            response.css("input[name='_token']::attr(value)").get("")
            or response.css("meta[name='csrf-token']::attr(content)").get("")
            or ""
        ).strip()
        if token:
            return token
        # Fallback 1: XSRF cookie the caller already holds.
        caller_cookies = build_http_cookies(cookie, listing_url)
        xsrf = caller_cookies.get("XSRF-TOKEN", "").strip()
        if xsrf:
            return unquote(xsrf)
        # Fallback 2: XSRF cookie issued via this response's Set-Cookie headers.
        header_values = response.headers.get_list("set-cookie") if hasattr(response.headers, "get_list") else []
        for header in header_values:
            jar = SimpleCookie()
            jar.load(header)
            morsel = jar.get("XSRF-TOKEN")
            if morsel and morsel.value:
                return unquote(morsel.value)
        raise RuntimeError("PrivateHD CSRF token bulunamadi.")
def validate_download(download_url: str, filename: str, data: bytes) -> None:
    """Sanity-check a downloaded torrent: .torrent extension and bencode shape.

    Raises RuntimeError on either failure.
    """
    url_path = urlparse(download_url).path.lower()
    has_torrent_ext = filename.lower().endswith(".torrent") or url_path.endswith(".torrent")
    if not has_torrent_ext:
        raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.")
    if not looks_like_torrent_bytes(data):
        raise RuntimeError("Downloaded file failed torrent bencode check.")
def download_torrent(
    cookie: str,
    item: BookmarkItem,
    *,
    wishlist_url: str | None = None,
    base_url: str = DEFAULT_BASE_URL,
) -> DownloadResult:
    """Download *item*'s torrent over plain HTTP and validate the payload.

    wishlist_url is accepted for adapter-interface parity but unused.
    Raises ValueError without a downloadURL, RuntimeError on HTTP/content errors.
    """
    del wishlist_url
    download_url = (item.get("downloadURL") or "").strip()
    if not download_url:
        raise ValueError("PrivateHD item icin downloadURL zorunlu.")
    request_headers = {
        "Referer": item.get("pageURL") or base_url,
        "User-Agent": USER_AGENT,
    }
    response = requests.get(
        download_url,
        cookies=build_http_cookies(cookie, download_url),
        headers=request_headers,
        timeout=60,
    )
    if response.status_code >= 400:
        raise RuntimeError(f"PrivateHD torrent indirme basarisiz: HTTP {response.status_code}")
    filename = normalize_filename("", download_url)
    payload = response.content
    validate_download(download_url, filename, payload)
    return {"filename": filename, "data": payload}
def remove_bookmark(
    cookie: str,
    item: BookmarkItem,
    *,
    wishlist_url: str | None = None,
    base_url: str = DEFAULT_BASE_URL,
) -> None:
    """Delete *item* from the PrivateHD wishlist through a headless browser.

    Clicks the row's delete button, confirms the dialog (trying common
    confirm selectors, then a JS text scan), then verifies after a reload
    that the row is gone. Raises ValueError/RuntimeError on any failure.
    """
    token = (item.get("removeToken") or "").strip()
    if not token:
        raise ValueError("PrivateHD item icin removeToken zorunlu.")
    listing_url = normalize_wishlist_url(base_url, wishlist_url)
    host = urlparse(base_url).hostname or "privatehd.to"
    playwright_cookies = parse_cookies_for_playwright(cookie, target_host=host, base_url=base_url.rstrip("/"))
    if not playwright_cookies:
        raise ValueError("No Playwright-compatible cookies generated for target host.")
    delete_selector = f"button.btn-delete-wishlist[data-id='{token}']"
    with DynamicSession(
        headless=True,
        disable_resources=True,
        cookies=playwright_cookies,
        google_search=False,
        retries=1,
        retry_delay=1,
    ) as session:
        page = session.context.new_page()
        try:
            page.goto(listing_url, wait_until="domcontentloaded", timeout=45_000)
            delete_button = page.locator(delete_selector)
            if delete_button.count() == 0:
                raise RuntimeError("PrivateHD wishlist silme butonu bulunamadi.")
            delete_button.first.click()
            page.wait_for_timeout(500)
            confirmed = False
            # Try the usual confirm-dialog buttons first.
            for selector in (
                "button.swal2-confirm",
                "button.confirm",
                "button.btn-danger",
                "button:has-text('Yes')",
            ):
                candidate = page.locator(selector)
                if candidate.count() > 0 and candidate.first.is_visible():
                    candidate.first.click()
                    confirmed = True
                    break
            if not confirmed:
                # Last resort: scan the DOM for a yes/ok/confirm control.
                confirmed = bool(
                    page.evaluate(
                        """
                        () => {
                            const nodes = Array.from(document.querySelectorAll("button,a"));
                            const target = nodes.find((node) =>
                                /^(yes|ok|confirm)$/i.test((node.textContent || "").trim())
                            );
                            if (!target) return false;
                            target.click();
                            return true;
                        }
                        """
                    )
                )
            if not confirmed:
                raise RuntimeError("PrivateHD wishlist onay butonu bulunamadi.")
            page.wait_for_timeout(1800)
            page.reload(wait_until="domcontentloaded", timeout=45_000)
            # The row must be gone after reload, otherwise deletion failed.
            if page.locator(delete_selector).count() > 0:
                raise RuntimeError("PrivateHD wishlist silme dogrulanamadi: kayit hala listede gorunuyor.")
        finally:
            page.close()
class PrivateHDAdapter:
    """TrackerAdapter facade over the module-level PrivateHD functions."""

    key = "privatehd"
    label = "PrivateHD"

    def get_bookmarks(self, cookie: str, *, wishlist_url: str | None = None) -> list[BookmarkItem]:
        # get_bookmarks requires a wishlist URL; empty string triggers its ValueError.
        return get_bookmarks(cookie, wishlist_url=wishlist_url or "")

    def download_torrent(
        self,
        cookie: str,
        item: BookmarkItem,
        *,
        wishlist_url: str | None = None,
    ) -> DownloadResult:
        return download_torrent(cookie, item, wishlist_url=wishlist_url)

    def remove_bookmark(
        self,
        cookie: str,
        item: BookmarkItem,
        *,
        wishlist_url: str | None = None,
    ) -> None:
        return remove_bookmark(cookie, item, wishlist_url=wishlist_url)


adapter = PrivateHDAdapter()

54
src/wscraper/types.py Normal file
View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol, TypedDict
class BookmarkItem(TypedDict, total=False):
    """A single tracker bookmark record; every key is optional (total=False)."""

    pageURL: str
    title: str
    backgroundImage: str | None
    downloadURL: str | None
    removeToken: str | None
    size: str | None
    seeders: int | None
    leechers: int | None
class DownloadResult(TypedDict):
    """A downloaded torrent payload: target filename plus raw file bytes."""

    filename: str
    data: bytes
@dataclass(frozen=True)
class TrackerInfo:
    """Immutable key/label pair describing a registered tracker."""

    key: str
    label: str
class TrackerAdapter(Protocol):
    """Structural interface every tracker adapter must satisfy.

    wishlist_url is tracker-specific: adapters that do not need it accept
    and ignore the keyword so call sites stay uniform.
    """

    key: str
    label: str

    def get_bookmarks(
        self,
        cookie: str,
        *,
        wishlist_url: str | None = None,
    ) -> list[BookmarkItem]:
        """Return all bookmarked items for the authenticated user."""
        ...

    def download_torrent(
        self,
        cookie: str,
        item: BookmarkItem,
        *,
        wishlist_url: str | None = None,
    ) -> DownloadResult:
        """Download the torrent file behind *item*."""
        ...

    def remove_bookmark(
        self,
        cookie: str,
        item: BookmarkItem,
        *,
        wishlist_url: str | None = None,
    ) -> None:
        """Remove *item* from the user's bookmark list."""
        ...