# Source path: yt-dlp-dags/ytops_client/list_formats_tool.py
# (code-viewer listing metadata: 264 lines, 10 KiB, Python)

"""
Tool to list available formats from a yt-dlp info.json file.
"""
import sys
import json
import argparse
import re
from urllib.parse import urlparse, parse_qs
from datetime import datetime, timezone
try:
import yt_dlp
except ImportError:
yt_dlp = None
def format_size(b):
    """Format a size in bytes as a human-readable string.

    Args:
        b: Size in bytes (int or float), or ``None`` when the size is unknown.

    Returns:
        ``'N/A'`` for ``None``; otherwise the size with a binary-unit suffix:
        whole bytes below 1 KiB, two decimals for KiB, MiB and GiB.
    """
    if b is None:
        return 'N/A'
    if b < 1024:
        # Sizes may arrive as floats (e.g. an approximated file size); a
        # fractional byte count is noise, so print whole bytes only.
        return f"{b:.0f}B"
    if b < 1024**2:
        return f"{b/1024:.2f}KiB"
    if b < 1024**3:
        return f"{b/1024**2:.2f}MiB"
    return f"{b/1024**3:.2f}GiB"
# Column layout shared by the header and every data row so the two cannot drift apart.
_ROW_TEMPLATE = (
    "{:<6} {:<7} {:<12} {:<5} {:<18} {:<18} {:<12} {:<10} {:<20} {:<17} "
    "{:<15} {:<12} {:<12} {:<12} {:<5} {:<12} {:<12} {:<12} {:<12} {:<12}"
)


def _shorten_token(value):
    """Return *value* abbreviated to ``first6..last4`` when longer than 12 characters.

    Missing or empty values are rendered as ``'N/A'``.
    """
    if not value:
        return 'N/A'
    if len(value) > 12:
        return value[:6] + '..' + value[-4:]
    return value


def _split_selector_ids(selector_str):
    """Split a selector string into format IDs without using yt-dlp's parser."""
    # '+' joins formats that are merged together (e.g. "137+140"). Splitting on
    # it keeps this fallback consistent with the yt-dlp path, which expands a
    # merged selection into its individual format IDs.
    return [item for item in re.split(r'[,/+]', selector_str) if item]


def _select_format_ids(formats, selector_str):
    """Resolve *selector_str* against *formats* with yt-dlp's format selector.

    Returns the selected format IDs in selection order; a merged selection
    contributes the ID of each of its component formats.
    """
    ydl = yt_dlp.YoutubeDL({'quiet': True})
    selector = ydl.build_format_selector(selector_str)
    ctx = {
        'formats': formats,
        'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats),
        'incomplete_formats': (all(f.get('vcodec') == 'none' for f in formats)
                               or all(f.get('acodec') == 'none' for f in formats)),
    }
    selected_ids = []
    for chosen in selector(ctx):
        if 'requested_formats' in chosen:
            selected_ids.extend(part['format_id'] for part in chosen['requested_formats'])
        else:
            selected_ids.append(chosen['format_id'])
    return selected_ids


def _url_cells(url):
    """Return the twelve URL-derived cells of a row, in column order.

    The cells are: shortened path+query, expiry (UTC), ip, then the ``id``,
    ``n``, ``ei``, ``gir``, ``bui``, ``pot``, ``mt``, ``sig`` and ``lsig``
    query parameters. Every cell is ``'N/A'`` when there is no URL or the
    parameter is absent.
    """
    if not url:
        return ('N/A',) * 12

    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    def first(name):
        values = params.get(name)
        return values[0] if values else None

    location = parsed.path
    if parsed.query:
        location += '?' + parsed.query
    if len(location) > 18:
        location = location[:8] + '...' + location[-7:]

    expire_date = 'N/A'
    expire_ts = first('expire')
    if expire_ts:
        try:
            expire_date = datetime.fromtimestamp(int(expire_ts), timezone.utc).strftime('%m-%d %H:%M:%S')
        except (ValueError, TypeError, OverflowError, OSError):
            # Non-numeric or out-of-range timestamps must not abort the listing.
            expire_date = 'Invalid'

    return (
        location,
        expire_date,
        first('ip') or 'N/A',
        _shorten_token(first('id')),
        _shorten_token(first('n')),
        _shorten_token(first('ei')),
        first('gir') or 'N/A',
        _shorten_token(first('bui')),
        _shorten_token(first('pot')),
        # 'mt' is often a short timestamp; it is only abbreviated when long.
        _shorten_token(first('mt')),
        _shorten_token(first('sig')),
        _shorten_token(first('lsig')),
    )


def list_formats(info_json, requested_formats_str=None, file=sys.stdout):
    """Print a table of the formats found in info.json data.

    Args:
        info_json: Parsed info.json mapping; its ``'formats'`` list is shown.
        requested_formats_str: Optional format selector. When yt-dlp is
            importable it is resolved with yt-dlp's own selector; if yt-dlp is
            missing or rejects the selector, the string is split on ``,``,
            ``/`` and ``+`` instead. Matching rows are prefixed with ``*`` and
            listed first, in selection order.
        file: Stream that receives the table. Warnings always go to stderr.

    Returns:
        None. Rows that were not requested are ordered by numeric format ID;
        non-numeric IDs share the sort value 999 and keep their input order.
    """
    formats = info_json.get('formats', [])
    if not formats:
        print("No formats found in the provided info.json.", file=file)
        return

    requested_formats = []
    if requested_formats_str:
        if yt_dlp:
            try:
                requested_formats = _select_format_ids(formats, requested_formats_str)
            except Exception as e:
                print(f"WARNING: Could not parse format selector '{requested_formats_str}': {e}", file=sys.stderr)
                # Fallback to simple parsing
                requested_formats = _split_selector_ids(requested_formats_str)
        else:
            # Fallback to simple parsing if yt-dlp is not installed
            print("WARNING: yt-dlp not installed. Using simple format selector parsing.", file=sys.stderr)
            requested_formats = _split_selector_ids(requested_formats_str)
    requested_order = {fmt: i for i, fmt in enumerate(requested_formats)}

    def sort_key(fmt):
        # A null format_id is treated like a missing one instead of crashing.
        fid = fmt.get('format_id') or ''
        if fid in requested_order:
            # Requested formats come first, in the order they were selected.
            return (False, requested_order[fid])
        # Remaining formats are ordered numerically by ID.
        return (True, int(fid) if fid.isdigit() else 999)

    sorted_formats = sorted(formats, key=sort_key)

    # Warn when a selection was given but none of its IDs exist in the data.
    if requested_formats and not any(f.get('format_id') in requested_order for f in formats):
        print("WARNING: No format from list found.", file=sys.stderr)

    header = _ROW_TEMPLATE.format(
        "ID", "EXT", "RESOLUTION", "FPS", "VCODEC", "ACODEC", "FILESIZE", "TBR", "URL (path)", "EXPIRE (UTC)",
        "IP", "ID_TOKEN", "SESS_TOKEN", "EI_TOKEN", "GIR", "BUI_TOKEN", "POT_TOKEN", "MT_TOKEN", "SIG", "LSIG"
    )
    print(header, file=file)
    print("-" * len(header), file=file)

    for fmt in sorted_formats:
        format_id = fmt.get('format_id', 'N/A')
        display_id = f"*{format_id}" if format_id in requested_order else format_id

        resolution = fmt.get('resolution')
        if not resolution:
            width = fmt.get('width')
            if width is not None:
                # A missing or null height is shown as '?' rather than raising
                # KeyError or printing 'None'.
                height = fmt.get('height')
                resolution = f"{width}x{height if height is not None else '?'}"
            else:
                resolution = 'audio only'

        fps = fmt.get('fps', '')
        tbr = fmt.get('tbr')
        filesize = fmt.get('filesize') or fmt.get('filesize_approx')

        print(_ROW_TEMPLATE.format(
            str(display_id),
            str(fmt.get('ext', 'N/A')),
            str(resolution),
            str(fps) if fps else '',
            str(fmt.get('vcodec', 'none'))[:18],
            str(fmt.get('acodec', 'none'))[:18],
            format_size(filesize),
            f"{tbr:.0f}k" if tbr else 'N/A',
            *_url_cells(fmt.get('url', ''))
        ), file=file)
def add_list_formats_parser(subparsers):
    """Register the 'list-formats' sub-command on *subparsers*.

    Args:
        subparsers: The object returned by ``ArgumentParser.add_subparsers()``.

    Returns:
        The newly created sub-command parser.
    """
    summary = "List available formats from a yt-dlp info.json file."
    list_cmd = subparsers.add_parser(
        'list-formats',
        help=summary,
        description=summary,
        formatter_class=argparse.RawTextHelpFormatter,
    )

    # (flags, keyword options) pairs, registered in the order shown by --help.
    option_specs = (
        (
            ('--load-info-json',),
            {
                'type': argparse.FileType('r', encoding='utf-8'),
                'default': sys.stdin,
                'help': "Path to the info.json file. Reads from stdin if not provided.",
            },
        ),
        (
            ('-f', '--formats'),
            {
                'help': 'Comma or slash-separated list of format IDs to highlight and prioritize (e.g., "18,140,299/298").',
            },
        ),
        (
            ('-p', '--pass-through'),
            {
                'action': 'store_true',
                'help': 'Pass the input JSON through to stdout, printing the format list to stderr.',
            },
        ),
    )
    for flags, options in option_specs:
        list_cmd.add_argument(*flags, **options)

    return list_cmd
def main_list_formats(args):
    """Run the 'list-formats' command.

    Args:
        args: Parsed arguments providing ``load_info_json`` (a readable text
            stream), ``formats`` (selector string or ``None``) and
            ``pass_through`` (bool).

    Returns:
        0 on success, 1 when the input is not valid JSON or any other error
        occurs. Error messages are written to stderr.
    """
    source = args.load_info_json
    try:
        # Read the whole content up front so it can be passed through unchanged.
        try:
            info_json_content = source.read()
        finally:
            # A handle opened by argparse.FileType is not closed by anyone
            # else; stdin is left open because this function does not own it.
            if source is not sys.stdin:
                source.close()

        info_data = json.loads(info_json_content)

        # In pass-through mode stdout carries the JSON, so the table goes to stderr.
        output_stream = sys.stderr if args.pass_through else sys.stdout
        list_formats(info_data, args.formats, file=output_stream)

        if args.pass_through:
            # end='' because the content read likely already ends with a newline.
            print(info_json_content, end='')
        return 0
    except json.JSONDecodeError:
        print("Error: Invalid JSON provided.", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        return 1