Compare commits

..

23 Commits

Author SHA1 Message Date
b8e99ebbd2 test: add live e2e tests and test documentation for privatehd 2026-03-13 08:27:50 +03:00
259531949b docs: document the privatehd and q-buffer integration 2026-03-13 02:30:31 +03:00
fe429b6cef feat: add shared tracker adapter structure and PrivateHD support 2026-03-13 02:08:17 +03:00
daf75166db feat: support the q-buffer watcher flow 2026-03-12 22:32:12 +03:00
55459373e5 feat: add optional bookmark removal to torrent download action 2026-03-08 02:58:41 +03:00
7d307c31f9 readme update 2026-03-07 02:47:17 +03:00
b8cd26dc11 chore: ignore macOS metadata files 2026-03-07 02:46:13 +03:00
4ce0aad021 test: add live e2e happyfappy CLI tests with detailed output 2026-03-07 02:46:10 +03:00
6d7cb602b3 readme update 2026-03-07 02:27:28 +03:00
50b26e47d8 readme update 2026-03-07 02:26:29 +03:00
fe535be6a6 readme update 2026-03-07 02:26:02 +03:00
41980be6dd readme update 2026-03-07 02:25:33 +03:00
9bc56b9aa3 logo update 2026-03-07 02:24:29 +03:00
e9ea3c3ebd Update README.md 2026-03-06 23:22:12 +00:00
5d7665b82d Update README.md 2026-03-06 23:21:20 +00:00
358dcc8b2c readme update 2026-03-07 02:20:52 +03:00
8367e4b63d logo update 2026-03-07 02:19:04 +03:00
0924959ff3 logo update 2026-03-07 02:17:30 +03:00
f22258de9b docs: keep logo-only header in README 2026-03-07 02:15:19 +03:00
204c0233f0 docs: add centered logo and styled project title in README 2026-03-07 02:06:08 +03:00
8b4b3c2a30 docs: remove README logo and revert to plain header 2026-03-07 01:47:53 +03:00
891082782d docs: add monochrome logo and embed it in README header 2026-03-07 01:45:36 +03:00
2dd0358163 docs: add repository and python badges to README 2026-03-07 01:42:53 +03:00
17 changed files with 1764 additions and 225 deletions

2
.gitignore vendored

@@ -17,3 +17,5 @@ Scrapling/
# Packaging artifacts
*.egg-info/
.DS_Store

349
README.md

@@ -1,19 +1,59 @@
<p align="center">
<img src="logo-v2.png" alt="wscraper logo" width="240" />
</p>
<p align="center">
<a href="https://gitea.wisecolt-panda.net/wisecolt/Bookmark-Tracker">
<img src="https://img.shields.io/badge/Gitea-Repository-609926?logo=gitea&logoColor=white" alt="Gitea">
</a>
<img src="https://img.shields.io/badge/Python-3.10%2B-3776AB?logo=python&logoColor=white" alt="Python">
<img src="https://img.shields.io/badge/Trackers-HappyFappy%20%7C%20PrivateHD-0A7B83" alt="Trackers">
<img src="https://img.shields.io/badge/Runtime-scrapling%20%2B%20Playwright-1f6feb" alt="Runtime">
</p>
# wscraper
Commands for HappyFappy run through the packaged `wscraper` CLI. The project is organized under a `src/` package layout for multi-site support.
`wscraper` is a multi-tracker scraper package that consolidates tracker bookmark / wishlist flows behind a shared Python adapter layer. Current usage has two parts:
## 1) Clone the Repo
- `bin/wscraper/`: the Python package itself, the tracker adapters, and the CLI
- `bin/wscraper-service/server.py`: the host-side service that the `q-buffer` backend talks to over HTTP
```bash
git clone <REPO_URL>
cd <REPO_FOLDER>
```
The `q-buffer` watcher flow now uses the `wscraper` CLI through the `wscraper-service` running on the host machine, instead of spawning it directly inside Docker. The main reason is that the `scrapling + Playwright` chain runs more reliably against the trackers in the host environment.
## 2) Installation
## Supported Trackers
- `happyfappy` (`hf`)
- `privatehd` (`phd`)
Supported shared actions:
- `get-bookmarks`
- `download-torrent-files`
- `remove-bookmark`
## Architecture
The `wscraper` package structure is now tracker-registry based:
- `src/wscraper/registry.py`
  - registers the supported tracker adapters
- `src/wscraper/types.py`
  - defines the shared `BookmarkItem`, `DownloadResult`, and `TrackerAdapter` types
- `src/wscraper/sites/happyfappy.py`
  - the HappyFappy adapter
- `src/wscraper/sites/privatehd.py`
  - the PrivateHD adapter
- `src/wscraper/cli.py`
  - the shared CLI entrypoint for all trackers
This means adding a new tracker only requires writing a new adapter instead of copying the existing CLI.
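The alias resolution at the heart of this registry can be sketched in a few lines (condensed from `registry.py`; the adapter values here are placeholder strings rather than real adapter objects):

```python
# Condensed sketch of the tracker-registry pattern. In the real package the
# values of TRACKERS are adapter objects from src/wscraper/sites/.
TRACKERS = {"happyfappy": "HappyFappyAdapter", "privatehd": "PrivateHDAdapter"}
TRACKER_ALIASES = {
    "hf": "happyfappy",
    "happyfappy": "happyfappy",
    "phd": "privatehd",
    "privatehd": "privatehd",
}

def normalize_tracker(value: str) -> str:
    """Resolve a user-supplied tracker name or alias to its canonical key."""
    key = value.strip().lower()
    if key not in TRACKER_ALIASES:
        supported = ", ".join(sorted(TRACKER_ALIASES))
        raise ValueError(f"Unsupported tracker: {value!r}. Supported values: {supported}")
    return TRACKER_ALIASES[key]
```

With this, `wscraper hf ...` and `wscraper phd ...` resolve to the same adapters as the long names.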
## Installation
### macOS / Linux
```bash
cd bin/wscraper
python3.12 -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
@@ -21,9 +61,15 @@ python -m pip install -e .
scrapling install
```
### Windows (PowerShell)
Alternatives:
- if `python3.12` is unavailable, use `python3.11` or `python3.10`
- `scrapling install` sets up the Playwright/browser dependencies
### Windows PowerShell
```powershell
cd bin/wscraper
py -3.12 -m venv .venv
.venv\Scripts\Activate.ps1
python -m pip install -U pip
@@ -31,54 +77,277 @@ python -m pip install -e .
scrapling install
```
### Windows (CMD)
## CLI Usage
```bat
py -3.12 -m venv .venv
.venv\Scripts\activate.bat
python -m pip install -U pip
python -m pip install -e .
scrapling install
```
General form:
```bash
wscraper <tracker> --action <action> [options]
```
Note: after activating the environment, commands can be run as `wscraper ...`. You can also use `python -m wscraper ...` if you prefer.
## 3) HappyFappy Commands
### Fetching Bookmarks
### Fetching Bookmarks / Wishlist
```bash
wscraper happyfappy --action get-bookmarks -c cookies.txt -o bookmarks.json
wscraper privatehd --action get-bookmarks -c cookies.txt -o bookmarks.json
```
Optional `wishlist_url` override:
```bash
wscraper privatehd --action get-bookmarks -c cookies.txt --wishlist-url "https://privatehd.to/bookmarks" -o bookmarks.json
```
### Downloading Torrent Files
```bash
wscraper happyfappy --action download-torrent-files -u "https://www.happyfappy.net/torrents.php?id=110178" -c cookies.txt -o torrent
wscraper happyfappy --action download-torrent-files \
-c cookies.txt \
-u "https://www.happyfappy.net/torrents.php?id=110178" \
--title "Sample" \
--image-url "https://example.com/poster.jpg" \
-o torrent
```
## 4) Short Alias Usage
```bash
# site alias: hf
# action alias: gb (get-bookmarks), dtf (download-torrent-files)
wscraper hf -a gb -c cookies.txt -o bookmarks.json
wscraper hf -a dtf -u "https://www.happyfappy.net/torrents.php?id=110178" -c cookies.txt -o torrent
wscraper privatehd --action download-torrent-files \
-c cookies.txt \
-u "https://privatehd.to/torrent/12345" \
--download-url "https://privatehd.to/download.php?id=12345" \
--title "Sample" \
-o torrent
```
## 5) Project Directory
### Removing Bookmarks
```bash
wscraper happyfappy --action remove-bookmark \
-c cookies.txt \
-u "https://www.happyfappy.net/torrents.php?id=110178" \
--title "Sample"
```
```bash
wscraper privatehd --action remove-bookmark \
-c cookies.txt \
-u "https://privatehd.to/torrent/12345" \
--remove-token "bookmark-delete-token" \
--title "Sample"
```
### Short Aliases
```bash
wscraper hf -a gb -c cookies.txt -o bookmarks.json
wscraper phd -a gb -c cookies.txt -o bookmarks.json
wscraper hf -a dtf -c cookies.txt -u "https://www.happyfappy.net/torrents.php?id=110178" -o torrent
wscraper phd -a rb -c cookies.txt -u "https://privatehd.to/torrent/12345" --remove-token "token"
```
## Integration with q-buffer
Inside `q-buffer`, the watcher side now works as follows:
1. a watcher is defined from the UI
2. the `q-buffer` backend stores the cookie encrypted
3. the `server` inside Docker makes an HTTP call to the `wscraper-service` on the host
4. `wscraper-service` fetches the bookmarks through the `wscraper` adapter
5. it downloads the torrent file for each new bookmark
6. the `q-buffer` backend receives the `.torrent` content and sends it to qBittorrent
7. after a successful import, the bookmark is removed on the tracker side
`wscraper` is therefore no longer just a CLI; it is the backend scraping engine of the `q-buffer` watcher integration.
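The watcher steps above can be sketched as a plain loop. The callables here are hypothetical stand-ins for the real q-buffer backend and service calls, injected so the control flow stays visible:

```python
from typing import Callable, Iterable

def process_watcher_cycle(
    fetch_bookmarks: Callable[[], Iterable[dict]],   # step 4: via wscraper-service
    download_torrent: Callable[[dict], bytes],       # step 5
    import_to_qbittorrent: Callable[[bytes], bool],  # step 6
    remove_bookmark: Callable[[dict], None],         # step 7
) -> int:
    """Run one watcher cycle; returns how many items were imported."""
    imported = 0
    for item in fetch_bookmarks():
        data = download_torrent(item)
        if import_to_qbittorrent(data):
            # The bookmark is only removed after a verified import,
            # so a failed import leaves it in place for the next cycle.
            remove_bookmark(item)
            imported += 1
    return imported
```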
## wscraper-service API
`bin/wscraper-service/server.py` exposes the following endpoints:
- `GET /health`
- `GET /trackers`
- `POST /bookmarks`
- `POST /download`
- `POST /remove-bookmark`
Example `POST /bookmarks` payload:
```json
{
"tracker": "happyfappy",
"cookie": "raw-cookie",
"wishlistUrl": "optional-override"
}
```
Example `POST /download` payload:
```json
{
"tracker": "privatehd",
"cookie": "raw-cookie",
"wishlistUrl": "optional-override",
"item": {
"pageURL": "https://privatehd.to/torrent/12345",
"title": "Example",
"downloadURL": "https://privatehd.to/download.php?id=12345",
"removeToken": "bookmark-token"
}
}
```
## How Is wscraper Prepared During the Initial q-buffer Setup?
This step is particularly important. In the `q-buffer` repo, the scraping logic lives in `bin/wscraper/`, while the host service is the `bin/wscraper-service/server.py` file. In other words:
- `bin/wscraper/` is not just `server.py`
- `server.py` lives in the separate `wscraper-service` folder
- the actual tracker code lives under `bin/wscraper/src/wscraper/...`
Recommended path for the initial setup:
1. in the repo root, copy `.env.example` to `.env`
2. review the watcher service settings in `.env`:
- `WSCRAPER_SERVICE_BASE_URL`
- `WSCRAPER_SERVICE_TOKEN`
- `WSCRAPER_SERVICE_HOST`
- `WSCRAPER_SERVICE_PORT`
- `WSCRAPER_SERVICE_PYTHON_BIN`
3. run the following command in the repo root:
```bash
./scripts/bootstrap.sh --dev-mode
```
This script:
- brings up the Docker `web` and `server` services with `up --build`
- creates `.runtime/wscraper-service/.venv` on the host
- installs `scrapling[fetchers]`
- runs `scrapling install`
- starts the `bin/wscraper-service/server.py` service process
If setup has already completed, the script does not redo these steps; it only fills in whatever is missing.
## Tests
`wscraper` includes live, `pytest`-based e2e tests that run against real systems. They are disabled by default and run only when explicitly enabled.
Test files:
- `tests/e2e/test_happyfappy_live.py`
- `tests/e2e/test_privatehd_live.py`
- `tests/e2e/_helpers.py`
### Enabling the Tests
For all live tests:
```bash
export WSCRAPER_E2E=1
```
If this variable is missing or not set to `1`, the e2e tests are skipped.
### HappyFappy Live Tests
Current test coverage:
- `get-bookmarks`
- `download-torrent-files`
Environment variables used:
- `WSCRAPER_COOKIE_FILE`
- `WSCRAPER_TEST_TORRENT_URL`
Example:
```bash
export WSCRAPER_E2E=1
export WSCRAPER_COOKIE_FILE=/absolute/path/to/happyfappy-cookies.txt
export WSCRAPER_TEST_TORRENT_URL="https://www.happyfappy.net/torrents.php?id=110178"
pytest tests/e2e/test_happyfappy_live.py -m e2e -s
```
### PrivateHD Live Tests
Test coverage added for PrivateHD:
- `get-bookmarks`
- `download-torrent-files`
- `remove-bookmark`
Environment variables used:
- `WSCRAPER_PRIVATEHD_COOKIE_FILE`
- `WSCRAPER_PRIVATEHD_WISHLIST_URL`
- `WSCRAPER_PRIVATEHD_TEST_TORRENT_URL`
- `WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL`
- `WSCRAPER_PRIVATEHD_TEST_REMOVE_URL`
- `WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN`
Fallback rule:
- if `WSCRAPER_PRIVATEHD_COOKIE_FILE` is not set, `WSCRAPER_COOKIE_FILE` is used
Example:
```bash
export WSCRAPER_E2E=1
export WSCRAPER_PRIVATEHD_COOKIE_FILE=/absolute/path/to/privatehd-cookies.txt
export WSCRAPER_PRIVATEHD_WISHLIST_URL="https://privatehd.to/profile/blackdockers/wishlist"
export WSCRAPER_PRIVATEHD_TEST_TORRENT_URL="https://privatehd.to/torrent/12345-example"
export WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL="https://privatehd.to/download/torrent/12345.example.torrent"
pytest tests/e2e/test_privatehd_live.py -m e2e -s
```
### About the remove-bookmark Test
The PrivateHD `remove-bookmark` test deletes a real wishlist entry, so it should be used with particular care.
This test:
- runs only if `WSCRAPER_PRIVATEHD_TEST_REMOVE_URL` and `WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN` are provided
- otherwise it is safely skipped
Example:
```bash
export WSCRAPER_E2E=1
export WSCRAPER_PRIVATEHD_COOKIE_FILE=/absolute/path/to/privatehd-cookies.txt
export WSCRAPER_PRIVATEHD_WISHLIST_URL="https://privatehd.to/profile/blackdockers/wishlist"
export WSCRAPER_PRIVATEHD_TEST_REMOVE_URL="https://privatehd.to/torrent/12345-example"
export WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN="467471"
pytest tests/e2e/test_privatehd_live.py -m e2e -s -k remove
```
### Notes
- these tests need real tracker accounts and valid cookies
- the `remove-bookmark` test mutates state; test data should be chosen deliberately
- `tests/e2e/_helpers.py` contains the shared CLI execution and logging helpers used by all tracker live tests
## Directory Structure
```text
.
├── pyproject.toml
├── requirements.txt
├── src/
│   └── wscraper/
│       ├── __init__.py
│       ├── __main__.py
│       ├── cli.py
│       └── sites/
│           ├── __init__.py
│           └── happyfappy.py
└── README.md
bin/
├── wscraper/
│   ├── README.md
│   ├── pyproject.toml
│   ├── setup.py
│   └── src/
│       └── wscraper/
│           ├── cli.py
│           ├── registry.py
│           ├── types.py
│           └── sites/
│               ├── happyfappy.py
│               └── privatehd.py
└── wscraper-service/
    └── server.py
```
## Notes
- the cookie can be provided either as a raw string or in Netscape cookie file format
- tracker metadata fields (`backgroundImage`, `downloadURL`, `removeToken`, `size`, `seeders`, `leechers`) are normalized by the adapter
- on the `q-buffer` side, the image proxy, watcher item cache, and qBittorrent enrichment layers consume this scraper output

BIN
logo-v2.png Normal file

Binary file not shown. (new binary image, 127 KiB)


@@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools", "wheel"]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
@@ -7,11 +7,16 @@ name = "wscraper"
version = "0.1.0"
description = "Multi-site scraper CLI"
readme = "README.md"
requires-python = ">=3.12"
requires-python = ">=3.11"
dependencies = [
"scrapling[fetchers]==0.4.1",
]
[project.optional-dependencies]
test = [
"pytest>=8.0",
]
[project.scripts]
wscraper = "wscraper.cli:main"
@@ -20,3 +25,8 @@ package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
markers = [
"e2e: live end-to-end tests against external services",
]

3
setup.py Normal file

@@ -0,0 +1,3 @@
from setuptools import setup
setup()


@@ -1,13 +1,10 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from wscraper.sites.happyfappy import run_download_torrent_files, run_get_bookmarks
SITE_ALIASES = {
"happyfappy": "happyfappy",
"hf": "happyfappy",
}
from wscraper.registry import get_tracker, list_trackers, normalize_tracker
ACTION_ALIASES = {
"get-bookmarks": "get-bookmarks",
@@ -16,17 +13,12 @@ ACTION_ALIASES = {
"download-torrent-files": "download-torrent-files",
"dtf": "download-torrent-files",
"download": "download-torrent-files",
"remove-bookmark": "remove-bookmark",
"remove": "remove-bookmark",
"rb": "remove-bookmark",
}
def normalize_site(value: str) -> str:
key = value.strip().lower()
if key not in SITE_ALIASES:
supported = ", ".join(sorted(SITE_ALIASES))
raise ValueError(f"Unsupported site: {value!r}. Supported values: {supported}")
return SITE_ALIASES[key]
def normalize_action(value: str) -> str:
key = value.strip().lower()
if key not in ACTION_ALIASES:
@@ -36,91 +28,96 @@ def normalize_action(value: str) -> str:
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="wscraper: multi-site scraping entrypoint")
parser.add_argument("site", help="Site key, e.g. happyfappy or hf")
supported_sites = ", ".join(sorted({tracker.key for tracker in list_trackers()}))
parser = argparse.ArgumentParser(description=f"wscraper: multi-site scraping entrypoint ({supported_sites})")
parser.add_argument("site", help="Site key, e.g. happyfappy, hf, privatehd or phd")
parser.add_argument("-a", "--action", required=True, help="Action to run")
parser.add_argument("--base-url", help="Override site base URL")
parser.add_argument("--cookie", help='Raw cookie string, e.g. "a=1; b=2"')
parser.add_argument("-c", "--cookie-file", help="Path to cookie file")
parser.add_argument("--wishlist-url", help="Tracker-specific wishlist URL override")
parser.add_argument("-u", "--url", help="Detail page URL")
parser.add_argument("--download-url", help="Direct torrent download URL")
parser.add_argument("--remove-token", help="Tracker-specific remove token")
parser.add_argument("--title", help="Item title")
parser.add_argument("--image-url", help="Background image URL")
parser.add_argument("--size", help="Torrent size text")
parser.add_argument("--seeders", type=int, help="Seeders count")
parser.add_argument("--leechers", type=int, help="Leechers count")
parser.add_argument("-u", "--url", help="Detail page URL (required for download action)")
parser.add_argument(
"-o",
"--output",
help="Output target: file path for get-bookmarks, directory path for download-torrent-files",
)
parser.add_argument("-r", "--retries", type=int, default=3)
parser.add_argument("--backoff-base", type=float, default=5.0)
parser.add_argument("--delay-min", type=float, default=1.8)
parser.add_argument("--delay-max", type=float, default=3.2)
parser.add_argument("--max-pages", type=int, default=200)
return parser
def run_happyfappy(args: argparse.Namespace, action: str) -> None:
base_url = args.base_url or "https://www.happyfappy.net"
def read_cookie(args: argparse.Namespace) -> str:
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
cookie_value = Path(args.cookie_file).read_text(encoding="utf-8").strip()
if not cookie_value:
raise ValueError("Cookie is required. Use --cookie or --cookie-file/-c.")
return cookie_value
if action == "get-bookmarks":
run_get_bookmarks(
argparse.Namespace(
base_url=base_url,
cookie=args.cookie,
cookie_file=args.cookie_file,
output=args.output or "bookmarks.json",
delay_min=args.delay_min,
delay_max=args.delay_max,
retries=args.retries,
backoff_base=args.backoff_base,
max_pages=args.max_pages,
)
)
return
if action == "download-torrent-files":
if not args.url:
raise ValueError("--url is required for action=download-torrent-files.")
run_download_torrent_files(
argparse.Namespace(
url=args.url,
base_url=base_url,
cookie=args.cookie,
cookie_file=args.cookie_file,
output_dir=args.output or "torrent",
retries=args.retries,
backoff_base=args.backoff_base,
)
)
return
raise ValueError(f"Unsupported action for happyfappy: {action}")
def build_item(args: argparse.Namespace) -> dict[str, object]:
if not args.url:
raise ValueError("--url is required for item-based actions.")
item: dict[str, object] = {
"pageURL": args.url,
"title": args.title or "",
}
if args.download_url:
item["downloadURL"] = args.download_url
if args.remove_token:
item["removeToken"] = args.remove_token
if args.image_url:
item["backgroundImage"] = args.image_url
if args.size:
item["size"] = args.size
if args.seeders is not None:
item["seeders"] = args.seeders
if args.leechers is not None:
item["leechers"] = args.leechers
return item
def main() -> None:
parser = build_parser()
args = parser.parse_args()
if args.retries < 1:
raise ValueError("--retries must be at least 1.")
if args.backoff_base < 0:
raise ValueError("--backoff-base must be >= 0.")
if args.delay_min < 0 or args.delay_max < 0:
raise ValueError("Delay values must be non-negative.")
if args.delay_min > args.delay_max:
raise ValueError("--delay-min cannot be greater than --delay-max.")
site = normalize_site(args.site)
tracker = get_tracker(normalize_tracker(args.site))
action = normalize_action(args.action)
cookie = read_cookie(args)
if not args.cookie and not args.cookie_file:
raise ValueError("Cookie is required. Use --cookie or --cookie-file/-c.")
if site == "happyfappy":
run_happyfappy(args, action)
if action == "get-bookmarks":
items = tracker.get_bookmarks(cookie, wishlist_url=args.wishlist_url)
output_path = Path(args.output or "bookmarks.json").resolve()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Saved {len(items)} records to {output_path}")
return
raise ValueError(f"Unsupported site: {site}")
item = build_item(args)
if action == "download-torrent-files":
result = tracker.download_torrent(cookie, item, wishlist_url=args.wishlist_url)
output_dir = Path(args.output or "torrent").resolve()
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / result["filename"]
output_path.write_bytes(result["data"])
print(f"Saved torrent to {output_path}")
return
if action == "remove-bookmark":
tracker.remove_bookmark(cookie, item, wishlist_url=args.wishlist_url)
print("Bookmark removed successfully.")
return
raise ValueError(f"Unsupported action: {action}")
if __name__ == "__main__":


@@ -0,0 +1 @@
__all__ = []


@@ -0,0 +1,91 @@
from __future__ import annotations
from typing import Any
def domain_matches(target_host: str, cookie_domain: str) -> bool:
normalized_cookie_domain = cookie_domain.replace("#HttpOnly_", "").lstrip(".").lower()
normalized_target_host = target_host.lower()
return (
normalized_target_host == normalized_cookie_domain
or normalized_target_host.endswith("." + normalized_cookie_domain)
)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
cookies: dict[str, str] = {}
lines = cookie_string.splitlines()
looks_like_netscape = len(lines) > 1 and any(
("\t" in line) or len(line.split()) >= 7 for line in lines if line.strip()
)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t") if "\t" in line else line.split()
if len(parts) < 7:
continue
domain, _flag, _path, _secure, _expires, name, value = parts[:7]
if not domain_matches(target_host, domain):
continue
if name:
cookies[name] = value
return cookies
for chunk in cookie_string.split(";"):
piece = chunk.strip()
if not piece or "=" not in piece:
continue
key, value = piece.split("=", 1)
key = key.strip()
value = value.strip()
if key:
cookies[key] = value
return cookies
def parse_cookies_for_playwright(
cookie_string: str,
target_host: str,
base_url: str,
) -> list[dict[str, Any]]:
lines = cookie_string.splitlines()
cookies: list[dict[str, Any]] = []
looks_like_netscape = len(lines) > 1 and any(
("\t" in line) or len(line.split()) >= 7 for line in lines if line.strip()
)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t") if "\t" in line else line.split()
if len(parts) < 7:
continue
domain, _flag, path, secure, expires, name, value = parts[:7]
if not domain_matches(target_host, domain):
continue
if not name:
continue
cookie_obj: dict[str, Any] = {
"name": name,
"value": value,
"domain": domain.replace("#HttpOnly_", "").lstrip("."),
"path": path or "/",
"secure": (secure.upper() == "TRUE"),
}
if expires.isdigit():
expires_number = int(expires)
if expires_number > 0:
cookie_obj["expires"] = float(expires_number)
cookies.append(cookie_obj)
return cookies
kv = parse_cookie_string(cookie_string, target_host)
for name, value in kv.items():
cookies.append({"name": name, "value": value, "url": base_url})
return cookies
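A self-contained illustration of the domain-matching rule shared by both parsers above (the body mirrors `domain_matches` from this file):

```python
def domain_matches(target_host: str, cookie_domain: str) -> bool:
    # Netscape cookie files may prefix HttpOnly cookies with "#HttpOnly_"
    # and domains with a leading dot; both are stripped before comparing.
    normalized_cookie_domain = cookie_domain.replace("#HttpOnly_", "").lstrip(".").lower()
    normalized_target_host = target_host.lower()
    return (
        normalized_target_host == normalized_cookie_domain
        or normalized_target_host.endswith("." + normalized_cookie_domain)
    )
```

So `domain_matches("www.privatehd.to", ".privatehd.to")` holds, while an unrelated host that merely ends with the same characters does not match.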


@@ -0,0 +1,78 @@
from __future__ import annotations
import re
import socket
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
def absolute_url(base_url: str, href: str) -> str:
href = href.strip()
if href.startswith("http://") or href.startswith("https://"):
return href
if href.startswith("/"):
return f"{base_url.rstrip('/')}{href}"
return f"{base_url.rstrip('/')}/{href}"
def ensure_hosts_entry(host: str) -> None:
try:
ip = socket.gethostbyname(host)
except OSError:
return
hosts_path = Path("/etc/hosts")
try:
current = hosts_path.read_text(encoding="utf-8")
except OSError:
return
if re.search(rf"(^|\s){re.escape(host)}(\s|$)", current, flags=re.MULTILINE):
return
try:
with hosts_path.open("a", encoding="utf-8") as handle:
handle.write(f"\n{ip} {host}\n")
except OSError:
return
def ensure_tracker_hosts(base_url: str) -> None:
parsed = urlparse(base_url)
host = parsed.hostname
if not host:
return
variants = {host}
if host.startswith("www."):
variants.add(host[4:])
else:
variants.add(f"www.{host}")
for candidate in variants:
ensure_hosts_entry(candidate)
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
last_error: Exception | None = None
for attempt in range(retries):
try:
response = session.fetch(
url,
timeout=45_000,
load_dom=True,
network_idle=False,
google_search=False,
)
status = response.status
if status in (403, 404, 429) or status >= 500:
raise RuntimeError(f"HTTP {status}")
return response
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error
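The retry helper sleeps `backoff_base * 2**attempt` seconds between attempts; the resulting schedule can be sketched as:

```python
def backoff_schedule(retries: int, backoff_base: float) -> list[float]:
    """Sleep durations between attempts (there is no sleep after the final attempt)."""
    return [backoff_base * (2 ** attempt) for attempt in range(retries - 1)]
```

With the CLI defaults (`--retries 3 --backoff-base 5.0`) the waits are 5 s and 10 s.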

34
src/wscraper/registry.py Normal file

@@ -0,0 +1,34 @@
from __future__ import annotations
from wscraper.sites.happyfappy import adapter as happyfappy_adapter
from wscraper.sites.privatehd import adapter as privatehd_adapter
from wscraper.types import TrackerAdapter, TrackerInfo
TRACKERS: dict[str, TrackerAdapter] = {
happyfappy_adapter.key: happyfappy_adapter,
privatehd_adapter.key: privatehd_adapter,
}
TRACKER_ALIASES = {
"hf": "happyfappy",
"happyfappy": "happyfappy",
"phd": "privatehd",
"privatehd": "privatehd",
}
def normalize_tracker(value: str) -> str:
key = value.strip().lower()
if key not in TRACKER_ALIASES:
supported = ", ".join(sorted(TRACKER_ALIASES))
raise ValueError(f"Unsupported tracker: {value!r}. Supported values: {supported}")
return TRACKER_ALIASES[key]
def get_tracker(value: str) -> TrackerAdapter:
normalized = normalize_tracker(value)
return TRACKERS[normalized]
def list_trackers() -> list[TrackerInfo]:
return [TrackerInfo(key=tracker.key, label=tracker.label) for tracker in TRACKERS.values()]


@@ -4,126 +4,21 @@ import argparse
import json
import random
import re
import tempfile
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from scrapling.fetchers import DynamicSession
from wscraper.common.cookies import parse_cookie_string, parse_cookies_for_playwright
from wscraper.common.net import absolute_url, ensure_tracker_hosts, fetch_dynamic_with_retry
from wscraper.types import BookmarkItem, DownloadResult
STOP_TEXT = "You have not bookmarked any torrents."
BG_URL_RE = re.compile(r"url\((?:'|\")?(.*?)(?:'|\")?\)")
def _domain_matches(target_host: str, cookie_domain: str) -> bool:
cd = cookie_domain.lstrip(".").lower()
th = target_host.lower()
return th == cd or th.endswith("." + cd)
def parse_cookie_string(cookie_string: str, target_host: str) -> dict[str, str]:
cookies: dict[str, str] = {}
lines = cookie_string.splitlines()
looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, _flag, _path, _secure, _expires, name, value = parts[:7]
if not _domain_matches(target_host, domain):
continue
if name:
cookies[name] = value
return cookies
for chunk in cookie_string.split(";"):
piece = chunk.strip()
if not piece or "=" not in piece:
continue
key, value = piece.split("=", 1)
key = key.strip()
value = value.strip()
if key:
cookies[key] = value
return cookies
def parse_cookies_for_playwright(cookie_string: str, target_host: str, base_url: str) -> list[dict[str, Any]]:
lines = cookie_string.splitlines()
cookies: list[dict[str, Any]] = []
looks_like_netscape = len(lines) > 1 and any("\t" in line for line in lines)
if looks_like_netscape:
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, _flag, path, secure, expires, name, value = parts[:7]
if not _domain_matches(target_host, domain):
continue
if not name:
continue
cookie_obj: dict[str, Any] = {
"name": name,
"value": value,
"domain": domain.lstrip("."),
"path": path or "/",
"secure": (secure.upper() == "TRUE"),
}
if expires.isdigit():
exp_num = int(expires)
if exp_num > 0:
cookie_obj["expires"] = float(exp_num)
cookies.append(cookie_obj)
return cookies
kv = parse_cookie_string(cookie_string, target_host)
for name, value in kv.items():
cookies.append({"name": name, "value": value, "url": base_url})
return cookies
def absolute_url(base_url: str, href: str) -> str:
href = href.strip()
if href.startswith("http://") or href.startswith("https://"):
return href
if href.startswith("/"):
return f"{base_url.rstrip('/')}{href}"
return f"{base_url.rstrip('/')}/{href}"
def fetch_dynamic_with_retry(session: Any, url: str, retries: int, backoff_base: float) -> Any:
last_error: Exception | None = None
for attempt in range(retries):
try:
response = session.fetch(
url,
timeout=45_000,
load_dom=True,
network_idle=False,
google_search=False,
)
status = response.status
if status in (403, 404, 429) or status >= 500:
raise RuntimeError(f"HTTP {status}")
return response
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
raise RuntimeError(f"Request failed for {url}: {last_error}") from last_error
# bookmarks
def extract_background_image(style: str) -> str | None:
@@ -172,6 +67,7 @@ def build_bookmarks_url(base_url: str, page: int) -> str:
def run_get_bookmarks(args: argparse.Namespace) -> None:
target_host = urlparse(args.base_url).hostname or "www.happyfappy.net"
ensure_tracker_hosts(args.base_url)
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
@@ -316,6 +212,7 @@ def validate_torrent_response(download_url: str, filename: str, data: bytes) ->
def run_download_torrent_files(args: argparse.Namespace) -> None:
base_url = args.base_url.rstrip("/")
target_host = urlparse(base_url).hostname or "www.happyfappy.net"
ensure_tracker_hosts(base_url)
cookie_value = args.cookie or ""
if not cookie_value and args.cookie_file:
@@ -353,3 +250,403 @@ def run_download_torrent_files(args: argparse.Namespace) -> None:
output_path = output_dir / filename
output_path.write_bytes(data)
print(f"Saved torrent to {output_path}")
if getattr(args, "rm_bookmark", False):
torrent_id = extract_torrent_id(args.url)
removed = remove_bookmark_with_retry(
session=session,
detail_url=args.url,
torrent_id=torrent_id,
retries=args.retries,
backoff_base=args.backoff_base,
)
if not removed:
raise RuntimeError("Torrent downloaded but bookmark removal could not be verified.")
print("Bookmark removed successfully.")
def extract_torrent_id(detail_url: str) -> str | None:
parsed = urlparse(detail_url)
query = parsed.query or ""
match = re.search(r"(?:^|&)id=(\d+)(?:&|$)", query)
if match:
return match.group(1)
path_match = re.search(r"/torrents\.php/(\d+)", parsed.path or "")
if path_match:
return path_match.group(1)
return None
def _click_remove_control(page: Any, torrent_id: str | None) -> dict[str, Any]:
return page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
const hasAny = (source, tokens) => tokens.some((t) => source.includes(t));
const removeTokens = ["unbookmark", "remove", "delete", "forget", "unmark"];
const addTokens = ["bookmark", "add", "mark", "save"];
const nodes = Array.from(document.querySelectorAll(
"a,button,[role='button'],[onclick],input[type='button'],input[type='submit']"
));
let best = null;
for (const node of nodes) {
const text = normalize(node.textContent || "");
const title = normalize(node.getAttribute("title"));
const aria = normalize(node.getAttribute("aria-label"));
const id = normalize(node.id);
const cls = normalize(node.className);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const attrs = normalize(node.outerHTML);
const all = [text, title, aria, id, cls, href, onclick, attrs].join(" ");
let score = 0;
const reasons = [];
if (hasAny(onclick, ["unbookmark"])) {
score += 60;
reasons.push("onclick:unbookmark");
}
if (hasAny(all, ["bookmark"])) {
score += 16;
reasons.push("bookmark-signal");
}
if (hasAny(all, removeTokens)) {
score += 26;
reasons.push("remove-signal");
}
if (!hasAny(all, removeTokens) && hasAny(all, addTokens)) {
score -= 20;
reasons.push("add-like-signal");
}
if (torrentId && all.includes(torrentId)) {
score += 30;
reasons.push("torrent-id");
}
if (hasAny(href, ["javascript", "#"])) {
score += 4;
}
if (!best || score > best.score) {
best = { node, score, reasons, snapshot: (node.outerHTML || "").slice(0, 220) };
}
}
if (!best || best.score < 20) {
return { clicked: false, score: best ? best.score : -1, reasons: best ? best.reasons : [], snapshot: best ? best.snapshot : "" };
}
best.node.click();
return { clicked: true, score: best.score, reasons: best.reasons, snapshot: best.snapshot };
}
""",
{"torrentId": torrent_id},
)
def _remove_control_exists(page: Any, torrent_id: str | None) -> bool:
return bool(
page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
const removeTokens = ["unbookmark", "remove", "delete", "forget", "unmark"];
const nodes = Array.from(document.querySelectorAll(
"a,button,[role='button'],[onclick],input[type='button'],input[type='submit']"
));
for (const node of nodes) {
const text = normalize(node.textContent || "");
const title = normalize(node.getAttribute("title"));
const aria = normalize(node.getAttribute("aria-label"));
const id = normalize(node.id);
const cls = normalize(node.className);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const all = [text, title, aria, id, cls, href, onclick].join(" ");
const looksLikeRemove = removeTokens.some((t) => all.includes(t)) || onclick.includes("unbookmark");
const matchesId = torrentId ? all.includes(torrentId) : true;
if (looksLikeRemove && matchesId) {
return true;
}
}
return false;
}
""",
{"torrentId": torrent_id},
)
)
def _bookmark_control_state(page: Any, torrent_id: str | None) -> dict[str, Any]:
return page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
const candidates = [];
if (torrentId) {
const direct = document.getElementById(`bookmarklink_torrent_${torrentId}`);
if (direct) candidates.push(direct);
}
const nodes = Array.from(document.querySelectorAll("a,button,[onclick],[id*='bookmark']"));
for (const node of nodes) {
if (!candidates.includes(node)) candidates.push(node);
}
const scored = [];
for (const node of candidates) {
const text = normalize(node.textContent || "");
const id = normalize(node.id);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const title = normalize(node.getAttribute("title"));
const all = [text, id, href, onclick, title].join(" ");
let score = 0;
if (all.includes("bookmark")) score += 12;
if (torrentId && all.includes(torrentId)) score += 28;
if (onclick.includes("unbookmark")) score += 45;
if (onclick.includes("bookmark(") && !onclick.includes("unbookmark")) score += 20;
if (id.includes("bookmarklink_torrent")) score += 35;
const action = onclick.includes("unbookmark")
? "remove"
: (onclick.includes("bookmark(") ? "add" : "unknown");
scored.push({
score,
action,
text,
id,
onclick,
snapshot: (node.outerHTML || "").slice(0, 220),
});
}
scored.sort((a, b) => b.score - a.score);
const best = scored[0] || null;
return { best, total: scored.length };
}
""",
{"torrentId": torrent_id},
)
def _click_bookmark_control(page: Any, torrent_id: str | None) -> dict[str, Any]:
return page.evaluate(
"""
({ torrentId }) => {
const normalize = (v) => (v || "").toString().toLowerCase();
let target = null;
if (torrentId) {
target = document.getElementById(`bookmarklink_torrent_${torrentId}`);
}
if (!target) {
const nodes = Array.from(document.querySelectorAll("a,button,[onclick],[id*='bookmark']"));
let best = null;
for (const node of nodes) {
const text = normalize(node.textContent || "");
const id = normalize(node.id);
const href = normalize(node.getAttribute("href"));
const onclick = normalize(node.getAttribute("onclick"));
const title = normalize(node.getAttribute("title"));
const all = [text, id, href, onclick, title].join(" ");
let score = 0;
if (all.includes("bookmark")) score += 12;
if (torrentId && all.includes(torrentId)) score += 28;
if (onclick.includes("unbookmark")) score += 45;
if (id.includes("bookmarklink_torrent")) score += 35;
if (!best || score > best.score) best = { node, score, all, onclick };
}
if (best) target = best.node;
}
if (!target) return { clicked: false, reason: "no-target" };
const onclick = normalize(target.getAttribute("onclick"));
if (onclick.includes("bookmark(") && !onclick.includes("unbookmark")) {
return { clicked: false, reason: "already-removed" };
}
target.click();
return { clicked: true, reason: "clicked", snapshot: (target.outerHTML || "").slice(0, 220) };
}
""",
{"torrentId": torrent_id},
)
def remove_bookmark_with_retry(
session: DynamicSession,
detail_url: str,
torrent_id: str | None,
retries: int,
backoff_base: float,
) -> bool:
last_error: Exception | None = None
for attempt in range(retries):
page = session.context.new_page()
try:
page.goto(detail_url, wait_until="domcontentloaded", timeout=45_000)
state_before = _bookmark_control_state(page, torrent_id)
best_before = (state_before or {}).get("best") or {}
action_before = best_before.get("action")
if action_before == "add":
return True
if action_before != "remove":
raise RuntimeError("Bookmark remove control not detected on detail page.")
click_result = _click_bookmark_control(page, torrent_id)
if not click_result.get("clicked"):
if click_result.get("reason") == "already-removed":
return True
raise RuntimeError(
"Bookmark remove action could not be clicked. "
f"reason={click_result.get('reason')}"
)
page.wait_for_timeout(2200)
page.reload(wait_until="domcontentloaded", timeout=45_000)
state_after = _bookmark_control_state(page, torrent_id)
best_after = (state_after or {}).get("best") or {}
action_after = best_after.get("action")
if action_after == "remove":
raise RuntimeError("Bookmark remove control still present after click; remove likely failed.")
return True
except Exception as err: # noqa: BLE001
last_error = err
if attempt == retries - 1:
break
time.sleep(backoff_base * (2**attempt))
finally:
page.close()
raise RuntimeError(f"Bookmark remove failed for {detail_url}: {last_error}") from last_error
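Between failed attempts the retry loop sleeps `backoff_base * (2 ** attempt)`, i.e. exponential backoff with no sleep after the final attempt. A quick sketch of the schedule with the defaults used elsewhere in this change (`retries=3`, `backoff_base=5.0`):

```python
def backoff_schedule(retries: int, backoff_base: float) -> list[float]:
    # Delays slept between attempts; the last attempt is not followed
    # by a sleep, hence range(retries - 1).
    return [backoff_base * (2 ** attempt) for attempt in range(retries - 1)]

print(backoff_schedule(3, 5.0))  # [5.0, 10.0]
```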
def get_bookmarks(cookie: str, *, base_url: str = "https://www.happyfappy.net") -> list[BookmarkItem]:
with tempfile.TemporaryDirectory(prefix="happyfappy-bookmarks-") as tmpdir:
output_path = Path(tmpdir) / "bookmarks.json"
run_get_bookmarks(
argparse.Namespace(
base_url=base_url,
cookie=cookie,
cookie_file=None,
output=str(output_path),
delay_min=1.8,
delay_max=3.2,
retries=3,
backoff_base=5.0,
max_pages=200,
)
)
return json.loads(output_path.read_text(encoding="utf-8"))
def download_torrent(
cookie: str,
item: BookmarkItem,
*,
base_url: str = "https://www.happyfappy.net",
) -> DownloadResult:
detail_url = (item.get("pageURL") or "").strip()
if not detail_url:
raise ValueError("pageURL is required for HappyFappy download.")
with tempfile.TemporaryDirectory(prefix="happyfappy-download-") as tmpdir:
output_dir = Path(tmpdir) / "torrent"
run_download_torrent_files(
argparse.Namespace(
url=detail_url,
base_url=base_url,
cookie=cookie,
cookie_file=None,
output_dir=str(output_dir),
rm_bookmark=False,
retries=3,
backoff_base=5.0,
)
)
files = sorted(output_dir.glob("*.torrent"))
if not files:
raise RuntimeError("No torrent file produced")
torrent_path = files[0]
return {
"filename": torrent_path.name,
"data": torrent_path.read_bytes(),
}
def remove_bookmark(
cookie: str,
item: BookmarkItem,
*,
base_url: str = "https://www.happyfappy.net",
) -> None:
detail_url = (item.get("pageURL") or "").strip()
if not detail_url:
raise ValueError("pageURL is required for HappyFappy bookmark removal.")
target_host = urlparse(base_url).hostname or "www.happyfappy.net"
ensure_tracker_hosts(base_url)
pw_cookies = parse_cookies_for_playwright(
cookie,
target_host=target_host,
base_url=base_url.rstrip("/"),
)
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
torrent_id = extract_torrent_id(detail_url)
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
remove_bookmark_with_retry(
session=session,
detail_url=detail_url,
torrent_id=torrent_id,
retries=3,
backoff_base=5.0,
)
class HappyFappyAdapter:
key = "happyfappy"
label = "HappyFappy"
def get_bookmarks(self, cookie: str, *, wishlist_url: str | None = None) -> list[BookmarkItem]:
del wishlist_url
return get_bookmarks(cookie)
def download_torrent(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> DownloadResult:
del wishlist_url
return download_torrent(cookie, item)
def remove_bookmark(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> None:
del wishlist_url
remove_bookmark(cookie, item)
adapter = HappyFappyAdapter()


@@ -0,0 +1,359 @@
from __future__ import annotations
import re
import time
from http.cookies import SimpleCookie
from typing import Any
from urllib.parse import unquote
from urllib.parse import urlparse
from curl_cffi import requests
from scrapling.fetchers import DynamicSession
from wscraper.sites.happyfappy import (
absolute_url,
fetch_dynamic_with_retry,
looks_like_torrent_bytes,
normalize_filename,
parse_cookie_string,
parse_cookies_for_playwright,
)
from wscraper.types import BookmarkItem, DownloadResult
DEFAULT_BASE_URL = "https://privatehd.to"
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
)
def normalize_wishlist_url(base_url: str, wishlist_url: str | None) -> str:
candidate = (wishlist_url or "").strip()
if not candidate:
raise ValueError("wishlistUrl is required for PrivateHD.")
if candidate.startswith("http://") or candidate.startswith("https://"):
return candidate
return absolute_url(base_url, candidate)
def parse_int(value: str) -> int | None:
cleaned = value.strip()
if not cleaned:
return None
match = re.search(r"\d+", cleaned.replace(",", ""))
return int(match.group(0)) if match else None
def extract_rows(response: Any, base_url: str) -> list[BookmarkItem]:
records: list[BookmarkItem] = []
rows = response.css("table.table tbody tr")
for row in rows:
detail_href = (row.css("a.torrent-filename::attr(href)").get("") or "").strip()
if not detail_href:
continue
detail_url = absolute_url(base_url, detail_href)
title = " ".join(row.css("a.torrent-filename::text").getall()).strip()
download_href = (
row.css("a.torrent-download-icon::attr(href)").get("")
or row.css("a[href*='/download/torrent/']::attr(href)").get("")
or ""
).strip()
download_url = absolute_url(base_url, download_href) if download_href else None
remove_token = (
row.css("button.btn-delete-wishlist::attr(data-id)").get("")
or row.css("button[data-id]::attr(data-id)").get("")
or ""
).strip() or None
cells = row.css("td")
size = None
seeders = None
leechers = None
if len(cells) >= 7:
size = " ".join(cells[4].css("::text").getall()).strip() or None
seeders = parse_int(" ".join(cells[5].css("::text").getall()))
leechers = parse_int(" ".join(cells[6].css("::text").getall()))
records.append(
{
"pageURL": detail_url,
"title": title,
"downloadURL": download_url,
"removeToken": remove_token,
"size": size,
"seeders": seeders,
"leechers": leechers,
}
)
return records
def enrich_bookmark(response: Any, base_url: str, item: BookmarkItem) -> BookmarkItem:
poster = (
response.css("img[src*='/images/posters/']::attr(src)").get("")
or response.css("img.img-responsive::attr(src)").get("")
or ""
).strip()
title = (
" ".join(response.css("a[href*='/movie/']::text").getall()).strip()
or " ".join(response.css("a[href*='/tv/']::text").getall()).strip()
or " ".join(response.css("h1::text").getall()).strip()
or item.get("title")
or ""
)
download_href = (
response.css("a.btn.btn-xs.btn-primary[href*='/download/torrent/']::attr(href)").get("")
or response.css("a[href*='/download/torrent/']::attr(href)").get("")
or ""
).strip()
return {
**item,
"title": title.strip(),
"backgroundImage": absolute_url(base_url, poster) if poster else item.get("backgroundImage"),
"downloadURL": absolute_url(base_url, download_href) if download_href else item.get("downloadURL"),
}
def build_dynamic_session(
cookie: str,
*,
base_url: str = DEFAULT_BASE_URL,
) -> DynamicSession:
target_host = urlparse(base_url).hostname or "privatehd.to"
pw_cookies = parse_cookies_for_playwright(cookie, target_host=target_host, base_url=base_url.rstrip("/"))
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
return DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
)
def get_bookmarks(
cookie: str,
*,
wishlist_url: str,
base_url: str = DEFAULT_BASE_URL,
) -> list[BookmarkItem]:
normalized_wishlist_url = normalize_wishlist_url(base_url, wishlist_url)
with build_dynamic_session(cookie, base_url=base_url) as session:
response = fetch_dynamic_with_retry(
session,
normalized_wishlist_url,
retries=3,
backoff_base=5.0,
)
items = extract_rows(response, base_url)
enriched: list[BookmarkItem] = []
for index, item in enumerate(items):
detail_response = fetch_dynamic_with_retry(
session,
item["pageURL"],
retries=3,
backoff_base=5.0,
)
enriched.append(enrich_bookmark(detail_response, base_url, item))
if index < len(items) - 1:
time.sleep(1.2)
return enriched
def build_http_cookies(cookie: str, target_url: str) -> dict[str, str]:
target_host = urlparse(target_url).hostname or "privatehd.to"
cookies = parse_cookie_string(cookie, target_host=target_host)
if not cookies:
raise ValueError("No valid cookies parsed for target host.")
return cookies
def build_http_session(cookie: str, target_url: str) -> requests.Session:
session = requests.Session()
session.cookies.update(build_http_cookies(cookie, target_url))
session.headers.update({"User-Agent": USER_AGENT})
return session
def fetch_wishlist_token(
cookie: str,
*,
wishlist_url: str,
base_url: str = DEFAULT_BASE_URL,
) -> str:
normalized_wishlist_url = normalize_wishlist_url(base_url, wishlist_url)
with build_dynamic_session(cookie, base_url=base_url) as session:
response = fetch_dynamic_with_retry(
session,
normalized_wishlist_url,
retries=3,
backoff_base=5.0,
)
token = (
response.css("input[name='_token']::attr(value)").get("")
or response.css("meta[name='csrf-token']::attr(content)").get("")
or ""
).strip()
if not token:
cookies = build_http_cookies(cookie, normalized_wishlist_url)
raw_xsrf = cookies.get("XSRF-TOKEN", "").strip()
if raw_xsrf:
return unquote(raw_xsrf)
set_cookie_values = response.headers.get_list("set-cookie") if hasattr(response.headers, "get_list") else []
for raw_header in set_cookie_values:
jar = SimpleCookie()
jar.load(raw_header)
morsel = jar.get("XSRF-TOKEN")
if morsel and morsel.value:
return unquote(morsel.value)
raise RuntimeError("PrivateHD CSRF token could not be found.")
return token
def validate_download(download_url: str, filename: str, data: bytes) -> None:
good_ext = filename.lower().endswith(".torrent") or urlparse(download_url).path.lower().endswith(".torrent")
if not good_ext:
raise RuntimeError("Downloaded content has no .torrent extension in URL/filename.")
if not looks_like_torrent_bytes(data):
raise RuntimeError("Downloaded file failed torrent bencode check.")
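`looks_like_torrent_bytes` is imported from the happyfappy module and is not part of this diff; a minimal bencode sanity check in the same spirit (an assumption about its behavior, not the actual helper) could look like this:

```python
def looks_like_torrent_bytes_sketch(data: bytes) -> bool:
    # A .torrent file is a bencoded dictionary: it starts with b"d",
    # ends with b"e", and carries an "info" dictionary key, which
    # bencode encodes as b"4:info". HTML error/login pages fail this.
    return (
        data.startswith(b"d")
        and data.rstrip().endswith(b"e")
        and b"4:info" in data
    )

print(looks_like_torrent_bytes_sketch(b"d8:announce3:url4:infod1:xi1eee"))  # True
print(looks_like_torrent_bytes_sketch(b"<html>login</html>"))               # False
```

This matches the checks the live e2e tests below make on downloaded files (`startswith(b"d")` and `b"4:info"` in the first bytes).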
def download_torrent(
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
base_url: str = DEFAULT_BASE_URL,
) -> DownloadResult:
del wishlist_url
download_url = (item.get("downloadURL") or "").strip()
if not download_url:
raise ValueError("downloadURL is required for PrivateHD item.")
cookies = build_http_cookies(cookie, download_url)
response = requests.get(
download_url,
cookies=cookies,
headers={
"Referer": item.get("pageURL") or base_url,
"User-Agent": USER_AGENT,
},
timeout=60,
)
if response.status_code >= 400:
raise RuntimeError(f"PrivateHD torrent download failed: HTTP {response.status_code}")
filename = normalize_filename("", download_url)
validate_download(download_url, filename, response.content)
return {
"filename": filename,
"data": response.content,
}
def remove_bookmark(
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
base_url: str = DEFAULT_BASE_URL,
) -> None:
remove_token = (item.get("removeToken") or "").strip()
if not remove_token:
raise ValueError("removeToken is required for PrivateHD item.")
normalized_wishlist_url = normalize_wishlist_url(base_url, wishlist_url)
target_host = urlparse(base_url).hostname or "privatehd.to"
pw_cookies = parse_cookies_for_playwright(cookie, target_host=target_host, base_url=base_url.rstrip("/"))
if not pw_cookies:
raise ValueError("No Playwright-compatible cookies generated for target host.")
with DynamicSession(
headless=True,
disable_resources=True,
cookies=pw_cookies,
google_search=False,
retries=1,
retry_delay=1,
) as session:
page = session.context.new_page()
try:
page.goto(normalized_wishlist_url, wait_until="domcontentloaded", timeout=45_000)
delete_button = page.locator(f"button.btn-delete-wishlist[data-id='{remove_token}']")
if delete_button.count() == 0:
raise RuntimeError("PrivateHD wishlist delete button could not be found.")
delete_button.first.click()
page.wait_for_timeout(500)
confirm_selectors = [
"button.swal2-confirm",
"button.confirm",
"button.btn-danger",
"button:has-text('Yes')",
]
clicked_confirm = False
for selector in confirm_selectors:
locator = page.locator(selector)
if locator.count() > 0 and locator.first.is_visible():
locator.first.click()
clicked_confirm = True
break
if not clicked_confirm:
confirm_result = page.evaluate(
"""
() => {
const nodes = Array.from(document.querySelectorAll("button,a"));
const target = nodes.find((node) =>
/^(yes|ok|confirm)$/i.test((node.textContent || "").trim())
);
if (!target) return false;
target.click();
return true;
}
"""
)
clicked_confirm = bool(confirm_result)
if not clicked_confirm:
raise RuntimeError("PrivateHD wishlist confirmation button could not be found.")
page.wait_for_timeout(1800)
page.reload(wait_until="domcontentloaded", timeout=45_000)
if page.locator(f"button.btn-delete-wishlist[data-id='{remove_token}']").count() > 0:
raise RuntimeError("PrivateHD wishlist removal could not be verified: the entry still appears in the list.")
finally:
page.close()
class PrivateHDAdapter:
key = "privatehd"
label = "PrivateHD"
def get_bookmarks(self, cookie: str, *, wishlist_url: str | None = None) -> list[BookmarkItem]:
return get_bookmarks(cookie, wishlist_url=wishlist_url or "")
def download_torrent(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> DownloadResult:
return download_torrent(cookie, item, wishlist_url=wishlist_url)
def remove_bookmark(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> None:
return remove_bookmark(cookie, item, wishlist_url=wishlist_url)
adapter = PrivateHDAdapter()

src/wscraper/types.py Normal file

@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol, TypedDict
class BookmarkItem(TypedDict, total=False):
pageURL: str
title: str
backgroundImage: str | None
downloadURL: str | None
removeToken: str | None
size: str | None
seeders: int | None
leechers: int | None
class DownloadResult(TypedDict):
filename: str
data: bytes
@dataclass(frozen=True)
class TrackerInfo:
key: str
label: str
class TrackerAdapter(Protocol):
key: str
label: str
def get_bookmarks(
self,
cookie: str,
*,
wishlist_url: str | None = None,
) -> list[BookmarkItem]: ...
def download_torrent(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> DownloadResult: ...
def remove_bookmark(
self,
cookie: str,
item: BookmarkItem,
*,
wishlist_url: str | None = None,
) -> None: ...
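`TrackerAdapter` is a `typing.Protocol`, so conformance is structural: any object with matching `key`, `label`, and the three methods satisfies it, with no inheritance needed — which is why `HappyFappyAdapter` and `PrivateHDAdapter` can be plain classes. A trimmed-down, hypothetical illustration of the pattern:

```python
from typing import Protocol

class MiniAdapter(Protocol):
    # Trimmed-down stand-in for TrackerAdapter, for illustration only.
    key: str
    label: str
    def get_bookmarks(self, cookie: str) -> list[dict]: ...

class FakeTracker:
    # No inheritance from MiniAdapter; matching shape is enough.
    key = "fake"
    label = "Fake"
    def get_bookmarks(self, cookie: str) -> list[dict]:
        return [{"pageURL": "https://example.net/t/1", "title": "demo"}]

def list_titles(adapter: MiniAdapter, cookie: str) -> list[str]:
    # Callers depend only on the protocol, not on a concrete class.
    return [item["title"] for item in adapter.get_bookmarks(cookie)]

print(list_titles(FakeTracker(), "session=abc"))  # ['demo']
```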

tests/conftest.py Normal file

@@ -0,0 +1,16 @@
from __future__ import annotations
def pytest_terminal_summary(terminalreporter, exitstatus, config):
_ = (exitstatus, config)
passed = len(terminalreporter.stats.get("passed", []))
failed = len(terminalreporter.stats.get("failed", []))
skipped = len(terminalreporter.stats.get("skipped", []))
terminalreporter.write_sep("=", "E2E SUMMARY", cyan=True)
terminalreporter.write_line(f"✅ Passed : {passed}", green=True)
if failed:
terminalreporter.write_line(f"❌ Failed : {failed}", red=True)
else:
terminalreporter.write_line(f"❌ Failed : {failed}", green=True)
terminalreporter.write_line(f"⚠️ Skipped: {skipped}", yellow=True)

tests/e2e/_helpers.py Normal file

@@ -0,0 +1,75 @@
from __future__ import annotations
import os
import subprocess
import sys
import time
from pathlib import Path
def e2e_enabled() -> bool:
return os.getenv("WSCRAPER_E2E", "").strip() == "1"
def base_env() -> dict[str, str]:
env = os.environ.copy()
src_path = str(Path.cwd() / "src")
current_pythonpath = env.get("PYTHONPATH", "").strip()
env["PYTHONPATH"] = f"{src_path}{os.pathsep}{current_pythonpath}" if current_pythonpath else src_path
return env
def log(tr, message: str, kind: str = "info") -> None:
icon = ""
style: dict[str, bool] = {}
if kind == "ok":
icon = "✅"
style = {"green": True}
elif kind == "err":
icon = "❌"
style = {"red": True}
elif kind == "warn":
icon = "⚠️"
style = {"yellow": True}
elif kind == "run":
icon = "🚀"
style = {"cyan": True}
if tr is not None:
tr.write_line(f"{icon} {message}", **style)
else:
print(f"{icon} {message}")
def run_cli_live(args: list[str], tr, timeout: int = 900) -> tuple[int, str]:
cmd = [sys.executable, "-m", "wscraper"] + args
log(tr, f"Running: {' '.join(cmd)}", kind="run")
started = time.time()
proc = subprocess.Popen(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=base_env(),
)
output_lines: list[str] = []
assert proc.stdout is not None
for line in proc.stdout:
output_lines.append(line)
clean = line.rstrip("\n")
if clean:
if tr is not None:
tr.write_line(f" {clean}")
else:
print(f" {clean}")
return_code = proc.wait(timeout=timeout)
duration = time.time() - started
if return_code == 0:
log(tr, f"Command finished successfully in {duration:.2f}s", kind="ok")
else:
log(tr, f"Command failed with exit code {return_code} in {duration:.2f}s", kind="err")
return return_code, "".join(output_lines)


@@ -0,0 +1,95 @@
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from tests.e2e._helpers import e2e_enabled
from tests.e2e._helpers import log
from tests.e2e._helpers import run_cli_live
pytestmark = [pytest.mark.e2e]
@pytest.fixture
def tr(request):
return request.config.pluginmanager.getplugin("terminalreporter")
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_get_bookmarks_live(tmp_path: Path, tr) -> None:
cookie_file = Path(os.getenv("WSCRAPER_COOKIE_FILE", "cookies.txt"))
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
output_file = tmp_path / "bookmarks.json"
log(tr, f"Output file: {output_file}")
return_code, output_text = run_cli_live(
[
"happyfappy",
"--action",
"get-bookmarks",
"-c",
str(cookie_file),
"-o",
str(output_file),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_file.exists(), "bookmarks.json was not created"
data = json.loads(output_file.read_text(encoding="utf-8"))
assert isinstance(data, list), "bookmarks output must be a JSON list"
assert len(data) >= 1, "expected at least one bookmark record"
log(tr, f"Extracted records: {len(data)}", kind="ok")
first = data[0]
assert isinstance(first, dict), "bookmark entry must be an object"
for required_key in ("pageURL", "isVR", "title", "backgroundImage"):
assert required_key in first, f"missing key: {required_key}"
assert isinstance(first["pageURL"], str) and first["pageURL"].startswith("http")
assert isinstance(first["isVR"], bool)
assert isinstance(first["title"], str) and first["title"].strip() != ""
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_download_torrent_file_live(tmp_path: Path, tr) -> None:
cookie_file = Path(os.getenv("WSCRAPER_COOKIE_FILE", "cookies.txt"))
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
test_url = os.getenv(
"WSCRAPER_TEST_TORRENT_URL",
"https://www.happyfappy.net/torrents.php?id=110178",
)
output_dir = tmp_path / "torrent"
log(tr, f"Output dir: {output_dir}")
return_code, output_text = run_cli_live(
[
"happyfappy",
"--action",
"download-torrent-files",
"-u",
test_url,
"-c",
str(cookie_file),
"-o",
str(output_dir),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_dir.exists(), "torrent output directory was not created"
torrent_files = list(output_dir.glob("*.torrent"))
assert len(torrent_files) >= 1, "expected at least one .torrent file"
log(tr, f"Downloaded .torrent files: {len(torrent_files)}", kind="ok")
content = torrent_files[0].read_bytes()
assert content.startswith(b"d"), "torrent file should start with bencode dictionary token 'd'"
assert b"4:info" in content[:4096], "torrent file should include 'info' dictionary marker"


@@ -0,0 +1,158 @@
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
from tests.e2e._helpers import e2e_enabled
from tests.e2e._helpers import log
from tests.e2e._helpers import run_cli_live
pytestmark = [pytest.mark.e2e]
@pytest.fixture
def tr(request):
return request.config.pluginmanager.getplugin("terminalreporter")
def _privatehd_cookie_file() -> Path:
path = os.getenv("WSCRAPER_PRIVATEHD_COOKIE_FILE") or os.getenv("WSCRAPER_COOKIE_FILE", "cookies.txt")
return Path(path)
def _privatehd_wishlist_url() -> str:
return os.getenv("WSCRAPER_PRIVATEHD_WISHLIST_URL", "").strip()
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_get_bookmarks_live(tmp_path: Path, tr) -> None:
cookie_file = _privatehd_cookie_file()
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
wishlist_url = _privatehd_wishlist_url()
if not wishlist_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_WISHLIST_URL to run PrivateHD live bookmark test")
output_file = tmp_path / "bookmarks.json"
log(tr, f"Output file: {output_file}")
return_code, output_text = run_cli_live(
[
"privatehd",
"--action",
"get-bookmarks",
"-c",
str(cookie_file),
"--wishlist-url",
wishlist_url,
"-o",
str(output_file),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_file.exists(), "bookmarks.json was not created"
data = json.loads(output_file.read_text(encoding="utf-8"))
assert isinstance(data, list), "bookmarks output must be a JSON list"
assert len(data) >= 1, "expected at least one bookmark record"
log(tr, f"Extracted records: {len(data)}", kind="ok")
first = data[0]
assert isinstance(first, dict), "bookmark entry must be an object"
for required_key in ("pageURL", "title", "backgroundImage", "downloadURL", "removeToken"):
assert required_key in first, f"missing key: {required_key}"
assert isinstance(first["pageURL"], str) and first["pageURL"].startswith("http")
assert isinstance(first["title"], str) and first["title"].strip() != ""
assert isinstance(first["downloadURL"], str) and first["downloadURL"].startswith("http")
assert isinstance(first["removeToken"], str) and first["removeToken"].strip() != ""
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_download_torrent_file_live(tmp_path: Path, tr) -> None:
cookie_file = _privatehd_cookie_file()
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
wishlist_url = _privatehd_wishlist_url()
if not wishlist_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_WISHLIST_URL to run PrivateHD live download test")
test_url = os.getenv("WSCRAPER_PRIVATEHD_TEST_TORRENT_URL", "").strip()
download_url = os.getenv("WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL", "").strip()
if not test_url or not download_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_TEST_TORRENT_URL and WSCRAPER_PRIVATEHD_TEST_DOWNLOAD_URL")
output_dir = tmp_path / "torrent"
log(tr, f"Output dir: {output_dir}")
return_code, output_text = run_cli_live(
[
"privatehd",
"--action",
"download-torrent-files",
"-u",
test_url,
"--download-url",
download_url,
"-c",
str(cookie_file),
"--wishlist-url",
wishlist_url,
"-o",
str(output_dir),
],
tr,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert output_dir.exists(), "torrent output directory was not created"
torrent_files = list(output_dir.glob("*.torrent"))
assert len(torrent_files) >= 1, "expected at least one .torrent file"
log(tr, f"Downloaded .torrent files: {len(torrent_files)}", kind="ok")
content = torrent_files[0].read_bytes()
assert content.startswith(b"d"), "torrent file should start with bencode dictionary token 'd'"
assert b"4:info" in content[:4096], "torrent file should include 'info' dictionary marker"
@pytest.mark.skipif(not e2e_enabled(), reason="Set WSCRAPER_E2E=1 to run live tests")
def test_remove_bookmark_live(tr) -> None:
cookie_file = _privatehd_cookie_file()
if not cookie_file.exists():
pytest.skip(f"Cookie file not found: {cookie_file}")
wishlist_url = _privatehd_wishlist_url()
if not wishlist_url:
pytest.skip("Set WSCRAPER_PRIVATEHD_WISHLIST_URL to run PrivateHD live remove test")
test_url = os.getenv("WSCRAPER_PRIVATEHD_TEST_REMOVE_URL", "").strip()
remove_token = os.getenv("WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN", "").strip()
if not test_url or not remove_token:
pytest.skip("Set WSCRAPER_PRIVATEHD_TEST_REMOVE_URL and WSCRAPER_PRIVATEHD_TEST_REMOVE_TOKEN")
return_code, output_text = run_cli_live(
[
"privatehd",
"--action",
"remove-bookmark",
"-u",
test_url,
"--remove-token",
remove_token,
"-c",
str(cookie_file),
"--wishlist-url",
wishlist_url,
],
tr,
timeout=240,
)
assert return_code == 0, f"CLI failed:\n{output_text}"
assert "Bookmark removed successfully." in output_text
log(tr, "PrivateHD bookmark removal completed", kind="ok")