Add iso2god.py
This commit is contained in:
918
iso2god.py
Normal file
918
iso2god.py
Normal file
@@ -0,0 +1,918 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import fnmatch
|
||||||
|
import glob
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import shlex
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import unicodedata
|
||||||
|
import urllib.request
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Iterable, Optional
|
||||||
|
|
||||||
|
GIB = 1024 ** 3
|
||||||
|
MIB = 1024 ** 2
|
||||||
|
DEFAULT_TITLE_MAP_URL = (
|
||||||
|
"https://gist.githubusercontent.com/AdrianCassar/"
|
||||||
|
"c0d05a14608168259232b3ed8c77f28c/raw/"
|
||||||
|
"482e1a7a303cceaf747a4acd765dd950b056019a/Title%2520IDs.json"
|
||||||
|
)
|
||||||
|
TITLE_ID_RE = re.compile(r"^[0-9a-fA-F]{8}$")
|
||||||
|
|
||||||
|
|
||||||
|
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def log(msg: str) -> None:
|
||||||
|
print(msg, flush=True)
|
||||||
|
|
||||||
|
|
||||||
|
def warn(msg: str) -> None:
|
||||||
|
print(msg, file=sys.stderr, flush=True)
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_title_dir_name(title: str, fallback: str) -> str:
|
||||||
|
"""
|
||||||
|
Zamienia tytuł gry na bezpieczną nazwę katalogu.
|
||||||
|
Zostają tylko znaki ASCII: A-Z, a-z, 0-9, _ oraz -.
|
||||||
|
Spacje i inne niedozwolone znaki są zamieniane na pojedyncze _.
|
||||||
|
"""
|
||||||
|
normalized = unicodedata.normalize("NFKD", title)
|
||||||
|
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
|
||||||
|
safe = re.sub(r"[^A-Za-z0-9_-]+", "_", ascii_text)
|
||||||
|
safe = re.sub(r"_+", "_", safe).strip("_-")
|
||||||
|
return safe or fallback.upper()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_title_map_obj(obj: object) -> dict[str, str]:
|
||||||
|
if not isinstance(obj, list):
|
||||||
|
raise ValueError("title-map musi być listą obiektów z polami titleid i title")
|
||||||
|
|
||||||
|
out: dict[str, str] = {}
|
||||||
|
for item in obj:
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
title_id = str(item.get("titleid", "")).strip().upper()
|
||||||
|
title = str(item.get("title", "")).strip()
|
||||||
|
if TITLE_ID_RE.fullmatch(title_id) and title:
|
||||||
|
out[title_id] = title
|
||||||
|
|
||||||
|
if not out:
|
||||||
|
raise ValueError("title-map nie zawiera poprawnych wpisów titleid/title")
|
||||||
|
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def load_title_map_file(path: Path) -> dict[str, str]:
|
||||||
|
with path.expanduser().open("r", encoding="utf-8") as fh:
|
||||||
|
return parse_title_map_obj(json.load(fh))
|
||||||
|
|
||||||
|
|
||||||
|
def load_title_map_url(url: str, timeout: float) -> dict[str, str]:
|
||||||
|
req = urllib.request.Request(url, headers={"User-Agent": "iso2god_batch/1.0"})
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout) as response:
|
||||||
|
charset = response.headers.get_content_charset() or "utf-8"
|
||||||
|
text = response.read().decode(charset, errors="replace")
|
||||||
|
return parse_title_map_obj(json.loads(text))
|
||||||
|
|
||||||
|
|
||||||
|
def load_title_map(args: argparse.Namespace) -> dict[str, str]:
|
||||||
|
title_map: dict[str, str] = {}
|
||||||
|
|
||||||
|
if args.title_map_url:
|
||||||
|
loaded = load_title_map_url(args.title_map_url, args.title_map_timeout)
|
||||||
|
title_map.update(loaded)
|
||||||
|
log(f"[TITLE] pobrano title-map z URL: {len(loaded)} wpisów")
|
||||||
|
|
||||||
|
if args.title_map_file:
|
||||||
|
path = Path(args.title_map_file).expanduser()
|
||||||
|
loaded = load_title_map_file(path)
|
||||||
|
title_map.update(loaded)
|
||||||
|
log(f"[TITLE] wczytano title-map z pliku: {len(loaded)} wpisów")
|
||||||
|
|
||||||
|
return title_map
|
||||||
|
|
||||||
|
|
||||||
|
def unique_title_target(dest_dir: Path, base_name: str, title_id: str) -> Path:
|
||||||
|
target = dest_dir / base_name
|
||||||
|
if not target.exists():
|
||||||
|
return target
|
||||||
|
|
||||||
|
title_id_l = title_id.upper()
|
||||||
|
target = dest_dir / f"{base_name}_{title_id_l}"
|
||||||
|
if not target.exists():
|
||||||
|
return target
|
||||||
|
|
||||||
|
idx = 2
|
||||||
|
while True:
|
||||||
|
target = dest_dir / f"{base_name}_{title_id_l}_{idx}"
|
||||||
|
if not target.exists():
|
||||||
|
return target
|
||||||
|
idx += 1
|
||||||
|
|
||||||
|
|
||||||
|
def rename_converted_title_dirs(dest_dir: Path, title_map: dict[str, str]) -> int:
|
||||||
|
"""
|
||||||
|
iso2god zwykle tworzy katalog TITLEID, np. 4156004D.
|
||||||
|
Jeśli znamy TITLEID z mapy, zmieniamy nazwę katalogu na oczyszczony tytuł gry.
|
||||||
|
"""
|
||||||
|
if not title_map or not dest_dir.exists():
|
||||||
|
return 0
|
||||||
|
|
||||||
|
renamed = 0
|
||||||
|
for child in sorted(dest_dir.iterdir(), key=natural_key):
|
||||||
|
if not child.is_dir() or not TITLE_ID_RE.fullmatch(child.name):
|
||||||
|
continue
|
||||||
|
|
||||||
|
title_id = child.name.upper()
|
||||||
|
title = title_map.get(title_id)
|
||||||
|
if not title:
|
||||||
|
continue
|
||||||
|
|
||||||
|
base_name = sanitize_title_dir_name(title, fallback=title_id.lower())
|
||||||
|
if child.name == base_name:
|
||||||
|
continue
|
||||||
|
|
||||||
|
target = unique_title_target(dest_dir, base_name, title_id)
|
||||||
|
child.rename(target)
|
||||||
|
log(f"[TITLE] {child.name} -> {target.name} ({title})")
|
||||||
|
renamed += 1
|
||||||
|
|
||||||
|
return renamed
|
||||||
|
|
||||||
|
|
||||||
|
def natural_key(path: Path):
|
||||||
|
return [int(x) if x.isdigit() else x.lower() for x in re.split(r"(\d+)", str(path))]
|
||||||
|
|
||||||
|
|
||||||
|
def format_bytes(value: int) -> str:
|
||||||
|
value_f = float(value)
|
||||||
|
for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
|
||||||
|
if value_f < 1024.0 or unit == "TiB":
|
||||||
|
return f"{value_f:.2f} {unit}"
|
||||||
|
value_f /= 1024.0
|
||||||
|
return f"{value} B"
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_from_gb(value: float) -> int:
|
||||||
|
return int(max(value, 0.0) * GIB)
|
||||||
|
|
||||||
|
|
||||||
|
def free_bytes(path: Path) -> int:
|
||||||
|
probe = path if path.exists() else path.parent
|
||||||
|
while not probe.exists() and probe != probe.parent:
|
||||||
|
probe = probe.parent
|
||||||
|
return shutil.disk_usage(probe).free
|
||||||
|
|
||||||
|
|
||||||
|
def is_iso(path: Path) -> bool:
|
||||||
|
return path.is_file() and path.name.lower().endswith(".iso")
|
||||||
|
|
||||||
|
|
||||||
|
def archive_kind(path: Path) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Zwraca '7z' albo 'rar' tylko dla partu, od którego należy zacząć.
|
||||||
|
Kolejne party są pomijane, żeby nie przerabiać tej samej gry wiele razy.
|
||||||
|
"""
|
||||||
|
if not path.is_file():
|
||||||
|
return None
|
||||||
|
|
||||||
|
name = path.name.lower()
|
||||||
|
|
||||||
|
# 7z wieloczęściowy: gra.7z.001, gra.7z.002...
|
||||||
|
m = re.search(r"\.7z\.0*(\d+)$", name)
|
||||||
|
if m:
|
||||||
|
return "7z" if int(m.group(1)) == 1 else None
|
||||||
|
|
||||||
|
if name.endswith(".7z"):
|
||||||
|
return "7z"
|
||||||
|
|
||||||
|
# RAR wieloczęściowy: gra.part1.rar, gra.part01.rar...
|
||||||
|
m = re.search(r"\.part0*(\d+)\.rar$", name)
|
||||||
|
if m:
|
||||||
|
return "rar" if int(m.group(1)) == 1 else None
|
||||||
|
|
||||||
|
# Stary RAR: gra.rar + gra.r00/gra.r01...
|
||||||
|
if re.search(r"\.r\d+$", name):
|
||||||
|
return None
|
||||||
|
|
||||||
|
if name.endswith(".rar"):
|
||||||
|
return "rar"
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def source_has_glob(value: str) -> bool:
|
||||||
|
return any(ch in value for ch in "*?[")
|
||||||
|
|
||||||
|
|
||||||
|
def unique_paths(paths: Iterable[Path]) -> list[Path]:
|
||||||
|
seen: set[str] = set()
|
||||||
|
out: list[Path] = []
|
||||||
|
for path in paths:
|
||||||
|
try:
|
||||||
|
key = str(path.expanduser().resolve())
|
||||||
|
except OSError:
|
||||||
|
key = str(path.expanduser().absolute())
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
seen.add(key)
|
||||||
|
out.append(Path(key))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def expand_source_specs(source_specs: list[str]) -> list[Path]:
|
||||||
|
"""
|
||||||
|
Obsługuje:
|
||||||
|
- katalog: /downloads
|
||||||
|
- plik ISO/archiwum: /downloads/gra.iso
|
||||||
|
- glob: /downloads/*X360*
|
||||||
|
- wiele pozycji po -s, np. gdy shell rozwinie *X360*
|
||||||
|
"""
|
||||||
|
expanded: list[Path] = []
|
||||||
|
|
||||||
|
for raw in source_specs:
|
||||||
|
spec = os.path.expanduser(raw)
|
||||||
|
if source_has_glob(spec):
|
||||||
|
found = [Path(p) for p in glob.glob(spec, recursive=True)]
|
||||||
|
if not found:
|
||||||
|
warn(f"[WARN] wzorzec źródła niczego nie znalazł: {raw}")
|
||||||
|
expanded.extend(found)
|
||||||
|
else:
|
||||||
|
expanded.append(Path(spec))
|
||||||
|
|
||||||
|
return unique_paths(expanded)
|
||||||
|
|
||||||
|
|
||||||
|
def candidate_for_file(path: Path) -> Optional[tuple[str, Path]]:
|
||||||
|
if is_iso(path):
|
||||||
|
return ("iso", path)
|
||||||
|
|
||||||
|
kind = archive_kind(path)
|
||||||
|
if kind:
|
||||||
|
return (kind, path)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def iter_candidates(source: Path, recursive: bool) -> Iterable[tuple[str, Path]]:
|
||||||
|
if source.is_file():
|
||||||
|
candidate = candidate_for_file(source)
|
||||||
|
if candidate:
|
||||||
|
yield candidate
|
||||||
|
return
|
||||||
|
|
||||||
|
if not source.is_dir():
|
||||||
|
warn(f"[WARN] pomijam, nie istnieje albo nie jest plikiem/katalogiem: {source}")
|
||||||
|
return
|
||||||
|
|
||||||
|
files = source.rglob("*") if recursive else source.glob("*")
|
||||||
|
for path in sorted(files, key=natural_key):
|
||||||
|
candidate = candidate_for_file(path)
|
||||||
|
if candidate:
|
||||||
|
yield candidate
|
||||||
|
|
||||||
|
|
||||||
|
def path_matches_pattern(path: Path, pattern: str) -> bool:
|
||||||
|
"""
|
||||||
|
Filtr działa po nazwie pliku i pełnej ścieżce.
|
||||||
|
Bez globów traktuje pattern jako zwykłe słowo kluczowe, np. X360.
|
||||||
|
Z globami działa np. *X360* albo *.iso.
|
||||||
|
"""
|
||||||
|
pattern_l = pattern.lower()
|
||||||
|
texts = (path.name.lower(), str(path).lower())
|
||||||
|
|
||||||
|
for text in texts:
|
||||||
|
if pattern_l in text:
|
||||||
|
return True
|
||||||
|
if fnmatch.fnmatch(text, pattern_l):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def apply_path_filters(
|
||||||
|
candidates: Iterable[tuple[str, Path]],
|
||||||
|
include_patterns: list[str],
|
||||||
|
exclude_patterns: list[str],
|
||||||
|
) -> list[tuple[str, Path]]:
|
||||||
|
seen: set[str] = set()
|
||||||
|
out: list[tuple[str, Path]] = []
|
||||||
|
|
||||||
|
for kind, path in candidates:
|
||||||
|
key = str(path.resolve())
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
seen.add(key)
|
||||||
|
|
||||||
|
if include_patterns and not any(path_matches_pattern(path, p) for p in include_patterns):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if exclude_patterns and any(path_matches_pattern(path, p) for p in exclude_patterns):
|
||||||
|
continue
|
||||||
|
|
||||||
|
out.append((kind, path))
|
||||||
|
|
||||||
|
return sorted(out, key=lambda item: natural_key(item[1]))
|
||||||
|
|
||||||
|
|
||||||
|
def which_first(names: list[str]) -> Optional[str]:
|
||||||
|
for name in names:
|
||||||
|
found = shutil.which(name)
|
||||||
|
if found:
|
||||||
|
return found
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def command_exists(token: str) -> bool:
|
||||||
|
expanded = Path(token).expanduser()
|
||||||
|
if os.path.sep in token or (os.path.altsep and os.path.altsep in token):
|
||||||
|
return expanded.exists()
|
||||||
|
return shutil.which(token) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def quote_cmd(cmd: list[str]) -> str:
|
||||||
|
return " ".join(shlex.quote(str(x)) for x in cmd)
|
||||||
|
|
||||||
|
|
||||||
|
def run_checked(cmd: list[str], cwd: Optional[Path] = None) -> None:
|
||||||
|
log("+ " + quote_cmd(cmd))
|
||||||
|
proc = subprocess.run(
|
||||||
|
[str(x) for x in cmd],
|
||||||
|
cwd=str(cwd) if cwd else None,
|
||||||
|
stdin=subprocess.DEVNULL,
|
||||||
|
)
|
||||||
|
if proc.returncode != 0:
|
||||||
|
raise RuntimeError(f"komenda zakończona kodem {proc.returncode}: {quote_cmd(cmd)}")
|
||||||
|
|
||||||
|
|
||||||
|
def rm_tree_contents(path: Path) -> None:
|
||||||
|
if path.exists():
|
||||||
|
shutil.rmtree(path)
|
||||||
|
path.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def extractor_commands(kind: str, archive: Path, out_dir: Path, password: Optional[str]) -> list[list[str]]:
|
||||||
|
cmds: list[list[str]] = []
|
||||||
|
|
||||||
|
sevenz = which_first(["7z", "7zz", "7za"])
|
||||||
|
if sevenz:
|
||||||
|
cmd = [sevenz, "x", "-y", f"-o{out_dir}"]
|
||||||
|
if password:
|
||||||
|
cmd.append(f"-p{password}")
|
||||||
|
cmd.append(str(archive))
|
||||||
|
cmds.append(cmd)
|
||||||
|
|
||||||
|
unar = shutil.which("unar")
|
||||||
|
if unar:
|
||||||
|
cmd = [unar, "-force-overwrite", "-o", str(out_dir)]
|
||||||
|
if password:
|
||||||
|
cmd += ["-password", password]
|
||||||
|
cmd.append(str(archive))
|
||||||
|
cmds.append(cmd)
|
||||||
|
|
||||||
|
if kind == "rar":
|
||||||
|
unrar = shutil.which("unrar")
|
||||||
|
if unrar:
|
||||||
|
# Nie dodajemy -y: przy błędzie zapisu mogłoby to oznaczać ciągłe Retry.
|
||||||
|
# stdin=DEVNULL w extract_archive sprawia, że prompt Retry/Abort nie blokuje skryptu.
|
||||||
|
cmd = [unrar, "x", "-o+"]
|
||||||
|
if password:
|
||||||
|
cmd.append(f"-p{password}")
|
||||||
|
cmd += [str(archive), str(out_dir) + os.sep]
|
||||||
|
cmds.append(cmd)
|
||||||
|
|
||||||
|
return cmds
|
||||||
|
|
||||||
|
|
||||||
|
def parse_7z_slt_iso_size(output: str) -> Optional[int]:
|
||||||
|
total = 0
|
||||||
|
current_path: Optional[str] = None
|
||||||
|
current_size: Optional[int] = None
|
||||||
|
|
||||||
|
def commit() -> None:
|
||||||
|
nonlocal total, current_path, current_size
|
||||||
|
if current_path and current_path.lower().endswith(".iso") and current_size is not None:
|
||||||
|
total += current_size
|
||||||
|
current_path = None
|
||||||
|
current_size = None
|
||||||
|
|
||||||
|
for raw_line in output.splitlines():
|
||||||
|
line = raw_line.strip()
|
||||||
|
if not line:
|
||||||
|
commit()
|
||||||
|
continue
|
||||||
|
if line.startswith("Path = "):
|
||||||
|
commit()
|
||||||
|
current_path = line[7:].strip()
|
||||||
|
current_size = None
|
||||||
|
continue
|
||||||
|
if line.startswith("Size = "):
|
||||||
|
value = line[7:].strip()
|
||||||
|
if value.isdigit():
|
||||||
|
current_size = int(value)
|
||||||
|
|
||||||
|
commit()
|
||||||
|
return total if total > 0 else None
|
||||||
|
|
||||||
|
|
||||||
|
def estimate_archive_iso_bytes_with_7z(archive: Path, password: Optional[str]) -> Optional[int]:
|
||||||
|
sevenz = which_first(["7z", "7zz", "7za"])
|
||||||
|
if not sevenz:
|
||||||
|
return None
|
||||||
|
|
||||||
|
cmd = [sevenz, "l", "-slt", "-y"]
|
||||||
|
if password:
|
||||||
|
cmd.append(f"-p{password}")
|
||||||
|
cmd.append(str(archive))
|
||||||
|
|
||||||
|
try:
|
||||||
|
proc = subprocess.run(
|
||||||
|
[str(x) for x in cmd],
|
||||||
|
cwd=str(archive.parent),
|
||||||
|
stdin=subprocess.DEVNULL,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
errors="replace",
|
||||||
|
timeout=120,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if proc.returncode != 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return parse_7z_slt_iso_size(proc.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def estimate_archive_extract_bytes(
|
||||||
|
archive: Path,
|
||||||
|
password: Optional[str],
|
||||||
|
fallback_gb: float,
|
||||||
|
) -> tuple[int, str]:
|
||||||
|
estimated = estimate_archive_iso_bytes_with_7z(archive, password)
|
||||||
|
if estimated is not None:
|
||||||
|
return estimated, "listing 7z"
|
||||||
|
return bytes_from_gb(fallback_gb), f"fallback {fallback_gb:g} GiB"
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_enough_work_space(
|
||||||
|
work_parent: Path,
|
||||||
|
archive: Path,
|
||||||
|
password: Optional[str],
|
||||||
|
min_free_gb: float,
|
||||||
|
fallback_gb: float,
|
||||||
|
no_space_check: bool,
|
||||||
|
) -> None:
|
||||||
|
if no_space_check:
|
||||||
|
return
|
||||||
|
|
||||||
|
work_parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
needed, source = estimate_archive_extract_bytes(archive, password, fallback_gb)
|
||||||
|
reserve = bytes_from_gb(min_free_gb)
|
||||||
|
required = needed + reserve
|
||||||
|
free = free_bytes(work_parent)
|
||||||
|
|
||||||
|
log(
|
||||||
|
f"[SPACE] work={work_parent} wolne={format_bytes(free)} "
|
||||||
|
f"potrzeba~={format_bytes(needed)} zapas={format_bytes(reserve)} źródło={source}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if free < required:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"za mało miejsca w katalogu roboczym: {work_parent}; "
|
||||||
|
f"wolne {format_bytes(free)}, wymagane ok. {format_bytes(required)}. "
|
||||||
|
f"Użyj --work-dir na ext4/xfs/btrfs z większą ilością miejsca."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_archive(kind: str, archive: Path, out_dir: Path, password: Optional[str]) -> None:
|
||||||
|
cmds = extractor_commands(kind, archive, out_dir, password)
|
||||||
|
if not cmds:
|
||||||
|
raise RuntimeError("brak programu do rozpakowania: zainstaluj 7z/7zz/7za, unar albo unrar")
|
||||||
|
|
||||||
|
last_code = None
|
||||||
|
for idx, cmd in enumerate(cmds, start=1):
|
||||||
|
rm_tree_contents(out_dir)
|
||||||
|
log(f"[EXTRACT] {archive.name} -> {out_dir} (próba {idx}/{len(cmds)})")
|
||||||
|
log("+ " + quote_cmd(cmd))
|
||||||
|
proc = subprocess.run(
|
||||||
|
[str(x) for x in cmd],
|
||||||
|
cwd=str(archive.parent),
|
||||||
|
stdin=subprocess.DEVNULL,
|
||||||
|
)
|
||||||
|
if proc.returncode == 0:
|
||||||
|
return
|
||||||
|
last_code = proc.returncode
|
||||||
|
warn(f"[WARN] rozpakowanie nieudane, kod {proc.returncode}; próbuję następny backend")
|
||||||
|
|
||||||
|
free = free_bytes(out_dir.parent)
|
||||||
|
raise RuntimeError(
|
||||||
|
f"nie udało się rozpakować archiwum, ostatni kod: {last_code}; "
|
||||||
|
f"wolne w katalogu roboczym: {format_bytes(free)}. "
|
||||||
|
f"Przy 'Write error' najczęściej brakuje miejsca albo filesystem nie obsługuje plików >4 GiB. "
|
||||||
|
f"Ustaw np. --work-dir /home/user/iso_tmp"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def find_isos(root: Path) -> list[Path]:
|
||||||
|
return sorted(
|
||||||
|
(p for p in root.rglob("*") if p.is_file() and p.name.lower().endswith(".iso")),
|
||||||
|
key=natural_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def build_iso2god_cmd(
|
||||||
|
iso2god_cmd: list[str],
|
||||||
|
iso_path: Path,
|
||||||
|
dest_dir: Path,
|
||||||
|
threads: Optional[int],
|
||||||
|
trim: str,
|
||||||
|
iso2god_dry_run: bool,
|
||||||
|
extra_args: list[str],
|
||||||
|
) -> list[str]:
|
||||||
|
cmd = list(iso2god_cmd)
|
||||||
|
|
||||||
|
if iso2god_dry_run:
|
||||||
|
cmd.append("--dry-run")
|
||||||
|
|
||||||
|
# iso2god-rs domyślnie przycina z końca; dodajemy jawnie dla czytelności.
|
||||||
|
if trim == "from-end":
|
||||||
|
cmd.append("--trim")
|
||||||
|
elif trim == "none":
|
||||||
|
cmd.append("--trim=none")
|
||||||
|
|
||||||
|
if threads and threads > 0:
|
||||||
|
cmd += ["-j", str(threads)]
|
||||||
|
|
||||||
|
cmd += extra_args
|
||||||
|
cmd += [str(iso_path), str(dest_dir)]
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
|
||||||
|
def convert_iso(
|
||||||
|
iso_path: Path,
|
||||||
|
dest_dir: Path,
|
||||||
|
iso2god_cmd: list[str],
|
||||||
|
threads: Optional[int],
|
||||||
|
trim: str,
|
||||||
|
iso2god_dry_run: bool,
|
||||||
|
extra_args: list[str],
|
||||||
|
title_map: dict[str, str],
|
||||||
|
) -> None:
|
||||||
|
log(f"[ISO2GOD] {iso_path}")
|
||||||
|
cmd = build_iso2god_cmd(
|
||||||
|
iso2god_cmd=iso2god_cmd,
|
||||||
|
iso_path=iso_path,
|
||||||
|
dest_dir=dest_dir,
|
||||||
|
threads=threads,
|
||||||
|
trim=trim,
|
||||||
|
iso2god_dry_run=iso2god_dry_run,
|
||||||
|
extra_args=extra_args,
|
||||||
|
)
|
||||||
|
run_checked(cmd)
|
||||||
|
rename_converted_title_dirs(dest_dir, title_map)
|
||||||
|
|
||||||
|
|
||||||
|
def process_archive(
|
||||||
|
kind: str,
|
||||||
|
archive: Path,
|
||||||
|
dest_dir: Path,
|
||||||
|
work_parent: Path,
|
||||||
|
keep_temp: bool,
|
||||||
|
password: Optional[str],
|
||||||
|
iso2god_cmd: list[str],
|
||||||
|
threads: Optional[int],
|
||||||
|
trim: str,
|
||||||
|
iso2god_dry_run: bool,
|
||||||
|
extra_args: list[str],
|
||||||
|
min_free_gb: float,
|
||||||
|
assume_iso_size_gb: float,
|
||||||
|
no_space_check: bool,
|
||||||
|
title_map: dict[str, str],
|
||||||
|
) -> int:
|
||||||
|
ensure_enough_work_space(
|
||||||
|
work_parent=work_parent,
|
||||||
|
archive=archive,
|
||||||
|
password=password,
|
||||||
|
min_free_gb=min_free_gb,
|
||||||
|
fallback_gb=assume_iso_size_gb,
|
||||||
|
no_space_check=no_space_check,
|
||||||
|
)
|
||||||
|
|
||||||
|
tmp_dir = Path(tempfile.mkdtemp(prefix="iso2god_batch_", dir=str(work_parent)))
|
||||||
|
extract_dir = tmp_dir / "extract"
|
||||||
|
|
||||||
|
try:
|
||||||
|
extract_archive(kind, archive, extract_dir, password)
|
||||||
|
isos = find_isos(extract_dir)
|
||||||
|
if not isos:
|
||||||
|
raise RuntimeError("w archiwum nie znaleziono pliku .iso")
|
||||||
|
|
||||||
|
for iso in isos:
|
||||||
|
convert_iso(
|
||||||
|
iso_path=iso,
|
||||||
|
dest_dir=dest_dir,
|
||||||
|
iso2god_cmd=iso2god_cmd,
|
||||||
|
threads=threads,
|
||||||
|
trim=trim,
|
||||||
|
iso2god_dry_run=iso2god_dry_run,
|
||||||
|
extra_args=extra_args,
|
||||||
|
title_map=title_map,
|
||||||
|
)
|
||||||
|
|
||||||
|
return len(isos)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if keep_temp:
|
||||||
|
log(f"[TEMP] zostawiam: {tmp_dir}")
|
||||||
|
else:
|
||||||
|
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_work_parent(args: argparse.Namespace, dest: Path) -> Path:
|
||||||
|
if args.work_dir:
|
||||||
|
return Path(args.work_dir).expanduser().resolve()
|
||||||
|
|
||||||
|
env_work_dir = os.environ.get("ISO2GOD_WORK_DIR")
|
||||||
|
if env_work_dir:
|
||||||
|
return Path(env_work_dir).expanduser().resolve()
|
||||||
|
|
||||||
|
# Nie używamy /tmp domyślnie. ISO z X360 zwykle potrzebuje 8-9 GiB.
|
||||||
|
# /tmp bywa tmpfs-em w RAM-ie i wtedy unrar pada przy 70-90%.
|
||||||
|
return (dest.parent / ".iso2god_batch_work").resolve()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args() -> argparse.Namespace:
|
||||||
|
p = argparse.ArgumentParser(
|
||||||
|
description="Batch ISO/7z/RAR(part) -> ISO2GOD dla Xbox 360.",
|
||||||
|
formatter_class=HelpFormatter,
|
||||||
|
epilog="""Przykłady:
|
||||||
|
Prosto:
|
||||||
|
%(prog)s -s /katalog/zrodlowy -d /home/dysk/Games
|
||||||
|
|
||||||
|
Glob w źródle:
|
||||||
|
%(prog)s -s "/downloads/*X360*" -d /home/dysk/Games
|
||||||
|
%(prog)s -s *X360* -d /home/dysk/Games
|
||||||
|
|
||||||
|
Include / filtrowanie po słowach w pełnej ścieżce:
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include X360
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include XBOX360 --include Xbox_360
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include Batman
|
||||||
|
|
||||||
|
Exclude / pomijanie katalogów i plików:
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --exclude DLC --exclude XBLA --exclude PS3
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include X360 --exclude "*DLC*" --exclude "*Update*"
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include "*COMPLEX*" --exclude Sample
|
||||||
|
|
||||||
|
Fix na błąd /tmp, Write error, sshfs albo za mało miejsca:
|
||||||
|
%(prog)s -s /home/mg/sshfs -d /home/dysk/Games --work-dir /home/mg/iso_tmp
|
||||||
|
%(prog)s -s /home/mg/sshfs -d /home/dysk/Games --work-dir /home/dysk/.iso2god_tmp --min-free-gb 5
|
||||||
|
|
||||||
|
Samo wykrywanie bez konwersji:
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include X360 --list-only
|
||||||
|
|
||||||
|
Kilka źródeł naraz:
|
||||||
|
%(prog)s -s /iso /rar "/mnt/seedbox/*X360*" -d /home/dysk/Games --exclude DLC
|
||||||
|
|
||||||
|
Nazwy katalogów z titleid -> title:
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --include X360 --title-map-url
|
||||||
|
%(prog)s -s /downloads -d /home/dysk/Games --title-map-file /home/mg/TitleIDs.json
|
||||||
|
|
||||||
|
Uwagi:
|
||||||
|
* Katalog roboczy służy do pełnego, tymczasowego rozpakowania ISO.
|
||||||
|
* Domyślnie work-dir jest obok katalogu docelowego: DEST_PARENT/.iso2god_batch_work, nie /tmp.
|
||||||
|
* Możesz też ustawić zmienną: ISO2GOD_WORK_DIR=/home/mg/iso_tmp.
|
||||||
|
* Dla RAR partów skrypt startuje tylko .rar / .part1.rar; .r00/.r01/.part2.rar są pomijane.
|
||||||
|
* Jeśli dysk docelowy jest FAT32 albo nie obsługuje plików >4 GiB, daj --work-dir na ext4/xfs/btrfs.
|
||||||
|
* --title-map-url/--title-map-file zmienia katalog TITLEID na tytuł gry oczyszczony do A-Z, a-z, 0-9, _ oraz -.
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"-s",
|
||||||
|
"--source",
|
||||||
|
required=True,
|
||||||
|
nargs="+",
|
||||||
|
help="jeden lub więcej katalogów/plików/wzorców glob z ISO/7z/RAR",
|
||||||
|
)
|
||||||
|
p.add_argument("-d", "--dest", required=True, help="katalog docelowy dla plików GOD")
|
||||||
|
p.add_argument(
|
||||||
|
"--include",
|
||||||
|
action="append",
|
||||||
|
default=[],
|
||||||
|
help="bierz tylko ścieżki pasujące do słowa/wzorca, np. X360 albo '*X360*'; można podać wiele razy",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--exclude",
|
||||||
|
action="append",
|
||||||
|
default=[],
|
||||||
|
help="pomiń ścieżki pasujące do słowa/wzorca, np. DLC, PS3 albo '*Sample*'; można podać wiele razy",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--iso2god-cmd",
|
||||||
|
default="iso2god",
|
||||||
|
help="komenda ISO2GOD, np. 'iso2god' albo '/opt/iso2god/iso2god'",
|
||||||
|
)
|
||||||
|
p.add_argument("-j", "--threads", type=int, default=0, help="liczba wątków dla iso2god; 0 = domyślnie")
|
||||||
|
p.add_argument("--trim", choices=["from-end", "none"], default="from-end", help="tryb przycinania ISO")
|
||||||
|
p.add_argument(
|
||||||
|
"--work-dir",
|
||||||
|
"--tmp-dir",
|
||||||
|
"--temp-dir",
|
||||||
|
dest="work_dir",
|
||||||
|
default=None,
|
||||||
|
help="katalog roboczy na tymczasowo rozpakowane ISO; domyślnie obok --dest albo z ISO2GOD_WORK_DIR",
|
||||||
|
)
|
||||||
|
p.add_argument("--archive-password", default=None, help="hasło do archiwów, gdy potrzebne")
|
||||||
|
p.add_argument("--keep-temp", action="store_true", help="nie kasuj katalogów tymczasowych")
|
||||||
|
p.add_argument("--no-recursive", action="store_true", help="skanuj tylko główny katalog źródłowy")
|
||||||
|
p.add_argument("--list-only", action="store_true", help="tylko pokaż wykryte pliki, bez pracy")
|
||||||
|
p.add_argument("--iso2god-dry-run", action="store_true", help="przekaż --dry-run do iso2god")
|
||||||
|
p.add_argument("--stop-on-error", action="store_true", help="przerwij po pierwszym błędzie")
|
||||||
|
p.add_argument(
|
||||||
|
"--min-free-gb",
|
||||||
|
type=float,
|
||||||
|
default=2.0,
|
||||||
|
help="dodatkowy zapas wolnego miejsca wymagany w work-dir przed rozpakowaniem",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--assume-iso-size-gb",
|
||||||
|
type=float,
|
||||||
|
default=9.0,
|
||||||
|
help="awaryjny rozmiar ISO, gdy nie uda się odczytać listy archiwum",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--no-space-check",
|
||||||
|
action="store_true",
|
||||||
|
help="wyłącz sprawdzanie wolnego miejsca przed rozpakowaniem archiwum",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--title-map-url",
|
||||||
|
nargs="?",
|
||||||
|
const=DEFAULT_TITLE_MAP_URL,
|
||||||
|
default=None,
|
||||||
|
help="URL JSON-a titleid -> title; bez wartości używa wbudowanego URL-a",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--title-map-file",
|
||||||
|
"--title-map",
|
||||||
|
dest="title_map_file",
|
||||||
|
default=None,
|
||||||
|
help="lokalny JSON titleid -> title; nadpisuje wpisy z --title-map-url",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--title-map-timeout",
|
||||||
|
type=float,
|
||||||
|
default=20.0,
|
||||||
|
help="timeout pobierania --title-map-url w sekundach",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--extra-iso2god-arg",
|
||||||
|
action="append",
|
||||||
|
default=[],
|
||||||
|
help="dodatkowy argument dla iso2god; można podać wiele razy",
|
||||||
|
)
|
||||||
|
return p.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
args = parse_args()
|
||||||
|
|
||||||
|
sources = expand_source_specs(args.source)
|
||||||
|
dest = Path(args.dest).expanduser().resolve()
|
||||||
|
recursive = not args.no_recursive
|
||||||
|
iso2god_cmd = shlex.split(args.iso2god_cmd)
|
||||||
|
|
||||||
|
if not iso2god_cmd:
|
||||||
|
warn("[ERR] pusta komenda --iso2god-cmd")
|
||||||
|
return 2
|
||||||
|
|
||||||
|
if not sources:
|
||||||
|
warn("[ERR] żadne źródło z -s/--source nie pasuje do pliku ani katalogu")
|
||||||
|
return 2
|
||||||
|
|
||||||
|
raw_candidates: list[tuple[str, Path]] = []
|
||||||
|
for source in sources:
|
||||||
|
raw_candidates.extend(iter_candidates(source, recursive=recursive))
|
||||||
|
|
||||||
|
candidates = apply_path_filters(
|
||||||
|
raw_candidates,
|
||||||
|
include_patterns=args.include,
|
||||||
|
exclude_patterns=args.exclude,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not candidates:
|
||||||
|
log("[INFO] brak pasujących plików .iso, .7z albo pierwszych partów .rar")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
log("[INFO] źródła:")
|
||||||
|
for source in sources:
|
||||||
|
log(f" {source}")
|
||||||
|
log(f"[INFO] cel: {dest}")
|
||||||
|
if args.include:
|
||||||
|
log(f"[INFO] include: {', '.join(args.include)}")
|
||||||
|
if args.exclude:
|
||||||
|
log(f"[INFO] exclude: {', '.join(args.exclude)}")
|
||||||
|
log(f"[INFO] znaleziono: {len(candidates)} pozycji")
|
||||||
|
|
||||||
|
if args.list_only:
|
||||||
|
for kind, path in candidates:
|
||||||
|
print(f"{kind:>3} {path}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
dest.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
if not command_exists(iso2god_cmd[0]):
|
||||||
|
warn(f"[ERR] nie znaleziono ISO2GOD: {iso2god_cmd[0]}")
|
||||||
|
warn(" Podaj np. --iso2god-cmd /pełna/ścieżka/do/iso2god")
|
||||||
|
return 2
|
||||||
|
|
||||||
|
try:
|
||||||
|
title_map = load_title_map(args)
|
||||||
|
except Exception as exc:
|
||||||
|
warn(f"[ERR] nie udało się wczytać title-map: {exc}")
|
||||||
|
return 2
|
||||||
|
|
||||||
|
work_parent = resolve_work_parent(args, dest)
|
||||||
|
work_parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
log(f"[INFO] work-dir: {work_parent}")
|
||||||
|
|
||||||
|
archive_kinds = sorted({kind for kind, _ in candidates if kind in {"7z", "rar"}})
|
||||||
|
for kind in archive_kinds:
|
||||||
|
dummy = Path(f"dummy.{kind}")
|
||||||
|
if not extractor_commands(kind, dummy, work_parent / "dummy", args.archive_password):
|
||||||
|
warn(f"[ERR] brak programu do archiwów {kind}: zainstaluj 7z/7zz/7za" + (", unar albo unrar" if kind == "rar" else " albo unar"))
|
||||||
|
return 2
|
||||||
|
|
||||||
|
converted = 0
|
||||||
|
failed: list[tuple[Path, str]] = []
|
||||||
|
|
||||||
|
for idx, (kind, path) in enumerate(candidates, start=1):
|
||||||
|
log("")
|
||||||
|
log(f"[{idx}/{len(candidates)}] {kind.upper()} {path}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
if kind == "iso":
|
||||||
|
convert_iso(
|
||||||
|
iso_path=path,
|
||||||
|
dest_dir=dest,
|
||||||
|
iso2god_cmd=iso2god_cmd,
|
||||||
|
threads=args.threads,
|
||||||
|
trim=args.trim,
|
||||||
|
iso2god_dry_run=args.iso2god_dry_run,
|
||||||
|
extra_args=args.extra_iso2god_arg,
|
||||||
|
title_map=title_map,
|
||||||
|
)
|
||||||
|
converted += 1
|
||||||
|
else:
|
||||||
|
converted += process_archive(
|
||||||
|
kind=kind,
|
||||||
|
archive=path,
|
||||||
|
dest_dir=dest,
|
||||||
|
work_parent=work_parent,
|
||||||
|
keep_temp=args.keep_temp,
|
||||||
|
password=args.archive_password,
|
||||||
|
iso2god_cmd=iso2god_cmd,
|
||||||
|
threads=args.threads,
|
||||||
|
trim=args.trim,
|
||||||
|
iso2god_dry_run=args.iso2god_dry_run,
|
||||||
|
extra_args=args.extra_iso2god_arg,
|
||||||
|
min_free_gb=args.min_free_gb,
|
||||||
|
assume_iso_size_gb=args.assume_iso_size_gb,
|
||||||
|
no_space_check=args.no_space_check,
|
||||||
|
title_map=title_map,
|
||||||
|
)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
warn("\n[ERR] przerwano przez użytkownika")
|
||||||
|
return 130
|
||||||
|
except Exception as exc:
|
||||||
|
failed.append((path, str(exc)))
|
||||||
|
warn(f"[ERR] {path}: {exc}")
|
||||||
|
if args.stop_on_error:
|
||||||
|
break
|
||||||
|
|
||||||
|
log("")
|
||||||
|
log(f"[DONE] skonwertowanych ISO: {converted}")
|
||||||
|
if failed:
|
||||||
|
warn(f"[FAIL] błędy: {len(failed)}")
|
||||||
|
for path, msg in failed:
|
||||||
|
warn(f" - {path}: {msg}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
Reference in New Issue
Block a user