media info

This commit is contained in:
Mateusz Gruszczyński
2026-05-21 09:49:50 +02:00
parent ac5113055d
commit bd9be0d11c
5 changed files with 277 additions and 2 deletions

View File

@@ -123,6 +123,168 @@ def iter_remote_file_chunks(profile: dict, source_path: str, size: int | None =
break
# Note: Lower-case file suffixes (leading dot included) that _media_info_supported() accepts for metadata parsing.
_MEDIA_INFO_EXTENSIONS = {
    ".3g2",
    ".3gp",
    ".aac",
    ".aiff",
    ".ape",
    ".asf",
    ".avi",
    ".flac",
    ".flv",
    ".m4a",
    ".m4v",
    ".mka",
    ".mkv",
    ".mov",
    ".mp3",
    ".mp4",
    ".mpeg",
    ".mpg",
    ".ogg",
    ".opus",
    ".ts",
    ".wav",
    ".webm",
    ".wma",
    ".wmv",
}
# Note: Default cap, in bytes, on how much of a file is sampled for metadata (32 MiB).
_MEDIA_INFO_SAMPLE_BYTES = 32 * 1024**2
# Note: Size, in bytes, of each read/transfer step while the sample is copied (1 MiB).
_MEDIA_INFO_CHUNK_BYTES = 1024**2
def _media_info_supported(path: str) -> bool:
    # Note: Only files whose suffix is listed in _MEDIA_INFO_EXTENSIONS are offered to the metadata parser,
    # so arbitrary torrent payload files are never probed. A falsy path is treated as an empty string.
    suffix = LocalPath(str(path or "")).suffix
    return suffix.lower() in _MEDIA_INFO_EXTENSIONS
def _media_info_temp_sample(profile: dict, source_path: str, max_bytes: int) -> tuple[str, int]:
    # Note: hachoir needs a seekable file, so at most max_bytes of the source are copied into a temp file
    # instead of holding the media in RAM. Returns (temp file path, bytes written); the caller owns the
    # temp file and must delete it. If copying fails, the temp file is removed here and the error re-raised.
    import tempfile

    handle, sample_path = tempfile.mkstemp(prefix="pytorrent-mediainfo-", suffix=LocalPath(source_path).suffix)
    copied = 0
    try:
        with os.fdopen(handle, "wb") as sink:
            if int(profile.get("is_remote") or 0):
                # Remote profile: pull chunks through the remote reader and trim the final one to the cap.
                remote_chunks = iter_remote_file_chunks(profile, source_path, size=max_bytes, chunk_size=_MEDIA_INFO_CHUNK_BYTES)
                for chunk in remote_chunks:
                    if copied >= max_bytes:
                        break
                    piece = bytes(chunk[: max(0, max_bytes - copied)])
                    sink.write(piece)
                    copied += len(piece)
            else:
                # Local profile: read straight from disk until the cap or end of file is reached.
                with open(source_path, "rb") as source:
                    while copied < max_bytes and (piece := source.read(min(_MEDIA_INFO_CHUNK_BYTES, max_bytes - copied))):
                        sink.write(piece)
                        copied += len(piece)
        return sample_path, copied
    except Exception:
        # Best-effort cleanup: never let a failed unlink hide the original error.
        try:
            os.unlink(sample_path)
        except Exception:
            pass
        raise
def _media_info_plaintext(metadata) -> list[str]:
    # Note: exportPlaintext is the most stable hachoir API across supported package versions.
    # Any failure of that call yields an empty list; a falsy export is treated as "no lines".
    try:
        exported = metadata.exportPlaintext() or []
    except Exception:
        return []
    # Trim surrounding spaces and dashes from each line and drop lines that end up empty.
    return [text for entry in exported if (text := str(entry).strip(" -"))]
def _media_info_parse_lines(lines: list[str]) -> list[dict]:
    # Note: The frontend receives both grouped fields and raw text so unknown hachoir fields stay visible.
    # Each "key: value" line becomes {"key": ..., "value": ...}; the split happens at the first colon only,
    # and lines without a colon, or with an empty key or value after trimming, are skipped.
    parsed = []
    for entry in lines:
        if not entry:
            continue
        name, separator, text = entry.partition(":")
        if not separator:
            continue
        name, text = name.strip(), text.strip()
        if name and text:
            parsed.append({"key": name, "value": text})
    return parsed
def _media_info_field_lookup(fields: list[dict]) -> dict:
    # Note: Builds a case-insensitive index (lower-cased key -> value) over the parsed fields.
    # When a key occurs more than once the first occurrence wins; entries without a key are ignored.
    index = {}
    for entry in fields:
        name = str(entry.get("key") or "").lower()
        if name:
            index.setdefault(name, entry.get("value"))
    return index
def _media_info_summary(fields: list[dict]) -> dict:
    # Note: Summary keeps the modal readable while raw fields remain available below it.
    # Every summary slot lists the field labels it accepts, in priority order; the first label with a
    # truthy value is used and a slot with no match is None.
    known = _media_info_field_lookup(fields)
    aliases = {
        "duration": ("Duration", "Play duration"),
        "bit_rate": ("Bit rate", "Overall bit rate"),
        "width": ("Image width", "Width"),
        "height": ("Image height", "Height"),
        "frame_rate": ("Frame rate",),
        "sample_rate": ("Sample rate",),
        "channels": ("Channel", "Channel(s)", "Channels"),
        "compression": ("Compression", "Compressor", "Codec", "Video codec", "Audio codec"),
        "producer": ("Producer", "Encoder", "Writing application"),
        "creation_date": ("Creation date", "Creation time"),
    }

    def pick(labels):
        # The lookup keys are lower-cased, so labels are lower-cased before matching.
        hits = (known.get(label.lower()) for label in labels)
        return next((hit for hit in hits if hit), None)

    return {slot: pick(labels) for slot, labels in aliases.items()}
def torrent_file_media_info(profile: dict, torrent_hash: str, index: int, max_bytes: int = _MEDIA_INFO_SAMPLE_BYTES) -> dict:
    # Note: This endpoint is MediaInfo-like and intentionally avoids external binaries such as mediainfo, ffprobe or ffmpeg.
    #
    # Resolves the torrent file at `index`, copies a bounded sample of it into a temp file and runs
    # hachoir over that sample.
    #
    # Returns the selected file entry extended with: "supported", "sample_bytes", "sample_limit",
    # "partial", "summary", "fields", "raw", "parser" and, for soft failures (unsupported extension,
    # undetected container, no metadata found), "error".
    #
    # Raises RuntimeError when the remote readability check reports a problem or when hachoir cannot be
    # imported. Errors raised while sampling the file propagate unchanged.
    selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index)
    name = str(selected.get("path") or remote_path)
    size = int(selected.get("size") or 0)
    supported = _media_info_supported(name)
    # The sampler never reads less than 1 MiB; report that same effective limit so "sample_bytes"
    # can never exceed "sample_limit" in the response.
    sample_limit = max(1024 * 1024, int(max_bytes))
    result = {
        **selected,
        "supported": supported,
        "sample_bytes": 0,
        "sample_limit": sample_limit,
        "partial": True,
        "summary": {},
        "fields": [],
        "raw": [],
        "parser": "hachoir",
    }
    if not supported:
        result["error"] = "This file extension is not supported by the built-in media info parser."
        return result
    # The readability pre-check is only performed for remote profiles.
    err = remote_file_readability_error(profile, remote_path) if int(profile.get("is_remote") or 0) else None
    if err:
        raise RuntimeError(err)
    # Import hachoir before sampling: a missing dependency should fail immediately rather than after
    # up to `sample_limit` bytes have been copied or transferred into a temp file.
    try:
        from hachoir.metadata import extractMetadata
        from hachoir.parser import createParser
    except Exception as exc:
        raise RuntimeError("Python package 'hachoir' is required for media info. Install requirements.txt again.") from exc
    tmp_path = None
    try:
        tmp_path, written = _media_info_temp_sample(profile, remote_path, sample_limit)
        # real_filename carries the original file name because the temp file has a generated one.
        parser = createParser(tmp_path, real_filename=LocalPath(name).name)
        if parser is None:
            result.update({"sample_bytes": written, "error": "hachoir could not detect this media container."})
            return result
        # NOTE(review): the parser is used as a context manager, presumably so hachoir releases the
        # sample file before it is deleted below; confirm against the installed hachoir version.
        with parser:
            metadata = extractMetadata(parser)
            if metadata is None:
                result.update({"sample_bytes": written, "error": "No media metadata found in the sampled part of the file."})
                return result
            raw = _media_info_plaintext(metadata)
        fields = _media_info_parse_lines(raw)
        result.update({
            "sample_bytes": written,
            # "partial" is only claimed when the file size is known and the sample is shorter than it.
            "partial": bool(size and written < size),
            "summary": _media_info_summary(fields),
            "fields": fields,
            "raw": raw,
        })
        return result
    finally:
        if tmp_path:
            # Best-effort removal of the sample; a failed unlink must not mask the real outcome.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
def torrent_download_file_info(profile: dict, torrent_hash: str, index: int) -> dict:
selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index)
err = remote_file_readability_error(profile, remote_path)