security in path list
This commit is contained in:
@@ -6,10 +6,45 @@ from .config import default_download_path
|
||||
from ...utils import human_size
|
||||
|
||||
|
||||
|
||||
def _remote_path_contains(base: str, candidate: str) -> bool:
|
||||
clean_base = _remote_clean_path(str(base or "").rstrip("/") or "/")
|
||||
clean_candidate = _remote_clean_path(str(candidate or "").rstrip("/") or "/")
|
||||
return clean_candidate == clean_base or clean_candidate.startswith(clean_base.rstrip("/") + "/")
|
||||
|
||||
|
||||
def _rtorrent_home_path(profile: dict) -> str:
|
||||
# Note: This reads the remote rTorrent process home, not the pyTorrent server home.
|
||||
try:
|
||||
c = client_for(profile)
|
||||
return _remote_clean_path(str(_rt_execute(c, "execute.capture", "sh", "-c", 'printf "%s" "${HOME:-}"') or "").strip())
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _path_browse_roots(profile: dict) -> list[str]:
|
||||
roots: list[str] = []
|
||||
for item in (default_download_path(profile), _rtorrent_home_path(profile)):
|
||||
clean = _remote_clean_path(item or "")
|
||||
if clean and clean.startswith("/") and clean != "/" and clean not in roots:
|
||||
roots.append(clean)
|
||||
return roots
|
||||
|
||||
|
||||
def _safe_browse_base(profile: dict, requested_path: str | None) -> tuple[str, list[str], bool]:
|
||||
roots = _path_browse_roots(profile)
|
||||
if not roots:
|
||||
raise RuntimeError("Cannot determine a safe rTorrent browse root")
|
||||
fallback = roots[0]
|
||||
requested = _remote_clean_path(requested_path or fallback)
|
||||
allowed = bool(requested and any(_remote_path_contains(root, requested) for root in roots))
|
||||
return (requested if allowed else fallback), roots, not allowed
|
||||
|
||||
|
||||
def browse_path(profile: dict, path: str | None = None) -> dict:
|
||||
"""List directories through rTorrent execute.capture to avoid pyTorrent FS permissions."""
|
||||
"""List allowed rTorrent directories through execute.capture without exposing the full filesystem."""
|
||||
c = client_for(profile)
|
||||
base = _remote_clean_path(path or default_download_path(profile))
|
||||
base, roots, used_fallback = _safe_browse_base(profile, path)
|
||||
script = (
|
||||
'base=$1; '
|
||||
'[ -d "$base" ] || exit 2; '
|
||||
@@ -17,6 +52,7 @@ def browse_path(profile: dict, path: str | None = None) -> dict:
|
||||
'dir_count=0; file_count=0; '
|
||||
'for p in "$base"/* "$base"/.[!.]* "$base"/..?*; do '
|
||||
'[ -e "$p" ] || continue; '
|
||||
'[ -L "$p" ] && continue; '
|
||||
'if [ -d "$p" ]; then '
|
||||
'dir_count=$((dir_count+1)); name=${p##*/}; empty=1; '
|
||||
'if find "$p" -mindepth 1 -maxdepth 1 -print -quit 2>/dev/null | grep -q .; then empty=0; fi; '
|
||||
@@ -61,11 +97,14 @@ def browse_path(profile: dict, path: str | None = None) -> dict:
|
||||
disk_total = disk_used = disk_free = disk_percent = 0
|
||||
dirs.sort(key=lambda x: x["name"].lower())
|
||||
parent = posixpath.dirname(base.rstrip("/")) or "/"
|
||||
if parent == base:
|
||||
if parent == base or not any(_remote_path_contains(root, parent) for root in roots):
|
||||
parent = base
|
||||
return {
|
||||
"path": base,
|
||||
"parent": parent,
|
||||
"root": roots[0] if roots else base,
|
||||
"allowed_roots": roots,
|
||||
"fallback": used_fallback,
|
||||
"dirs": dirs[:300],
|
||||
"source": "rtorrent",
|
||||
"dir_count": dir_count,
|
||||
@@ -80,7 +119,6 @@ def browse_path(profile: dict, path: str | None = None) -> dict:
|
||||
}
|
||||
|
||||
|
||||
|
||||
def _safe_directory_name(name: str) -> str:
|
||||
value = str(name or "").strip()
|
||||
if not value or value in {".", ".."} or "/" in value or "\x00" in value:
|
||||
|
||||
Reference in New Issue
Block a user