Compare commits


26 Commits

Author SHA1 Message Date
b8e99ebbd2 test: add live e2e tests and test documentation for privatehd 2026-03-13 08:27:50 +03:00
259531949b docs: document the privatehd and q-buffer integration 2026-03-13 02:30:31 +03:00
fe429b6cef feat: add a shared tracker adapter structure and PrivateHD support 2026-03-13 02:08:17 +03:00
daf75166db feat: support the q-buffer watcher flow 2026-03-12 22:32:12 +03:00
55459373e5 feat: add optional bookmark removal to torrent download action 2026-03-08 02:58:41 +03:00
7d307c31f9 readme update 2026-03-07 02:47:17 +03:00
b8cd26dc11 chore: ignore macOS metadata files 2026-03-07 02:46:13 +03:00
4ce0aad021 test: add live e2e happyfappy CLI tests with detailed output 2026-03-07 02:46:10 +03:00
6d7cb602b3 readme update 2026-03-07 02:27:28 +03:00
50b26e47d8 readme update 2026-03-07 02:26:29 +03:00
fe535be6a6 readme update 2026-03-07 02:26:02 +03:00
41980be6dd readme update 2026-03-07 02:25:33 +03:00
9bc56b9aa3 logo update 2026-03-07 02:24:29 +03:00
e9ea3c3ebd Update README.md 2026-03-06 23:22:12 +00:00
5d7665b82d Update README.md 2026-03-06 23:21:20 +00:00
358dcc8b2c readme update 2026-03-07 02:20:52 +03:00
8367e4b63d logo update 2026-03-07 02:19:04 +03:00
0924959ff3 logo update 2026-03-07 02:17:30 +03:00
f22258de9b docs: keep logo-only header in README 2026-03-07 02:15:19 +03:00
204c0233f0 docs: add centered logo and styled project title in README 2026-03-07 02:06:08 +03:00
8b4b3c2a30 docs: remove README logo and revert to plain header 2026-03-07 01:47:53 +03:00
891082782d docs: add monochrome logo and embed it in README header 2026-03-07 01:45:36 +03:00
2dd0358163 docs: add repository and python badges to README 2026-03-07 01:42:53 +03:00
8a9ae4a175 docs: update setup and usage for packaged wscraper CLI 2026-03-07 01:40:40 +03:00
672f1d3281 chore: remove generated egg-info and ignore packaging artifacts 2026-03-07 01:40:37 +03:00
1ef7118ba7 refactor: migrate to src package layout and wscraper entry module 2026-03-07 01:40:18 +03:00
24 changed files with 2108 additions and 712 deletions

8
.gitignore vendored

@@ -11,3 +11,11 @@ docs/brainstorms/*.md
# Local clone used during development; package install should be used instead
Scrapling/
# Packaging artifacts
*.egg-info/
.DS_Store

350
README.md

@@ -1,67 +1,353 @@
<p align="center">
<img src="logo-v2.png" alt="wscraper logo" width="240" />
</p>
<p align="center">
<a href="https://gitea.wisecolt-panda.net/wisecolt/Bookmark-Tracker">
<img src="https://img.shields.io/badge/Gitea-Repository-609926?logo=gitea&logoColor=white" alt="Gitea">
</a>
<img src="https://img.shields.io/badge/Python-3.10%2B-3776AB?logo=python&logoColor=white" alt="Python">
<img src="https://img.shields.io/badge/Trackers-HappyFappy%20%7C%20PrivateHD-0A7B83" alt="Trackers">
<img src="https://img.shields.io/badge/Runtime-scrapling%20%2B%20Playwright-1f6feb" alt="Runtime">
</p>
# wscraper
Commands for HappyFappy run through `wscraper.py`. The project is structured so that it can support additional sites later.
`wscraper` is a multi-tracker scraper package that gathers tracker bookmark / wishlist flows behind a shared Python adapter layer. Today it is used in two parts:
## 1) Clone the Repo
- `bin/wscraper/`: the Python package itself, the tracker adapters, and the CLI
- `bin/wscraper-service/server.py`: the host-side service that the `q-buffer` backend talks to over HTTP
```bash
git clone <REPO_URL>
cd <REPO_FOLDER>
```
The `q-buffer` watcher flow no longer spawns the `wscraper` CLI directly inside Docker; it now goes through the `wscraper-service` running on the host machine. The main reason is that the `scrapling + Playwright` chain behaves more stably against the trackers when it runs in the host environment.
## 2) Installation
## Supported Trackers
- `happyfappy` (`hf`)
- `privatehd` (`phd`)
Supported shared actions:
- `get-bookmarks`
- `download-torrent-files`
- `remove-bookmark`
## Architecture
The `wscraper` package layout is now tracker-registry based:
- `src/wscraper/registry.py`
  - registers the supported tracker adapters
- `src/wscraper/types.py`
  - defines the shared `BookmarkItem`, `DownloadResult`, and `TrackerAdapter` types
- `src/wscraper/sites/happyfappy.py`
  - the HappyFappy adapter
- `src/wscraper/sites/privatehd.py`
  - the PrivateHD adapter
- `src/wscraper/cli.py`
  - the shared CLI entrypoint for all trackers
Thanks to this, adding a new tracker only requires writing a new adapter instead of copying the existing CLI.
## Installation
### macOS / Linux
```bash
cd bin/wscraper
python3.12 -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
python -m pip install -r requirements.txt
python -m pip install -e .
scrapling install
```
### Windows (PowerShell)
Alternatives:
- if `python3.12` is not available, use `python3.11` or `python3.10`
- `scrapling install` installs the Playwright/browser dependencies
### Windows PowerShell
```powershell
cd bin/wscraper
py -3.12 -m venv .venv
.venv\Scripts\Activate.ps1
python -m pip install -U pip
python -m pip install -r requirements.txt
python -m pip install -e .
scrapling install
```
### Windows (CMD)
## CLI Usage
```bat
py -3.12 -m venv .venv
.venv\Scripts\activate.bat
python -m pip install -U pip
python -m pip install -r requirements.txt
scrapling install
```
Note: after activating the environment, running commands as `python ...` is enough; you do not have to type `.venv/bin/python`.
## 3) HappyFappy Commands
### Fetching Bookmarks
General form:
```bash
python wscraper.py happyfappy --action get-bookmarks -c cookies.txt -o bookmarks.json
wscraper <tracker> --action <action> [opsiyonlar]
```
### Fetching Bookmarks / Wishlist
```bash
wscraper happyfappy --action get-bookmarks -c cookies.txt -o bookmarks.json
wscraper privatehd --action get-bookmarks -c cookies.txt -o bookmarks.json
```
Optional `wishlist_url` override:
```bash
wscraper privatehd --action get-bookmarks -c cookies.txt --wishlist-url "https://privatehd.to/bookmarks" -o bookmarks.json
```
### Downloading Torrent Files
```bash
python wscraper.py happyfappy --action download-torrent-files -u "https://www.happyfappy.net/torrents.php?id=110178" -c cookies.txt -o torrent
wscraper happyfappy --action download-torrent-files \
-c cookies.txt \
-u "https://www.happyfappy.net/torrents.php?id=110178" \
--title "Sample" \
--image-url "https://example.com/poster.jpg" \
-o torrent
```
## 4) Short Alias Usage
```bash
# site alias: hf
# action alias: gb (get-bookmarks), dtf (download-torrent-files)
python wscraper.py hf -a gb -c cookies.txt -o bookmarks.json
python wscraper.py hf -a dtf -u "https://www.happyfappy.net/torrents.php?id=110178" -c cookies.txt -o torrent
wscraper privatehd --action download-torrent-files \
-c cookies.txt \
-u "https://privatehd.to/torrent/12345" \
--download-url "https://privatehd.to/download.php?id=12345" \
--title "Sample" \
-o torrent
```
### Removing a Bookmark
```bash
wscraper happyfappy --action remove-bookmark \
-c cookies.txt \
-u "https://www.happyfappy.net/torrents.php?id=110178" \
--title "Sample"
```
```bash
wscraper privatehd --action remove-bookmark \
-c cookies.txt \
-u "https://privatehd.to/torrent/12345" \
--remove-token "bookmark-delete-token" \
--title "Sample"
```
### Short Aliases
```bash
wscraper hf -a gb -c cookies.txt -o bookmarks.json
wscraper phd -a gb -c cookies.txt -o bookmarks.json
wscraper hf -a dtf -c cookies.txt -u "https://www.happyfappy.net/torrents.php?id=110178" -o torrent
wscraper phd -a rb -c cookies.txt -u "https://privatehd.to/torrent/12345" --remove-token "token"
```
## Integration with q-buffer
The watcher side inside `q-buffer` now works as follows:
1. a watcher is defined from the UI
2. the `q-buffer` backend stores the cookie encrypted
3. the `server` inside Docker makes an HTTP call to the `wscraper-service` on the host
4. `wscraper-service` fetches the bookmarks through the `wscraper` adapter
5. it downloads the torrent file for each new bookmark
6. the `q-buffer` backend receives the `.torrent` content and forwards it to qBittorrent
7. after a successful import, the bookmark is removed on the tracker side
For this reason, `wscraper` is more than a standalone CLI: it is now the backend scraping engine of the `q-buffer watcher` integration.
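Steps 4 through 7 of the watcher flow can be sketched as a single polling pass with the service, download, and qBittorrent calls injected. The helper names here are hypothetical; the real q-buffer backend is more involved:

```python
from typing import Callable


def run_watcher_pass(
    fetch_bookmarks: Callable[[], list[dict]],      # step 4: bookmarks via wscraper-service
    download_torrent: Callable[[dict], bytes],      # step 5: fetch .torrent bytes
    send_to_qbittorrent: Callable[[bytes], bool],   # step 6: hand off to qBittorrent
    remove_bookmark: Callable[[dict], None],        # step 7: remove on the tracker
    seen: set[str],
) -> int:
    """One watcher pass: import every new bookmark, then clean it up."""
    imported = 0
    for item in fetch_bookmarks():
        page_url = item.get("pageURL", "")
        if not page_url or page_url in seen:
            continue  # skip items we already imported
        data = download_torrent(item)
        if send_to_qbittorrent(data):
            # only remove the bookmark after a successful import
            remove_bookmark(item)
            seen.add(page_url)
            imported += 1
    return imported
```

Injecting the four callables keeps the pass trivially testable with fakes, which is also why the host service boundary (HTTP) slots in cleanly at step 4.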
## wscraper-service API
`bin/wscraper-service/server.py` exposes the following endpoints:
- `GET /health`
- `GET /trackers`
- `POST /bookmarks`
- `POST /download`
- `POST /remove-bookmark`
Example `POST /bookmarks` payload:
```json
{
"tracker": "happyfappy",
"cookie": "raw-cookie",
"wishlistUrl": "optional-override"
}
```
Example `POST /download` payload:
```json
{
"tracker": "privatehd",
"cookie": "raw-cookie",
"wishlistUrl": "optional-override",
"item": {
"pageURL": "https://privatehd.to/torrent/12345",
"title": "Example",
"downloadURL": "https://privatehd.to/download.php?id=12345",
"removeToken": "bookmark-token"
}
}
```
## How Is wscraper Prepared During the Initial q-buffer Setup?
This step is especially important. In the `q-buffer` repo, the scraping logic lives under `bin/wscraper/`, while the host service is the `bin/wscraper-service/server.py` file. In other words:
- `bin/wscraper/` is not just `server.py`
- `server.py` lives in the separate `wscraper-service` directory
- the actual tracker code sits under `bin/wscraper/src/wscraper/...`
The recommended path for the initial setup:
1. at the repo root, copy `.env.example` to `.env`
2. review the watcher service settings in `.env`:
- `WSCRAPER_SERVICE_BASE_URL`
- `WSCRAPER_SERVICE_TOKEN`
- `WSCRAPER_SERVICE_HOST`
- `WSCRAPER_SERVICE_PORT`
- `WSCRAPER_SERVICE_PYTHON_BIN`
3. run the following command at the repo root:
```bash
./scripts/bootstrap.sh --dev-mode
```
This script:
- brings up the Docker `web` and `server` services with `up --build`
- creates `.runtime/wscraper-service/.venv` on the host
- installs `scrapling[fetchers]`
- runs `scrapling install`
- starts the `bin/wscraper-service/server.py` service process
If the setup is already complete, the script does not redo everything from scratch; it only fills in what is missing.
## Tests
`wscraper` ships `pytest`-based e2e tests that run against live systems. They are disabled by default and run only when explicitly enabled.
Test files:
- `tests/e2e/test_happyfappy_live.py`
- `tests/e2e/test_privatehd_live.py`
- `tests/e2e/_helpers.py`
### Enabling the Tests
For all live tests:
```bash
export WSCRAPER_E2E=1
```
If this variable is missing or not set to `1`, the e2e tests are skipped.
### HappyFappy Live Tests
Current test coverage:
- `get-bookmarks`
- `download-torrent-files`
Environment variables used:
- `WSCRAPER_COOKIE_FILE`
- `WSCRAPER_TEST_TORRENT_URL`
Example:
```bash
export WSCRAPER_E2E=1
export WSCRAPER_COOKIE_FILE=/absolute/path/to/happyfappy-cookies.txt
export WSCRAPER_TEST_TORRENT_URL="https://www.happyfappy.net/torrents.php?id=110178"
pytest tests/e2e/test_happyfappy_live.py -m e2e -s
```
### PrivateHD Live Tests
Test coverage added for PrivateHD:
- `get-bookmarks`
- `download-torrent-files`
- `remove-bookmark`
Environment variables used:
- `WSCRAPER_PRIVATEHD_COOKIE_FILE`
- `WSCRAPER_PRIVATEHD_WISHLIST_URL`
- `WSCRAPER_PRIVATEHD_TEST_TORRENT_URL`
- `WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL`
- `WSCRAPER_PRIVATEHD_TEST_REMOVE_URL`
- `WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN`
Fallback rule:
- if `WSCRAPER_PRIVATEHD_COOKIE_FILE` is not set, `WSCRAPER_COOKIE_FILE` is used instead
Example:
```bash
export WSCRAPER_E2E=1
export WSCRAPER_PRIVATEHD_COOKIE_FILE=/absolute/path/to/privatehd-cookies.txt
export WSCRAPER_PRIVATEHD_WISHLIST_URL="https://privatehd.to/profile/blackdockers/wishlist"
export WSCRAPER_PRIVATEHD_TEST_TORRENT_URL="https://privatehd.to/torrent/12345-example"
export WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL="https://privatehd.to/download/torrent/12345.example.torrent"
pytest tests/e2e/test_privatehd_live.py -m e2e -s
```
### About the remove-bookmark Test
The `PrivateHD remove-bookmark` test deletes a real wishlist entry, so it must be used with particular care.
This test:
- runs only when both `WSCRAPER_PRIVATEHD_TEST_REMOVE_URL` and `WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN` are provided
- otherwise it is safely skipped
Example:
```bash
export WSCRAPER_E2E=1
export WSCRAPER_PRIVATEHD_COOKIE_FILE=/absolute/path/to/privatehd-cookies.txt
export WSCRAPER_PRIVATEHD_WISHLIST_URL="https://privatehd.to/profile/blackdockers/wishlist"
export WSCRAPER_PRIVATEHD_TEST_REMOVE_URL="https://privatehd.to/torrent/12345-example"
export WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN="467471"
pytest tests/e2e/test_privatehd_live.py -m e2e -s -k remove
```
### Notes
- these tests require real tracker accounts and valid cookies
- the `remove-bookmark` test mutates data; the test data must be chosen deliberately
- `tests/e2e/_helpers.py` contains the shared CLI execution and logging helpers used by all tracker live tests
## Directory Structure
```text
bin/
├── wscraper/
│ ├── README.md
│ ├── pyproject.toml
│ ├── setup.py
│ └── src/
│ └── wscraper/
│ ├── cli.py
│ ├── registry.py
│ ├── types.py
│ └── sites/
│ ├── happyfappy.py
│ └── privatehd.py
└── wscraper-service/
└── server.py
```
## Notes
- the cookie can be provided either as a raw string or in Netscape cookie file format
- tracker metadata fields (`backgroundImage`, `downloadURL`, `removeToken`, `size`, `seeders`, `leechers`) are normalized by the adapters
- on the `q-buffer` side, the image proxy, watcher item cache, and qBittorrent enrichment layers consume this scraper's output
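"Normalized" here means every adapter emits the same optional fields with stable defaults, so downstream layers never have to special-case a tracker. A sketch of what that contract could look like, with the field names taken from the note above and the implementation assumed:

```python
from typing import Any

# Optional metadata fields every adapter is expected to emit.
OPTIONAL_FIELDS = ("backgroundImage", "downloadURL", "removeToken", "size", "seeders", "leechers")


def normalize_item(raw: dict[str, Any]) -> dict[str, Any]:
    """Guarantee a stable item shape regardless of which tracker produced it."""
    item: dict[str, Any] = {
        "pageURL": raw.get("pageURL", ""),
        "title": raw.get("title", ""),
    }
    for field in OPTIONAL_FIELDS:
        item[field] = raw.get(field)  # missing metadata becomes None, never absent
    return item
```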


@@ -1,292 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from scrapling.fetchers import DynamicSession
def _domain_matches(target_host: str, cookie_domain: str) -> bool:
cd = cookie_domain.lstrip(".").lower()
th = target_host.lower()
return th == cd or th.endswith("." + cd)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
cookies: dict[str, str] = {}
lines = cookie_string.splitlines()
looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, _flag, _path, _secure, _expires, name, value = parts[:7]
if not _domain_matches(target_host, domain):
continue
if name:
cookies[name] = value
return cookies
for chunk in cookie_string.split(";"):
piece = chunk.strip()
if not piece or "=" not in piece:
continue
key, value = piece.split("=", 1)
key = key.strip()
value = value.strip()
if key:
cookies[key] = value
return cookies
def parse_cookies_for_playwright(
cookie_string: str, target_host: str, base_url: str
) -> list[dict[str, Any]]:
lines = cookie_string.splitlines()
cookies: list[dict[str, Any]] = []
looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, _flag, path, secure, expires, name, value = parts[:7]
if not _domain_matches(target_host, domain):
continue
if not name:
continue
cookie_obj: dict[str, Any] = {
"name": name,
"value": value,
"domain": domain.lstrip("."),
"path": path or "/",
"secure": (secure.upper() == "TRUE"),
}
if expires.isdigit():
exp_num = int(expires)
if exp_num > 0:
cookie_obj["expires"] = float(exp_num)
cookies.append(cookie_obj)
return cookies
kv = parse_cookie_string(cookie_string, target_host)
for name, value in kv.items():
cookies.append({"name": name, "value": value, "url": base_url})
return cookies
def absolute_url(base_url: str, href: str) -> str:
href = href.strip()
if href.startswith("http://") or href.startswith("https://"):
return href
if href.startswith("/"):
return f"{base_url.rstrip('/')}{href}"
return f"{base_url.rstrip('/')}/{href}"
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
last_error: Exception | None = None
for attempt in range(retries):
try:
response = session.fetch(
url,
timeout=45_000,
load_dom=True,
network_idle=False,
google_search=False,
)
status = response.status
if status in (403, 404, 429) or status >= 500:
raise RuntimeError(f"HTTP {status}")
return response
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error
def download_via_browser_with_retry(
session: DynamicSession, detail_url: str, retries: int, backoff_base: float
) -> tuple[str, bytes]:
last_error: Exception | None = None
for attempt in range(retries):
page = session.context.new_page()
try:
page.goto(detail_url, wait_until="domcontentloaded", timeout=45_000)
with page.expect_download(timeout=45_000) as download_info:
clicked = False
selectors = [
'span.torrent_buttons a[title*="Download"][href*="action=download"]',
'a.button.blueButton[href*="action=download"]',
'a[href*="action=download"][href*="torrent_pass"]',
]
for selector in selectors:
locator = page.locator(selector)
if locator.count() > 0:
locator.first.click()
clicked = True
break
if not clicked:
locator = page.locator(
"xpath=//a[contains(translate(normalize-space(string(.)),'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]"
)
if locator.count() > 0:
locator.first.click()
clicked = True
if not clicked:
raise RuntimeError("Download button not found in interactive page.")
download = download_info.value
temp_path = download.path()
if not temp_path:
raise RuntimeError("Downloaded file path is empty.")
data = Path(temp_path).read_bytes()
filename = (download.suggested_filename or "downloaded.torrent").strip()
if not filename:
filename = "downloaded.torrent"
return filename, data
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
finally:
page.close()
raise RuntimeError(f"Request failed for {detail_url}: {last_error}") from last_error
def find_download_link(response: Any) -> str:
selectors = [
'span.torrent_buttons a[title*="Download"][href*="action=download"]::attr(href)',
'a.button.blueButton[href*="action=download"]::attr(href)',
'a[href*="action=download"][href*="torrent_pass"]::attr(href)',
]
for sel in selectors:
href = (response.css(sel).get("") or "").strip()
if href:
return href
# Fallback using text match if classes/attributes drift
href = (
response.xpath(
"//a[contains(translate(normalize-space(string(.)),"
"'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]/@href"
).get("")
or ""
).strip()
return href
def normalize_filename(filename: str, download_url: str) -> str:
safe_name = Path(filename).name.strip()
if safe_name:
return safe_name if safe_name.lower().endswith(".torrent") else f"{safe_name}.torrent"
from_url = Path(urlparse(download_url).path).name.strip()
if from_url:
return from_url if from_url.lower().endswith(".torrent") else f"{from_url}.torrent"
return "downloaded.torrent"
def looks_like_torrent_bytes(data: bytes) -> bool:
# Basic bencode sanity check for torrent files
return bool(data) and data.startswith(b"d") and (b"4:info" in data[:4096])
def validate_torrent_response(download_url: str, filename: str, data: bytes) -> None:
good_ext = filename.lower().endswith(".torrent") or urlparse(download_url).path.lower().endswith(".torrent")
if not good_ext:
raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.")
if not looks_like_torrent_bytes(data):
raise RuntimeError("Downloaded file failed torrent bencode check.")
def run(args: argparse.Namespace) -> None:
base_url = args.base_url.rstrip("/")
target_host = urlparse(base_url).hostname or "www.happyfappy.net"
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
if not cookie_value:
raise ValueError("Cookie is required. Use --cookie or --cookie-file.")
cookies = parse_cookie_string(cookie_value, target_host=target_host)
if not cookies:
raise ValueError("No valid cookies parsed for target host.")
pw_cookies = parse_cookies_for_playwright(cookie_value, target_host=target_host, base_url=base_url)
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
output_dir = Path(args.output_dir).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
detail_response = fetch_dynamic_with_retry(
session, args.url, retries=args.retries, backoff_base=args.backoff_base
)
href = find_download_link(detail_response)
if not href:
raise RuntimeError("Download link not found on page.")
download_url = absolute_url(base_url, href)
suggested_filename, data = download_via_browser_with_retry(
session, args.url, retries=args.retries, backoff_base=args.backoff_base
)
filename = normalize_filename(suggested_filename, download_url)
validate_torrent_response(download_url, filename, data)
output_path = output_dir / filename
output_path.write_bytes(data) # overwrite behavior by design
print(f"Saved torrent to {output_path}")
def make_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Download a torrent file from a single HappyFappy torrent detail page URL.",
)
parser.add_argument("--url", required=True, help="Torrent detail page URL")
parser.add_argument("--base-url", default="https://www.happyfappy.net")
parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
parser.add_argument("--cookie-file", help="Path to cookie file")
parser.add_argument("--output-dir", default="torrent")
parser.add_argument("--retries", type=int, default=3)
parser.add_argument("--backoff-base", type=float, default=5.0)
return parser
def main() -> None:
parser = make_parser()
args = parser.parse_args()
if args.retries < 1:
raise ValueError("--retries must be at least 1.")
if args.backoff_base < 0:
raise ValueError("--backoff-base must be >= 0.")
run(args)
if __name__ == "__main__":
main()

BIN
logo-v2.png Normal file

Binary file not shown.


32
pyproject.toml Normal file

@@ -0,0 +1,32 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "wscraper"
version = "0.1.0"
description = "Multi-site scraper CLI"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"scrapling[fetchers]==0.4.1",
]
[project.optional-dependencies]
test = [
"pytest>=8.0",
]
[project.scripts]
wscraper = "wscraper.cli:main"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
markers = [
"e2e: live end-to-end tests against external services",
]


@@ -1 +1 @@
scrapling[fetchers]==0.4.1
-e .


@@ -1,256 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import random
import re
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from scrapling.fetchers import DynamicSession
STOP_TEXT = "You have not bookmarked any torrents."
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)")
def _domain_matches(target_host: str, cookie_domain: str) -> bool:
cd = cookie_domain.lstrip(".").lower()
th = target_host.lower()
return th == cd or th.endswith("." + cd)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
"""
Supports:
1) "key=value; key2=value2" cookie header style
2) Netscape cookie file format (tab-separated 7 columns)
"""
cookies: dict[str, str] = {}
lines = cookie_string.splitlines()
looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, _flag, _path, _secure, _expires, name, value = parts[:7]
if not _domain_matches(target_host, domain):
continue
if name:
cookies[name] = value
return cookies
for chunk in cookie_string.split(";"):
piece = chunk.strip()
if not piece or "=" not in piece:
continue
key, value = piece.split("=", 1)
key = key.strip()
value = value.strip()
if key:
cookies[key] = value
return cookies
def parse_cookies_for_playwright(
cookie_string: str, target_host: str, base_url: str
) -> list[dict[str, Any]]:
"""
Converts cookie input into Playwright-compatible cookie objects.
"""
lines = cookie_string.splitlines()
cookies: list[dict[str, Any]] = []
looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, _flag, path, secure, expires, name, value = parts[:7]
if not _domain_matches(target_host, domain):
continue
if not name:
continue
cookie_obj: dict[str, Any] = {
"name": name,
"value": value,
"domain": domain.lstrip("."),
"path": path or "/",
"secure": (secure.upper() == "TRUE"),
}
if expires.isdigit():
exp_num = int(expires)
if exp_num > 0:
cookie_obj["expires"] = float(exp_num)
cookies.append(cookie_obj)
return cookies
kv = parse_cookie_string(cookie_string, target_host)
for name, value in kv.items():
cookies.append({"name": name, "value": value, "url": base_url})
return cookies
def extract_background_image(style: str) -> str | None:
if not style:
return None
match = BG_URL_RE.search(style)
if not match:
return None
value = match.group(1).strip()
return value or None
def extract_torrent_cards(response: Any, base_url: str) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
cards = response.css("div.torrent_grid div.torrent_grid__torrent")
for card in cards:
page_url = (card.css('a[href^="/torrents.php?id="]::attr(href)').get("") or "").strip()
if page_url and not page_url.startswith("http"):
page_url = f"{base_url.rstrip('/')}{page_url}"
category = (card.css("span.torrent_grid__torrent__cat::text").get("") or "").strip()
title = (
card.css("h3.trim::attr(title)").get("")
or card.css("h3.trim::text").get("")
or ""
).strip()
style = (card.css("div.torrent__cover::attr(style)").get("") or "").strip()
background_image = extract_background_image(style)
records.append(
{
"pageURL": page_url,
"isVR": category == "VR",
"title": title,
"backgroundImage": background_image,
}
)
return records
def should_stop(response: Any) -> bool:
body_text = response.body.decode(response.encoding or "utf-8", errors="ignore")
return STOP_TEXT in body_text
def fetch_page(session: Any, url: str, retries: int, backoff_base: float) -> Any:
last_error: Exception | None = None
for attempt in range(retries):
try:
response = session.fetch(
url,
timeout=45_000,
load_dom=True,
network_idle=False,
)
status = response.status
if status in (403, 429) or status >= 500:
raise RuntimeError(f"HTTP {status}")
return response
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
sleep_seconds = backoff_base * (2**attempt) + random.uniform(0.0, 0.7)
time.sleep(sleep_seconds)
raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error
def build_bookmarks_url(base_url: str, page: int) -> str:
if page == 1:
return f"{base_url}/bookmarks.php?type=torrents"
return f"{base_url}/bookmarks.php?page={page}&type=torrents#torrent_table"
def run(args: argparse.Namespace) -> None:
target_host = urlparse(args.base_url).hostname or "www.happyfappy.net"
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
if not cookie_value:
raise ValueError("Cookie is required. Use --cookie or --cookie-file.")
cookies = parse_cookie_string(cookie_value, target_host=target_host)
if not cookies:
raise ValueError("No valid cookies parsed for target host. Check cookie content.")
pw_cookies = parse_cookies_for_playwright(
cookie_value, target_host=target_host, base_url=args.base_url.rstrip("/")
)
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
all_records: list[dict[str, Any]] = []
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
page = 1
while page <= args.max_pages:
if page > 1:
time.sleep(random.uniform(args.delay_min, args.delay_max))
url = build_bookmarks_url(args.base_url.rstrip("/"), page)
response = fetch_page(session, url, retries=args.retries, backoff_base=args.backoff_base)
if should_stop(response):
break
page_records = extract_torrent_cards(response, args.base_url)
all_records.extend(page_records)
print(f"[page={page}] extracted={len(page_records)} total={len(all_records)}")
page += 1
output_path = Path(args.output).resolve()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(all_records, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Saved {len(all_records)} records to {output_path}")
def make_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Scrape HappyFappy torrent bookmarks using an authenticated cookie.",
)
parser.add_argument("--base-url", default="https://www.happyfappy.net")
parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
parser.add_argument("--cookie-file", help="Path to a text file containing raw cookie string")
parser.add_argument("--output", default="bookmarks.json")
parser.add_argument("--delay-min", type=float, default=1.8, help="Minimum delay between page requests")
parser.add_argument("--delay-max", type=float, default=3.2, help="Maximum delay between page requests")
parser.add_argument("--retries", type=int, default=3, help="Retries per page request")
parser.add_argument("--backoff-base", type=float, default=5.0, help="Backoff base seconds")
parser.add_argument("--max-pages", type=int, default=200, help="Safety cap for pagination loop")
return parser
def main() -> None:
parser = make_parser()
args = parser.parse_args()
if args.delay_min < 0 or args.delay_max < 0:
raise ValueError("Delay values must be non-negative.")
if args.delay_min > args.delay_max:
raise ValueError("--delay-min cannot be greater than --delay-max.")
if args.retries < 1:
raise ValueError("--retries must be at least 1.")
run(args)
if __name__ == "__main__":
main()

3
setup.py Normal file

@@ -0,0 +1,3 @@
from setuptools import setup
setup()

3
src/wscraper/__init__.py Normal file

@@ -0,0 +1,3 @@
__all__ = ["__version__"]
__version__ = "0.1.0"

5
src/wscraper/__main__.py Normal file

@@ -0,0 +1,5 @@
from wscraper.cli import main
if __name__ == "__main__":
main()

124
src/wscraper/cli.py Normal file

@@ -0,0 +1,124 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from wscraper.registry import get_tracker, list_trackers, normalize_tracker
ACTION_ALIASES = {
"get-bookmarks": "get-bookmarks",
"gb": "get-bookmarks",
"bookmarks": "get-bookmarks",
"download-torrent-files": "download-torrent-files",
"dtf": "download-torrent-files",
"download": "download-torrent-files",
"remove-bookmark": "remove-bookmark",
"remove": "remove-bookmark",
"rb": "remove-bookmark",
}
def normalize_action(value: str) -> str:
key = value.strip().lower()
if key not in ACTION_ALIASES:
supported = ", ".join(sorted(ACTION_ALIASES))
raise ValueError(f"Unsupported action: {value!r}. Supported values: {supported}")
return ACTION_ALIASES[key]
def build_parser() -> argparse.ArgumentParser:
supported_sites = ", ".join(sorted({tracker.key for tracker in list_trackers()}))
parser = argparse.ArgumentParser(description=f"wscraper: multi-site scraping entrypoint ({supported_sites})")
parser.add_argument("site", help="Site key, e.g. happyfappy, hf, privatehd or phd")
parser.add_argument("-a", "--action", required=True, help="Action to run")
parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
parser.add_argument("-c", "--cookie-file", help="Path to cookie file")
parser.add_argument("--wishlist-url", help="Tracker-specific wishlist URL override")
parser.add_argument("-u", "--url", help="Detail page URL")
parser.add_argument("--download-url", help="Direct torrent download URL")
parser.add_argument("--remove-token", help="Tracker-specific remove token")
parser.add_argument("--title", help="Item title")
parser.add_argument("--image-url", help="Background image URL")
parser.add_argument("--size", help="Torrent size text")
parser.add_argument("--seeders", type=int, help="Seeders count")
parser.add_argument("--leechers", type=int, help="Leechers count")
parser.add_argument(
"-o",
"--output",
help="Output target: file path for get-bookmarks, directory path for download-torrent-files",
)
return parser
def read_cookie(args: argparse.Namespace) -> str:
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
if not cookie_value:
raise ValueError("Cookie is required. Use --cookie or --cookie-file/-c.")
return cookie_value
def build_item(args: argparse.Namespace) -> dict[str, object]:
if not args.url:
raise ValueError("--url is required for item-based actions.")
item: dict[str, object] = {
"pageURL": args.url,
"title": args.title or "",
}
if args.download_url:
item["downloadURL"] = args.download_url
if args.remove_token:
item["removeToken"] = args.remove_token
if args.image_url:
item["backgroundImage"] = args.image_url
if args.size:
item["size"] = args.size
if args.seeders is not None:
item["seeders"] = args.seeders
if args.leechers is not None:
item["leechers"] = args.leechers
return item
def main() -> None:
parser = build_parser()
args = parser.parse_args()
tracker = get_tracker(normalize_tracker(args.site))
action = normalize_action(args.action)
cookie = read_cookie(args)
if action == "get-bookmarks":
items = tracker.get_bookmarks(cookie, wishlist_url=args.wishlist_url)
output_path = Path(args.output or "bookmarks.json").resolve()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Saved {len(items)} records to {output_path}")
return
item = build_item(args)
if action == "download-torrent-files":
result = tracker.download_torrent(cookie, item, wishlist_url=args.wishlist_url)
output_dir = Path(args.output or "torrent").resolve()
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / result["filename"]
output_path.write_bytes(result["data"])
print(f"Saved torrent to {output_path}")
return
if action == "remove-bookmark":
tracker.remove_bookmark(cookie, item, wishlist_url=args.wishlist_url)
print("Bookmark removed successfully.")
return
raise ValueError(f"Unsupported action: {action}")
if __name__ == "__main__":
main()
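The CLI resolves action aliases (`gb`, `dtf`, `rb`, …) to three canonical actions before dispatching. A standalone sketch of that lookup, with the mapping copied from `ACTION_ALIASES` above:

```python
# Alias table copied from the CLI above; every spelling maps to one of
# three canonical actions.
ACTION_ALIASES = {
    "get-bookmarks": "get-bookmarks", "gb": "get-bookmarks", "bookmarks": "get-bookmarks",
    "download-torrent-files": "download-torrent-files", "dtf": "download-torrent-files",
    "download": "download-torrent-files",
    "remove-bookmark": "remove-bookmark", "remove": "remove-bookmark", "rb": "remove-bookmark",
}

def normalize_action(value: str) -> str:
    # case/whitespace-insensitive lookup, matching the CLI's behavior
    key = value.strip().lower()
    if key not in ACTION_ALIASES:
        supported = ", ".join(sorted(ACTION_ALIASES))
        raise ValueError(f"Unsupported action: {value!r}. Supported values: {supported}")
    return ACTION_ALIASES[key]

print(normalize_action(" DTF "))  # download-torrent-files
```

Combined with the tracker aliases in the registry, `wscraper hf -a dtf …` and `wscraper happyfappy -a download-torrent-files …` take the same code path.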


@@ -0,0 +1 @@
__all__ = []


@@ -0,0 +1,91 @@
from __future__ import annotations
from typing import Any
def domain_matches(target_host: str, cookie_domain: str) -> bool:
normalized_cookie_domain = cookie_domain.replace("#HttpOnly_", "").lstrip(".").lower()
normalized_target_host = target_host.lower()
return (
normalized_target_host == normalized_cookie_domain
or normalized_target_host.endswith("." + normalized_cookie_domain)
)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
cookies: dict[str, str] = {}
lines = cookie_string.splitlines()
looks_like_netscape = len(lines) > 1 and any(
("\t" in line) or len(line.split()) >= 7 for line in lines if line.strip()
)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or (line.startswith("#") and not line.startswith("#HttpOnly_")):
continue
parts = line.split("\t") if "\t" in line else line.split()
if len(parts) < 7:
continue
domain, _flag, _path, _secure, _expires, name, value = parts[:7]
if not domain_matches(target_host, domain):
continue
if name:
cookies[name] = value
return cookies
for chunk in cookie_string.split(";"):
piece = chunk.strip()
if not piece or "=" not in piece:
continue
key, value = piece.split("=", 1)
key = key.strip()
value = value.strip()
if key:
cookies[key] = value
return cookies
def parse_cookies_for_playwright(
cookie_string: str,
target_host: str,
base_url: str,
) -> list[dict[str, Any]]:
lines = cookie_string.splitlines()
cookies: list[dict[str, Any]] = []
looks_like_netscape = len(lines) > 1 and any(
("\t" in line) or len(line.split()) >= 7 for line in lines if line.strip()
)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or (line.startswith("#") and not line.startswith("#HttpOnly_")):
continue
parts = line.split("\t") if "\t" in line else line.split()
if len(parts) < 7:
continue
domain, _flag, path, secure, expires, name, value = parts[:7]
if not domain_matches(target_host, domain):
continue
if not name:
continue
cookie_obj: dict[str, Any] = {
"name": name,
"value": value,
"domain": domain.replace("#HttpOnly_", "").lstrip("."),
"path": path or "/",
"secure": (secure.upper() == "TRUE"),
}
if expires.isdigit():
expires_number = int(expires)
if expires_number > 0:
cookie_obj["expires"] = float(expires_number)
cookies.append(cookie_obj)
return cookies
kv = parse_cookie_string(cookie_string, target_host)
for name, value in kv.items():
cookies.append({"name": name, "value": value, "url": base_url})
return cookies
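`parse_cookie_string` above accepts either a raw `Cookie` header string (`name=value; name2=value2`) or a multi-line Netscape `cookies.txt` export, filtered by domain. A minimal standalone sketch of the same two code paths, using illustrative names and example domains rather than the real tracker hosts (and keeping `#HttpOnly_`-prefixed lines, which curl-style exports use for HttpOnly cookies):

```python
# Minimal sketch mirroring parse_cookie_string above: detect Netscape-format
# exports (multi-line, 7 tab/space-separated fields) vs raw header strings.
def parse_cookies(cookie_string: str, target_host: str) -> dict[str, str]:
    cookies: dict[str, str] = {}
    target = target_host.lower()
    lines = cookie_string.splitlines()
    netscape = len(lines) > 1 and any(
        "\t" in line or len(line.split()) >= 7 for line in lines if line.strip()
    )
    if netscape:
        for raw_line in lines:
            line = raw_line.strip()
            if not line or (line.startswith("#") and not line.startswith("#HttpOnly_")):
                continue  # skip comments but keep #HttpOnly_ entries
            parts = line.split("\t") if "\t" in line else line.split()
            if len(parts) < 7:
                continue
            domain, _flag, _path, _secure, _expires, name, value = parts[:7]
            host = domain.replace("#HttpOnly_", "").lstrip(".").lower()
            if name and (target == host or target.endswith("." + host)):
                cookies[name] = value
        return cookies
    # raw "a=1; b=2" header form
    for chunk in cookie_string.split(";"):
        piece = chunk.strip()
        if piece and "=" in piece:
            key, value = piece.split("=", 1)
            if key.strip():
                cookies[key.strip()] = value.strip()
    return cookies

print(parse_cookies("session=abc; uid=42", "example.net"))  # {'session': 'abc', 'uid': '42'}
```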


@@ -0,0 +1,78 @@
from __future__ import annotations
import re
import socket
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
def absolute_url(base_url: str, href: str) -> str:
href = href.strip()
if href.startswith("http://") or href.startswith("https://"):
return href
if href.startswith("/"):
return f"{base_url.rstrip('/')}{href}"
return f"{base_url.rstrip('/')}/{href}"
def ensure_hosts_entry(host: str) -> None:
try:
ip = socket.gethostbyname(host)
except OSError:
return
hosts_path = Path("/etc/hosts")
try:
current = hosts_path.read_text(encoding="utf-8")
except OSError:
return
if re.search(rf"(^|\s){re.escape(host)}(\s|$)", current, flags=re.MULTILINE):
return
try:
with hosts_path.open("a", encoding="utf-8") as handle:
handle.write(f"\n{ip} {host}\n")
except OSError:
return
def ensure_tracker_hosts(base_url: str) -> None:
parsed = urlparse(base_url)
host = parsed.hostname
if not host:
return
variants = {host}
if host.startswith("www."):
variants.add(host[4:])
else:
variants.add(f"www.{host}")
for candidate in variants:
ensure_hosts_entry(candidate)
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
last_error: Exception | None = None
for attempt in range(retries):
try:
response = session.fetch(
url,
timeout=45_000,
load_dom=True,
network_idle=False,
google_search=False,
)
status = response.status
if status in (403, 404, 429) or status >= 500:
raise RuntimeError(f"HTTP {status}")
return response
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error
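`fetch_dynamic_with_retry` treats 403/404/429 and all 5xx statuses as failures and sleeps `backoff_base * 2**attempt` seconds after each failed attempt except the last. With the defaults the adapters pass (`retries=3`, `backoff_base=5.0`) that means waits of 5 s and 10 s before the final error. A small sketch of the schedule:

```python
# Sketch of the retry timing in fetch_dynamic_with_retry above: one sleep
# after every failed attempt except the final one.
def backoff_schedule(retries: int, backoff_base: float) -> list[float]:
    return [backoff_base * (2 ** attempt) for attempt in range(retries - 1)]

print(backoff_schedule(3, 5.0))  # [5.0, 10.0]
```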

src/wscraper/registry.py Normal file

@@ -0,0 +1,34 @@
from __future__ import annotations
from wscraper.sites.happyfappy import adapter as happyfappy_adapter
from wscraper.sites.privatehd import adapter as privatehd_adapter
from wscraper.types import TrackerAdapter, TrackerInfo
TRACKERS: dict[str, TrackerAdapter] = {
happyfappy_adapter.key: happyfappy_adapter,
privatehd_adapter.key: privatehd_adapter,
}
TRACKER_ALIASES = {
"hf": "happyfappy",
"happyfappy": "happyfappy",
"phd": "privatehd",
"privatehd": "privatehd",
}
def normalize_tracker(value: str) -> str:
key = value.strip().lower()
if key not in TRACKER_ALIASES:
supported = ", ".join(sorted(TRACKER_ALIASES))
raise ValueError(f"Unsupported tracker: {value!r}. Supported values: {supported}")
return TRACKER_ALIASES[key]
def get_tracker(value: str) -> TrackerAdapter:
normalized = normalize_tracker(value)
return TRACKERS[normalized]
def list_trackers() -> list[TrackerInfo]:
return [TrackerInfo(key=tracker.key, label=tracker.label) for tracker in TRACKERS.values()]


@@ -0,0 +1 @@
__all__ = ["happyfappy", "privatehd"]


@@ -0,0 +1,652 @@
from __future__ import annotations
import argparse
import json
import random
import re
import tempfile
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from scrapling.fetchers import DynamicSession
from wscraper.common.cookies import parse_cookie_string, parse_cookies_for_playwright
from wscraper.common.net import absolute_url, ensure_tracker_hosts, fetch_dynamic_with_retry
from wscraper.types import BookmarkItem, DownloadResult
STOP_TEXT = "You have not bookmarked any torrents."
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)")
# bookmarks
def extract_background_image(style: str) -> str | None:
if not style:
return None
match = BG_URL_RE.search(style)
if not match:
return None
value = match.group(1).strip()
return value or None
def extract_torrent_cards(response: Any, base_url: str) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
cards = response.css("div.torrent_grid div.torrent_grid__torrent")
for card in cards:
page_url = (card.css('a[href^="/torrents.php?id="]::attr(href)').get("") or "").strip()
if page_url and not page_url.startswith("http"):
page_url = f"{base_url.rstrip('/')}{page_url}"
category = (card.css("span.torrent_grid__torrent__cat::text").get("") or "").strip()
title = (card.css("h3.trim::attr(title)").get("") or card.css("h3.trim::text").get("") or "").strip()
style = (card.css("div.torrent__cover::attr(style)").get("") or "").strip()
background_image = extract_background_image(style)
records.append(
{
"pageURL": page_url,
"isVR": category == "VR",
"title": title,
"backgroundImage": background_image,
}
)
return records
def should_stop(response: Any) -> bool:
body_text = response.body.decode(response.encoding or "utf-8", errors="ignore")
return STOP_TEXT in body_text
def build_bookmarks_url(base_url: str, page: int) -> str:
if page == 1:
return f"{base_url}/bookmarks.php?type=torrents"
return f"{base_url}/bookmarks.php?page={page}&type=torrents#torrent_table"
def run_get_bookmarks(args: argparse.Namespace) -> None:
target_host = urlparse(args.base_url).hostname or "www.happyfappy.net"
ensure_tracker_hosts(args.base_url)
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
if not cookie_value:
raise ValueError("Cookie is required. Use --cookie or --cookie-file.")
cookies = parse_cookie_string(cookie_value, target_host=target_host)
if not cookies:
raise ValueError("No valid cookies parsed for target host. Check cookie content.")
pw_cookies = parse_cookies_for_playwright(cookie_value, target_host=target_host, base_url=args.base_url.rstrip("/"))
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
all_records: list[dict[str, Any]] = []
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
page = 1
while page <= args.max_pages:
if page > 1:
time.sleep(random.uniform(args.delay_min, args.delay_max))
url = build_bookmarks_url(args.base_url.rstrip("/"), page)
response = fetch_dynamic_with_retry(session, url, retries=args.retries, backoff_base=args.backoff_base)
if should_stop(response):
break
page_records = extract_torrent_cards(response, args.base_url)
all_records.extend(page_records)
print(f"[page={page}] extracted={len(page_records)} total={len(all_records)}")
page += 1
output_path = Path(args.output).resolve()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(all_records, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Saved {len(all_records)} records to {output_path}")
# torrent
def download_via_browser_with_retry(session: DynamicSession, detail_url: str, retries: int, backoff_base: float) -> tuple[str, bytes]:
last_error: Exception | None = None
for attempt in range(retries):
page = session.context.new_page()
try:
page.goto(detail_url, wait_until="domcontentloaded", timeout=45_000)
with page.expect_download(timeout=45_000) as download_info:
clicked = False
selectors = [
'span.torrent_buttons a[title*="Download"][href*="action=download"]',
'a.button.blueButton[href*="action=download"]',
'a[href*="action=download"][href*="torrent_pass"]',
]
for selector in selectors:
locator = page.locator(selector)
if locator.count() > 0:
locator.first.click()
clicked = True
break
if not clicked:
locator = page.locator(
"xpath=//a[contains(translate(normalize-space(string(.)),'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]"
)
if locator.count() > 0:
locator.first.click()
clicked = True
if not clicked:
raise RuntimeError("Download button not found in interactive page.")
download = download_info.value
temp_path = download.path()
if not temp_path:
raise RuntimeError("Downloaded file path is empty.")
data = Path(temp_path).read_bytes()
filename = (download.suggested_filename or "downloaded.torrent").strip()
if not filename:
filename = "downloaded.torrent"
return filename, data
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
finally:
page.close()
raise RuntimeError(f"Request failed for {detail_url}: {last_error}") from last_error
def find_download_link(response: Any) -> str:
selectors = [
'span.torrent_buttons a[title*="Download"][href*="action=download"]::attr(href)',
'a.button.blueButton[href*="action=download"]::attr(href)',
'a[href*="action=download"][href*="torrent_pass"]::attr(href)',
]
for sel in selectors:
href = (response.css(sel).get("") or "").strip()
if href:
return href
href = (
response.xpath(
"//a[contains(translate(normalize-space(string(.)),"
"'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),'DOWNLOAD') and contains(@href,'action=download')]/@href"
).get("")
or ""
).strip()
return href
def normalize_filename(filename: str, download_url: str) -> str:
safe_name = Path(filename).name.strip()
if safe_name:
return safe_name if safe_name.lower().endswith(".torrent") else f"{safe_name}.torrent"
from_url = Path(urlparse(download_url).path).name.strip()
if from_url:
return from_url if from_url.lower().endswith(".torrent") else f"{from_url}.torrent"
return "downloaded.torrent"
def looks_like_torrent_bytes(data: bytes) -> bool:
return bool(data) and data.startswith(b"d") and (b"4:info" in data[:4096])
def validate_torrent_response(download_url: str, filename: str, data: bytes) -> None:
good_ext = filename.lower().endswith(".torrent") or urlparse(download_url).path.lower().endswith(".torrent")
if not good_ext:
raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.")
if not looks_like_torrent_bytes(data):
raise RuntimeError("Downloaded file failed torrent bencode check.")
def run_download_torrent_files(args: argparse.Namespace) -> None:
base_url = args.base_url.rstrip("/")
target_host = urlparse(base_url).hostname or "www.happyfappy.net"
ensure_tracker_hosts(base_url)
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
if not cookie_value:
raise ValueError("Cookie is required. Use --cookie or --cookie-file.")
cookies = parse_cookie_string(cookie_value, target_host=target_host)
if not cookies:
raise ValueError("No valid cookies parsed for target host.")
pw_cookies = parse_cookies_for_playwright(cookie_value, target_host=target_host, base_url=base_url)
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
output_dir = Path(args.output_dir).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
detail_response = fetch_dynamic_with_retry(session, args.url, retries=args.retries, backoff_base=args.backoff_base)
href = find_download_link(detail_response)
if not href:
raise RuntimeError("Download link not found on page.")
download_url = absolute_url(base_url, href)
suggested_filename, data = download_via_browser_with_retry(session, args.url, retries=args.retries, backoff_base=args.backoff_base)
filename = normalize_filename(suggested_filename, download_url)
validate_torrent_response(download_url, filename, data)
output_path = output_dir / filename
output_path.write_bytes(data)
print(f"Saved torrent to {output_path}")
if getattr(args, "rm_bookmark", False):
torrent_id = extract_torrent_id(args.url)
removed = remove_bookmark_with_retry(
session=session,
detail_url=args.url,
torrent_id=torrent_id,
retries=args.retries,
backoff_base=args.backoff_base,
)
if not removed:
raise RuntimeError("Torrent downloaded but bookmark removal could not be verified.")
print("Bookmark removed successfully.")
def extract_torrent_id(detail_url: str) -> str | None:
parsed = urlparse(detail_url)
query = parsed.query or ""
match = re.search(r"(?:^|&)id=(\d+)(?:&|$)", query)
if match:
return match.group(1)
path_match = re.search(r"/torrents\.php/(\d+)", parsed.path or "")
if path_match:
return path_match.group(1)
return None
def _click_remove_control(page: Any, torrent_id: str | None) -> dict[str, Any]:
return page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
const hasAny = (source, tokens) => tokens.some((t) => source.includes(t));
const removeTokens = ["unbookmark", "remove", "delete", "forget", "unmark"];
const addTokens = ["bookmark", "add", "mark", "save"];
const nodes = Array.from(document.querySelectorAll(
"a,button,[role='button'],[onclick],input[type='button'],input[type='submit']"
));
let best = null;
for (const node of nodes) {
const text = normalize(node.textContent || "");
const title = normalize(node.getAttribute("title"));
const aria = normalize(node.getAttribute("aria-label"));
const id = normalize(node.id);
const cls = normalize(node.className);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const attrs = normalize(node.outerHTML);
const all = [text, title, aria, id, cls, href, onclick, attrs].join(" ");
let score = 0;
const reasons = [];
if (hasAny(onclick, ["unbookmark"])) {
score += 60;
reasons.push("onclick:unbookmark");
}
if (hasAny(all, ["bookmark"])) {
score += 16;
reasons.push("bookmark-signal");
}
if (hasAny(all, removeTokens)) {
score += 26;
reasons.push("remove-signal");
}
if (!hasAny(all, removeTokens) && hasAny(all, addTokens)) {
score -= 20;
reasons.push("add-like-signal");
}
if (torrentId && all.includes(torrentId)) {
score += 30;
reasons.push("torrent-id");
}
if (hasAny(href, ["javascript", "#"])) {
score += 4;
}
if (!best || score > best.score) {
best = { node, score, reasons, snapshot: (node.outerHTML || "").slice(0, 220) };
}
}
if (!best || best.score < 20) {
return { clicked: false, score: best ? best.score : -1, reasons: best ? best.reasons : [], snapshot: best ? best.snapshot : "" };
}
best.node.click();
return { clicked: true, score: best.score, reasons: best.reasons, snapshot: best.snapshot };
}
""",
{"torrentId": torrent_id},
)
def _remove_control_exists(page: Any, torrent_id: str | None) -> bool:
return bool(
page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
const removeTokens = ["unbookmark", "remove", "delete", "forget", "unmark"];
const nodes = Array.from(document.querySelectorAll(
"a,button,[role='button'],[onclick],input[type='button'],input[type='submit']"
));
for (const node of nodes) {
const text = normalize(node.textContent || "");
const title = normalize(node.getAttribute("title"));
const aria = normalize(node.getAttribute("aria-label"));
const id = normalize(node.id);
const cls = normalize(node.className);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const all = [text, title, aria, id, cls, href, onclick].join(" ");
const looksLikeRemove = removeTokens.some((t) => all.includes(t)) || onclick.includes("unbookmark");
const matchesId = torrentId ? all.includes(torrentId) : true;
if (looksLikeRemove && matchesId) {
return true;
}
}
return false;
}
""",
{"torrentId": torrent_id},
)
)
def _bookmark_control_state(page: Any, torrent_id: str | None) -> dict[str, Any]:
return page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
const candidates = [];
if (torrentId) {
const direct = document.getElementById(`bookmarklink_torrent_${torrentId}`);
if (direct) candidates.push(direct);
}
const nodes = Array.from(document.querySelectorAll("a,button,[onclick],[id*='bookmark']"));
for (const node of nodes) {
if (!candidates.includes(node)) candidates.push(node);
}
const scored = [];
for (const node of candidates) {
const text = normalize(node.textContent || "");
const id = normalize(node.id);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const title = normalize(node.getAttribute("title"));
const all = [text, id, href, onclick, title].join(" ");
let score = 0;
if (all.includes("bookmark")) score += 12;
if (torrentId && all.includes(torrentId)) score += 28;
if (onclick.includes("unbookmark")) score += 45;
if (onclick.includes("bookmark(") && !onclick.includes("unbookmark")) score += 20;
if (id.includes("bookmarklink_torrent")) score += 35;
const action = onclick.includes("unbookmark")
? "remove"
: (onclick.includes("bookmark(") ? "add" : "unknown");
scored.push({
score,
action,
text,
id,
onclick,
snapshot: (node.outerHTML || "").slice(0, 220),
});
}
scored.sort((a, b) => b.score - a.score);
const best = scored[0] || null;
return { best, total: scored.length };
}
""",
{"torrentId": torrent_id},
)
def _click_bookmark_control(page: Any, torrent_id: str | None) -> dict[str, Any]:
return page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
let target = null;
if (torrentId) {
target = document.getElementById(`bookmarklink_torrent_${torrentId}`);
}
if (!target) {
const nodes = Array.from(document.querySelectorAll("a,button,[onclick],[id*='bookmark']"));
let best = null;
for (const node of nodes) {
const text = normalize(node.textContent || "");
const id = normalize(node.id);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const title = normalize(node.getAttribute("title"));
const all = [text, id, href, onclick, title].join(" ");
let score = 0;
if (all.includes("bookmark")) score += 12;
if (torrentId && all.includes(torrentId)) score += 28;
if (onclick.includes("unbookmark")) score += 45;
if (id.includes("bookmarklink_torrent")) score += 35;
if (!best || score > best.score) best = { node, score, all, onclick };
}
if (best) target = best.node;
}
if (!target) return { clicked: false, reason: "no-target" };
const onclick = normalize(target.getAttribute("onclick"));
if (onclick.includes("bookmark(") && !onclick.includes("unbookmark")) {
return { clicked: false, reason: "already-removed" };
}
target.click();
return { clicked: true, reason: "clicked", snapshot: (target.outerHTML || "").slice(0, 220) };
}
""",
{"torrentId": torrent_id},
)
def remove_bookmark_with_retry(
session: DynamicSession,
detail_url: str,
torrent_id: str | None,
retries: int,
backoff_base: float,
) -> bool:
last_error: Exception | None = None
for attempt in range(retries):
page = session.context.new_page()
try:
page.goto(detail_url, wait_until="domcontentloaded", timeout=45_000)
state_before = _bookmark_control_state(page, torrent_id)
best_before = (state_before or {}).get("best") or {}
action_before = best_before.get("action")
if action_before == "add":
return True
if action_before != "remove":
raise RuntimeError("Bookmark remove control not detected on detail page.")
click_result = _click_bookmark_control(page, torrent_id)
if not click_result.get("clicked"):
if click_result.get("reason") == "already-removed":
return True
raise RuntimeError(
"Bookmark remove action could not be clicked. "
f"reason={click_result.get('reason')}"
)
page.wait_for_timeout(2200)
page.reload(wait_until="domcontentloaded", timeout=45_000)
state_after = _bookmark_control_state(page, torrent_id)
best_after = (state_after or {}).get("best") or {}
action_after = best_after.get("action")
if action_after == "remove":
raise RuntimeError("Bookmark remove control still present after click; remove likely failed.")
return True
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
finally:
page.close()
raise RuntimeError(f"Bookmark remove failed for {detail_url}: {last_error}") from last_error
def get_bookmarks(cookie: str, *, base_url: str = "https://www.happyfappy.net") -> list[BookmarkItem]:
with tempfile.TemporaryDirectory(prefix="happyfappy-bookmarks-") as tmpdir:
output_path = Path(tmpdir) / "bookmarks.json"
run_get_bookmarks(
argparse.Namespace(
base_url=base_url,
cookie=cookie,
cookie_file=None,
output=str(output_path),
delay_min=1.8,
delay_max=3.2,
retries=3,
backoff_base=5.0,
max_pages=200,
)
)
return json.loads(output_path.read_text(encoding="utf-8"))
def download_torrent(
cookie: str,
item: BookmarkItem,
*,
base_url: str = "https://www.happyfappy.net",
) -> DownloadResult:
detail_url = (item.get("pageURL") or "").strip()
if not detail_url:
raise ValueError("pageURL is required for HappyFappy download.")
with tempfile.TemporaryDirectory(prefix="happyfappy-download-") as tmpdir:
output_dir = Path(tmpdir) / "torrent"
run_download_torrent_files(
argparse.Namespace(
url=detail_url,
base_url=base_url,
cookie=cookie,
cookie_file=None,
output_dir=str(output_dir),
rm_bookmark=False,
retries=3,
backoff_base=5.0,
)
)
files = sorted(output_dir.glob("*.torrent"))
if not files:
raise RuntimeError("No torrent file produced")
torrent_path = files[0]
return {
"filename": torrent_path.name,
"data": torrent_path.read_bytes(),
}
def remove_bookmark(
cookie: str,
item: BookmarkItem,
*,
base_url: str = "https://www.happyfappy.net",
) -> None:
detail_url = (item.get("pageURL") or "").strip()
if not detail_url:
raise ValueError("pageURL is required for HappyFappy bookmark removal.")
target_host = urlparse(base_url).hostname or "www.happyfappy.net"
ensure_tracker_hosts(base_url)
pw_cookies = parse_cookies_for_playwright(
cookie,
target_host=target_host,
base_url=base_url.rstrip("/"),
)
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
torrent_id = extract_torrent_id(detail_url)
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
remove_bookmark_with_retry(
session=session,
detail_url=detail_url,
torrent_id=torrent_id,
retries=3,
backoff_base=5.0,
)
class HappyFappyAdapter:
key = "happyfappy"
label = "HappyFappy"
def get_bookmarks(self, cookie: str, *, wishlist_url: str | None = None) -> list[BookmarkItem]:
del wishlist_url
return get_bookmarks(cookie)
def download_torrent(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> DownloadResult:
del wishlist_url
return download_torrent(cookie, item)
def remove_bookmark(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> None:
del wishlist_url
remove_bookmark(cookie, item)
adapter = HappyFappyAdapter()


@@ -0,0 +1,359 @@
from __future__ import annotations
import re
import time
from http.cookies import SimpleCookie
from typing import Any
from urllib.parse import unquote
from urllib.parse import urlparse
from curl_cffi import requests
from scrapling.fetchers import DynamicSession
from wscraper.common.cookies import parse_cookie_string, parse_cookies_for_playwright
from wscraper.common.net import absolute_url, fetch_dynamic_with_retry
from wscraper.sites.happyfappy import looks_like_torrent_bytes, normalize_filename
from wscraper.types import BookmarkItem, DownloadResult
DEFAULT_BASE_URL = "https://privatehd.to"
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
)
def normalize_wishlist_url(base_url: str, wishlist_url: str | None) -> str:
candidate = (wishlist_url or "").strip()
if not candidate:
raise ValueError("wishlistUrl is required for PrivateHD.")
if candidate.startswith("http://") or candidate.startswith("https://"):
return candidate
return absolute_url(base_url, candidate)
def parse_int(value: str) -> int | None:
cleaned = value.strip()
if not cleaned:
return None
match = re.search(r"\d+", cleaned.replace(",", ""))
return int(match.group(0)) if match else None
def extract_rows(response: Any, base_url: str) -> list[BookmarkItem]:
records: list[BookmarkItem] = []
rows = response.css("table.table tbody tr")
for row in rows:
detail_href = (row.css("a.torrent-filename::attr(href)").get("") or "").strip()
if not detail_href:
continue
detail_url = absolute_url(base_url, detail_href)
title = " ".join(row.css("a.torrent-filename::text").getall()).strip()
download_href = (
row.css("a.torrent-download-icon::attr(href)").get("")
or row.css("a[href*='/download/torrent/']::attr(href)").get("")
or ""
).strip()
download_url = absolute_url(base_url, download_href) if download_href else None
remove_token = (
row.css("button.btn-delete-wishlist::attr(data-id)").get("")
or row.css("button[data-id]::attr(data-id)").get("")
or ""
).strip() or None
cells = row.css("td")
size = None
seeders = None
leechers = None
if len(cells) >= 7:
size = " ".join(cells[4].css("::text").getall()).strip() or None
seeders = parse_int(" ".join(cells[5].css("::text").getall()))
leechers = parse_int(" ".join(cells[6].css("::text").getall()))
records.append(
{
"pageURL": detail_url,
"title": title,
"downloadURL": download_url,
"removeToken": remove_token,
"size": size,
"seeders": seeders,
"leechers": leechers,
}
)
return records
def enrich_bookmark(response: Any, base_url: str, item: BookmarkItem) -> BookmarkItem:
poster = (
response.css("img[src*='/images/posters/']::attr(src)").get("")
or response.css("img.img-responsive::attr(src)").get("")
or ""
).strip()
title = (
" ".join(response.css("a[href*='/movie/']::text").getall()).strip()
or " ".join(response.css("a[href*='/tv/']::text").getall()).strip()
or " ".join(response.css("h1::text").getall()).strip()
or item.get("title")
or ""
)
download_href = (
response.css("a.btn.btn-xs.btn-primary[href*='/download/torrent/']::attr(href)").get("")
or response.css("a[href*='/download/torrent/']::attr(href)").get("")
or ""
).strip()
return {
**item,
"title": title.strip(),
"backgroundImage": absolute_url(base_url, poster) if poster else item.get("backgroundImage"),
"downloadURL": absolute_url(base_url, download_href) if download_href else item.get("downloadURL"),
}
def build_dynamic_session(
cookie: str,
*,
base_url: str = DEFAULT_BASE_URL,
) -> DynamicSession:
target_host = urlparse(base_url).hostname or "privatehd.to"
pw_cookies = parse_cookies_for_playwright(cookie, target_host=target_host, base_url=base_url.rstrip("/"))
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
return DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
)
def get_bookmarks(
cookie: str,
*,
wishlist_url: str,
base_url: str = DEFAULT_BASE_URL,
) -> list[BookmarkItem]:
normalized_wishlist_url = normalize_wishlist_url(base_url, wishlist_url)
with build_dynamic_session(cookie, base_url=base_url) as session:
response = fetch_dynamic_with_retry(
session,
normalized_wishlist_url,
retries=3,
backoff_base=5.0,
)
items = extract_rows(response, base_url)
enriched: list[BookmarkItem] = []
for index, item in enumerate(items):
detail_response = fetch_dynamic_with_retry(
session,
item["pageURL"],
retries=3,
backoff_base=5.0,
)
enriched.append(enrich_bookmark(detail_response, base_url, item))
if index < len(items) - 1:
time.sleep(1.2)
return enriched
def build_http_cookies(cookie: str, target_url: str) -> dict[str, str]:
target_host = urlparse(target_url).hostname or "privatehd.to"
cookies = parse_cookie_string(cookie, target_host=target_host)
if not cookies:
raise ValueError("No valid cookies parsed for target host.")
return cookies
def build_http_session(cookie: str, target_url: str) -> requests.Session:
session = requests.Session()
session.cookies.update(build_http_cookies(cookie, target_url))
session.headers.update({"User-Agent": USER_AGENT})
return session
def fetch_wishlist_token(
cookie: str,
*,
wishlist_url: str,
base_url: str = DEFAULT_BASE_URL,
) -> str:
normalized_wishlist_url = normalize_wishlist_url(base_url, wishlist_url)
with build_dynamic_session(cookie, base_url=base_url) as session:
response = fetch_dynamic_with_retry(
session,
normalized_wishlist_url,
retries=3,
backoff_base=5.0,
)
token = (
response.css("input[name='_token']::attr(value)").get("")
or response.css("meta[name='csrf-token']::attr(content)").get("")
or ""
).strip()
if not token:
cookies = build_http_cookies(cookie, normalized_wishlist_url)
raw_xsrf = cookies.get("XSRF-TOKEN", "").strip()
if raw_xsrf:
return unquote(raw_xsrf)
set_cookie_values = response.headers.get_list("set-cookie") if hasattr(response.headers, "get_list") else []
for raw_header in set_cookie_values:
jar = SimpleCookie()
jar.load(raw_header)
morsel = jar.get("XSRF-TOKEN")
if morsel and morsel.value:
return unquote(morsel.value)
raise RuntimeError("PrivateHD CSRF token could not be found.")
return token
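The fallback chain above ends at a Laravel-style `XSRF-TOKEN` cookie, which stores the CSRF token URL-encoded, so it must be `unquote`d before reuse. A minimal, self-contained sketch of that last fallback (the helper name is illustrative, not part of this module):

```python
from http.cookies import SimpleCookie
from urllib.parse import unquote

def token_from_set_cookie(raw_header: str) -> str:
    """Extract and URL-decode XSRF-TOKEN from one Set-Cookie header, or ''."""
    jar = SimpleCookie()
    jar.load(raw_header)
    morsel = jar.get("XSRF-TOKEN")
    return unquote(morsel.value) if morsel and morsel.value else ""

print(token_from_set_cookie("XSRF-TOKEN=abc%3D%3D; Path=/; Secure"))  # abc==
```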
def validate_download(download_url: str, filename: str, data: bytes) -> None:
good_ext = filename.lower().endswith(".torrent") or urlparse(download_url).path.lower().endswith(".torrent")
if not good_ext:
raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.")
if not looks_like_torrent_bytes(data):
raise RuntimeError("Downloaded file failed torrent bencode check.")
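`looks_like_torrent_bytes` is defined elsewhere in the package; a plausible minimal heuristic for the same check (a `.torrent` file is a bencoded dictionary with an `info` key) might look like this — a sketch under that assumption, not the project's actual implementation:

```python
def looks_like_torrent_bytes_sketch(data: bytes) -> bool:
    """Cheap bencode sanity check: a torrent is a dict (d...e) containing '4:info'."""
    return (
        data.startswith(b"d")          # bencoded dictionary opens with 'd'
        and data.rstrip().endswith(b"e")  # and closes with 'e'
        and b"4:info" in data          # mandatory 'info' dictionary key
    )

sample = b"d8:announce9:tracker..4:infod6:lengthi1e4:name1:xee"
print(looks_like_torrent_bytes_sketch(sample))                 # True
print(looks_like_torrent_bytes_sketch(b"<html>login</html>"))  # False
```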
def download_torrent(
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
base_url: str = DEFAULT_BASE_URL,
) -> DownloadResult:
del wishlist_url
download_url = (item.get("downloadURL") or "").strip()
if not download_url:
raise ValueError("downloadURL is required for a PrivateHD item.")
cookies = build_http_cookies(cookie, download_url)
response = requests.get(
download_url,
cookies=cookies,
headers={
"Referer": item.get("pageURL") or base_url,
"User-Agent": USER_AGENT,
},
timeout=60,
)
if response.status_code >= 400:
raise RuntimeError(f"PrivateHD torrent download failed: HTTP {response.status_code}")
filename = normalize_filename("", download_url)
validate_download(download_url, filename, response.content)
return {
"filename": filename,
"data": response.content,
}
def remove_bookmark(
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
base_url: str = DEFAULT_BASE_URL,
) -> None:
remove_token = (item.get("removeToken") or "").strip()
if not remove_token:
raise ValueError("removeToken is required for a PrivateHD item.")
normalized_wishlist_url = normalize_wishlist_url(base_url, wishlist_url)
with build_dynamic_session(cookie, base_url=base_url) as session:
page = session.context.new_page()
try:
page.goto(normalized_wishlist_url, wait_until="domcontentloaded", timeout=45_000)
delete_button = page.locator(f"button.btn-delete-wishlist[data-id='{remove_token}']")
if delete_button.count() == 0:
raise RuntimeError("PrivateHD wishlist delete button could not be found.")
delete_button.first.click()
page.wait_for_timeout(500)
confirm_selectors = [
"button.swal2-confirm",
"button.confirm",
"button.btn-danger",
"button:has-text('Yes')",
]
clicked_confirm = False
for selector in confirm_selectors:
locator = page.locator(selector)
if locator.count() > 0 and locator.first.is_visible():
locator.first.click()
clicked_confirm = True
break
if not clicked_confirm:
confirm_result = page.evaluate(
"""
() => {
const nodes = Array.from(document.querySelectorAll("button,a"));
const target = nodes.find((node) =>
/^(yes|ok|confirm)$/i.test((node.textContent || "").trim())
);
if (!target) return false;
target.click();
return true;
}
"""
)
clicked_confirm = bool(confirm_result)
if not clicked_confirm:
raise RuntimeError("PrivateHD wishlist confirmation button could not be found.")
page.wait_for_timeout(1800)
page.reload(wait_until="domcontentloaded", timeout=45_000)
if page.locator(f"button.btn-delete-wishlist[data-id='{remove_token}']").count() > 0:
raise RuntimeError("PrivateHD wishlist removal could not be verified: the entry still appears in the list.")
finally:
page.close()
class PrivateHDAdapter:
key = "privatehd"
label = "PrivateHD"
def get_bookmarks(self, cookie: str, *, wishlist_url: str | None = None) -> list[BookmarkItem]:
return get_bookmarks(cookie, wishlist_url=wishlist_url or "")
def download_torrent(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> DownloadResult:
return download_torrent(cookie, item, wishlist_url=wishlist_url)
def remove_bookmark(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> None:
return remove_bookmark(cookie, item, wishlist_url=wishlist_url)
adapter = PrivateHDAdapter()
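Module-level singletons like `adapter` above are typically collected into a key-to-adapter registry that the CLI resolves by site name. A hypothetical, self-contained sketch of that dispatch (the registry and `resolve` helper are illustrative, not this project's actual API):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class _DemoAdapter:
    key: str
    label: str

# Registry maps the normalized site key to its adapter singleton.
_REGISTRY = {
    a.key: a
    for a in (_DemoAdapter("privatehd", "PrivateHD"), _DemoAdapter("happyfappy", "HappyFappy"))
}

def resolve(site: str) -> _DemoAdapter:
    try:
        return _REGISTRY[site.strip().lower()]
    except KeyError:
        supported = ", ".join(sorted(_REGISTRY))
        raise ValueError(f"Unsupported site: {site!r}. Supported: {supported}") from None

print(resolve(" PrivateHD ").label)  # PrivateHD
```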

src/wscraper/types.py (new file, 54 lines)

@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol, TypedDict
class BookmarkItem(TypedDict, total=False):
pageURL: str
title: str
backgroundImage: str | None
downloadURL: str | None
removeToken: str | None
size: str | None
seeders: int | None
leechers: int | None
class DownloadResult(TypedDict):
filename: str
data: bytes
@dataclass(frozen=True)
class TrackerInfo:
key: str
label: str
class TrackerAdapter(Protocol):
key: str
label: str
def get_bookmarks(
self,
cookie: str,
*,
wishlist_url: str | None = None,
) -> list[BookmarkItem]: ...
def download_torrent(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> DownloadResult: ...
def remove_bookmark(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> None: ...
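Because `TrackerAdapter` is a `typing.Protocol`, conformance is structural: any class with matching members satisfies it, with no inheritance required. Decorating a protocol with `@runtime_checkable` additionally lets `isinstance` verify method presence (though not signatures). A minimal sketch of that mechanism:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class SupportsBookmarks(Protocol):
    def get_bookmarks(self, cookie: str) -> list: ...

class DummyAdapter:
    key = "dummy"

    def get_bookmarks(self, cookie: str) -> list:
        return []

# DummyAdapter never subclasses SupportsBookmarks, yet it satisfies it.
print(isinstance(DummyAdapter(), SupportsBookmarks))  # True
print(isinstance(object(), SupportsBookmarks))        # False
```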

tests/conftest.py (new file, 16 lines)

@@ -0,0 +1,16 @@
from __future__ import annotations
def pytest_terminal_summary(terminalreporter, exitstatus, config):
_ = (exitstatus, config)
passed = len(terminalreporter.stats.get("passed", []))
failed = len(terminalreporter.stats.get("failed", []))
skipped = len(terminalreporter.stats.get("skipped", []))
terminalreporter.write_sep("=", "E2E SUMMARY", cyan=True)
terminalreporter.write_line(f"✅ Passed : {passed}", green=True)
if failed:
terminalreporter.write_line(f"❌ Failed : {failed}", red=True)
else:
terminalreporter.write_line(f"❌ Failed : {failed}", green=True)
terminalreporter.write_line(f"⚠️ Skipped: {skipped}", yellow=True)

tests/e2e/_helpers.py (new file, 75 lines)

@@ -0,0 +1,75 @@
from __future__ import annotations
import os
import subprocess
import sys
import time
from pathlib import Path
def e2e_enabled() -> bool:
return os.getenv("WSCRAPER_E2E", "").strip() == "1"
def base_env() -> dict[str, str]:
env = os.environ.copy()
src_path = str(Path.cwd() / "src")
current_pythonpath = env.get("PYTHONPATH", "").strip()
env["PYTHONPATH"] = f"{src_path}{os.pathsep}{current_pythonpath}" if current_pythonpath else src_path
return env
def log(tr, message: str, kind: str = "info") -> None:
icon = ""
style: dict[str, bool] = {}
if kind == "ok":
icon = "✅"
style = {"green": True}
elif kind == "err":
icon = "❌"
style = {"red": True}
elif kind == "warn":
icon = "⚠️"
style = {"yellow": True}
elif kind == "run":
icon = "🚀"
style = {"cyan": True}
if tr is not None:
tr.write_line(f"{icon} {message}", **style)
else:
print(f"{icon} {message}")
def run_cli_live(args: list[str], tr, timeout: int = 900) -> tuple[int, str]:
cmd = [sys.executable, "-m", "wscraper"] + args
log(tr, f"Running: {' '.join(cmd)}", kind="run")
started = time.time()
proc = subprocess.Popen(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=base_env(),
)
output_lines: list[str] = []
assert proc.stdout is not None
for line in proc.stdout:
output_lines.append(line)
clean = line.rstrip("\n")
if clean:
if tr is not None:
tr.write_line(f" {clean}")
else:
print(f" {clean}")
return_code = proc.wait(timeout=timeout)
duration = time.time() - started
if return_code == 0:
log(tr, f"Command finished successfully in {duration:.2f}s", kind="ok")
else:
log(tr, f"Command failed with exit code {return_code} in {duration:.2f}s", kind="err")
return return_code, "".join(output_lines)


@@ -0,0 +1,95 @@
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from tests.e2e._helpers import e2e_enabled
from tests.e2e._helpers import log
from tests.e2e._helpers import run_cli_live
pytestmark = [pytest.mark.e2e]
@pytest.fixture
def tr(request):
return request.config.pluginmanager.getplugin("terminalreporter")
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_get_bookmarks_live(tmp_path: Path, tr) -> None:
cookie_file = Path(os.getenv("WSCRAPER_COOKIE_FILE", "cookies.txt"))
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
output_file = tmp_path / "bookmarks.json"
log(tr, f"Output file: {output_file}")
return_code, output_text = run_cli_live(
[
"happyfappy",
"--action",
"get-bookmarks",
"-c",
str(cookie_file),
"-o",
str(output_file),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_file.exists(), "bookmarks.json was not created"
data = json.loads(output_file.read_text(encoding="utf-8"))
assert isinstance(data, list), "bookmarks output must be a JSON list"
assert len(data) >= 1, "expected at least one bookmark record"
log(tr, f"Extracted records: {len(data)}", kind="ok")
first = data[0]
assert isinstance(first, dict), "bookmark entry must be an object"
for required_key in ("pageURL", "isVR", "title", "backgroundImage"):
assert required_key in first, f"missing key: {required_key}"
assert isinstance(first["pageURL"], str) and first["pageURL"].startswith("http")
assert isinstance(first["isVR"], bool)
assert isinstance(first["title"], str) and first["title"].strip() != ""
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_download_torrent_file_live(tmp_path: Path, tr) -> None:
cookie_file = Path(os.getenv("WSCRAPER_COOKIE_FILE", "cookies.txt"))
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
test_url = os.getenv(
"WSCRAPER_TEST_TORRENT_URL",
"https://www.happyfappy.net/torrents.php?id=110178",
)
output_dir = tmp_path / "torrent"
log(tr, f"Output dir: {output_dir}")
return_code, output_text = run_cli_live(
[
"happyfappy",
"--action",
"download-torrent-files",
"-u",
test_url,
"-c",
str(cookie_file),
"-o",
str(output_dir),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_dir.exists(), "torrent output directory was not created"
torrent_files = list(output_dir.glob("*.torrent"))
assert len(torrent_files) >= 1, "expected at least one .torrent file"
log(tr, f"Downloaded .torrent files: {len(torrent_files)}", kind="ok")
content = torrent_files[0].read_bytes()
assert content.startswith(b"d"), "torrent file should start with bencode dictionary token 'd'"
assert b"4:info" in content[:4096], "torrent file should include 'info' dictionary marker"


@@ -0,0 +1,158 @@
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from tests.e2e._helpers import e2e_enabled
from tests.e2e._helpers import log
from tests.e2e._helpers import run_cli_live
pytestmark = [pytest.mark.e2e]
@pytest.fixture
def tr(request):
return request.config.pluginmanager.getplugin("terminalreporter")
def _privatehd_cookie_file() -> Path:
path = os.getenv("WSCRAPER_PRIVATEHD_COOKIE_FILE") or os.getenv("WSCRAPER_COOKIE_FILE", "cookies.txt")
return Path(path)
def _privatehd_wishlist_url() -> str:
return os.getenv("WSCRAPER_PRIVATEHD_WISHLIST_URL", "").strip()
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_get_bookmarks_live(tmp_path: Path, tr) -> None:
cookie_file = _privatehd_cookie_file()
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
wishlist_url = _privatehd_wishlist_url()
if not wishlist_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_WISHLIST_URL to run PrivateHD live bookmark test")
output_file = tmp_path / "bookmarks.json"
log(tr, f"Output file: {output_file}")
return_code, output_text = run_cli_live(
[
"privatehd",
"--action",
"get-bookmarks",
"-c",
str(cookie_file),
"--wishlist-url",
wishlist_url,
"-o",
str(output_file),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_file.exists(), "bookmarks.json was not created"
data = json.loads(output_file.read_text(encoding="utf-8"))
assert isinstance(data, list), "bookmarks output must be a JSON list"
assert len(data) >= 1, "expected at least one bookmark record"
log(tr, f"Extracted records: {len(data)}", kind="ok")
first = data[0]
assert isinstance(first, dict), "bookmark entry must be an object"
for required_key in ("pageURL", "title", "backgroundImage", "downloadURL", "removeToken"):
assert required_key in first, f"missing key: {required_key}"
assert isinstance(first["pageURL"], str) and first["pageURL"].startswith("http")
assert isinstance(first["title"], str) and first["title"].strip() != ""
assert isinstance(first["downloadURL"], str) and first["downloadURL"].startswith("http")
assert isinstance(first["removeToken"], str) and first["removeToken"].strip() != ""
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_download_torrent_file_live(tmp_path: Path, tr) -> None:
cookie_file = _privatehd_cookie_file()
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
wishlist_url = _privatehd_wishlist_url()
if not wishlist_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_WISHLIST_URL to run PrivateHD live download test")
test_url = os.getenv("WSCRAPER_PRIVATEHD_TEST_TORRENT_URL", "").strip()
download_url = os.getenv("WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL", "").strip()
if not test_url or not download_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_TEST_TORRENT_URL and WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL")
output_dir = tmp_path / "torrent"
log(tr, f"Output dir: {output_dir}")
return_code, output_text = run_cli_live(
[
"privatehd",
"--action",
"download-torrent-files",
"-u",
test_url,
"--download-url",
download_url,
"-c",
str(cookie_file),
"--wishlist-url",
wishlist_url,
"-o",
str(output_dir),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_dir.exists(), "torrent output directory was not created"
torrent_files = list(output_dir.glob("*.torrent"))
assert len(torrent_files) >= 1, "expected at least one .torrent file"
log(tr, f"Downloaded .torrent files: {len(torrent_files)}", kind="ok")
content = torrent_files[0].read_bytes()
assert content.startswith(b"d"), "torrent file should start with bencode dictionary token 'd'"
assert b"4:info" in content[:4096], "torrent file should include 'info' dictionary marker"
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_remove_bookmark_live(tr) -> None:
cookie_file = _privatehd_cookie_file()
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
wishlist_url = _privatehd_wishlist_url()
if not wishlist_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_WISHLIST_URL to run PrivateHD live remove test")
test_url = os.getenv("WSCRAPER_PRIVATEHD_TEST_REMOVE_URL", "").strip()
remove_token = os.getenv("WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN", "").strip()
if not test_url or not remove_token:
pytest.skip("Set WSCRAPER_PRIVATEHD_TEST_REMOVE_URL and WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN")
return_code, output_text = run_cli_live(
[
"privatehd",
"--action",
"remove-bookmark",
"-u",
test_url,
"--remove-token",
remove_token,
"-c",
str(cookie_file),
"--wishlist-url",
wishlist_url,
],
tr,
timeout=240,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert "Bookmark removed successfully." in output_text
log(tr, "PrivateHD bookmark removal completed", kind="ok")


@@ -1,131 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from download_happyfappy_torrent import run as run_happyfappy_download
from scrape_happyfappy_bookmarks import run as run_happyfappy_bookmarks
SITE_ALIASES = {
"happyfappy": "happyfappy",
"hf": "happyfappy",
}
ACTION_ALIASES = {
"get-bookmarks": "get-bookmarks",
"gb": "get-bookmarks",
"bookmarks": "get-bookmarks",
"download-torrent-files": "download-torrent-files",
"dtf": "download-torrent-files",
"download": "download-torrent-files",
}
def normalize_site(value: str) -> str:
key = value.strip().lower()
if key not in SITE_ALIASES:
supported = ", ".join(sorted(SITE_ALIASES))
raise ValueError(f"Unsupported site: {value!r}. Supported values: {supported}")
return SITE_ALIASES[key]
def normalize_action(value: str) -> str:
key = value.strip().lower()
if key not in ACTION_ALIASES:
supported = ", ".join(sorted(ACTION_ALIASES))
raise ValueError(f"Unsupported action: {value!r}. Supported values: {supported}")
return ACTION_ALIASES[key]
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="wscraper: multi-site scraping entrypoint",
)
parser.add_argument("site", help="Site key, e.g. happyfappy or hf")
parser.add_argument("-a", "--action", required=True, help="Action to run")
parser.add_argument("--base-url", help="Override site base URL")
parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
parser.add_argument("-c", "--cookie-file", help="Path to cookie file")
parser.add_argument("-u", "--url", help="Detail page URL (required for download action)")
parser.add_argument(
"-o",
"--output",
help="Output target: file path for get-bookmarks, directory path for download-torrent-files",
)
parser.add_argument("-r", "--retries", type=int, default=3)
parser.add_argument("--backoff-base", type=float, default=5.0)
parser.add_argument("--delay-min", type=float, default=1.8)
parser.add_argument("--delay-max", type=float, default=3.2)
parser.add_argument("--max-pages", type=int, default=200)
return parser
def run_happyfappy(args: argparse.Namespace, action: str) -> None:
base_url = args.base_url or "https://www.happyfappy.net"
if action == "get-bookmarks":
bookmarks_args = argparse.Namespace(
base_url=base_url,
cookie=args.cookie,
cookie_file=args.cookie_file,
output=args.output or "bookmarks.json",
delay_min=args.delay_min,
delay_max=args.delay_max,
retries=args.retries,
backoff_base=args.backoff_base,
max_pages=args.max_pages,
)
run_happyfappy_bookmarks(bookmarks_args)
return
if action == "download-torrent-files":
if not args.url:
raise ValueError("--url is required for action=download-torrent-files.")
download_args = argparse.Namespace(
url=args.url,
base_url=base_url,
cookie=args.cookie,
cookie_file=args.cookie_file,
output_dir=args.output or "torrent",
retries=args.retries,
backoff_base=args.backoff_base,
)
run_happyfappy_download(download_args)
return
raise ValueError(f"Unsupported action for happyfappy: {action}")
def main() -> None:
parser = build_parser()
args = parser.parse_args()
if args.retries < 1:
raise ValueError("--retries must be at least 1.")
if args.backoff_base < 0:
raise ValueError("--backoff-base must be >= 0.")
if args.delay_min < 0 or args.delay_max < 0:
raise ValueError("Delay values must be non-negative.")
if args.delay_min > args.delay_max:
raise ValueError("--delay-min cannot be greater than --delay-max.")
site = normalize_site(args.site)
action = normalize_action(args.action)
if not args.cookie and not args.cookie_file:
raise ValueError("Cookie is required. Use --cookie or --cookie-file/-c.")
if site == "happyfappy":
run_happyfappy(args, action)
return
raise ValueError(f"Unsupported site: {site}")
if __name__ == "__main__":
main()