""" Tool to list available formats from a yt-dlp info.json file. """ import sys import json import argparse import re from urllib.parse import urlparse, parse_qs from datetime import datetime, timezone try: import yt_dlp except ImportError: yt_dlp = None def format_size(b): """Format size in bytes to human-readable string.""" if b is None: return 'N/A' if b < 1024: return f"{b}B" elif b < 1024**2: return f"{b/1024:.2f}KiB" elif b < 1024**3: return f"{b/1024**2:.2f}MiB" else: return f"{b/1024**3:.2f}GiB" def list_formats(info_json, requested_formats_str=None, file=sys.stdout): """Prints a table of available formats from info.json data.""" formats = info_json.get('formats', []) if not formats: print("No formats found in the provided info.json.", file=file) return requested_formats = [] requested_order = {} if requested_formats_str: if yt_dlp: try: ydl = yt_dlp.YoutubeDL({'quiet': True}) formats = info_json.get('formats', []) selector = ydl.build_format_selector(requested_formats_str) ctx = { 'formats': formats, 'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats), 'incomplete_formats': (all(f.get('vcodec') == 'none' for f in formats) or all(f.get('acodec') == 'none' for f in formats)), } selected_formats = list(selector(ctx)) all_selected_ids = [] for f in selected_formats: if 'requested_formats' in f: all_selected_ids.extend(rf['format_id'] for rf in f['requested_formats']) else: all_selected_ids.append(f['format_id']) requested_formats = all_selected_ids requested_order = {fmt: i for i, fmt in enumerate(requested_formats)} except Exception as e: print(f"WARNING: Could not parse format selector '{requested_formats_str}': {e}", file=sys.stderr) # Fallback to simple parsing requested_formats = [item for item in re.split(r'[,/]', requested_formats_str) if item] requested_order = {fmt: i for i, fmt in enumerate(requested_formats)} else: # Fallback to simple parsing if yt-dlp is not installed print("WARNING: yt-dlp not installed. Using simple format selector parsing.", file=sys.stderr) requested_formats = [item for item in re.split(r'[,/]', requested_formats_str) if item] requested_order = {fmt: i for i, fmt in enumerate(requested_formats)} def sort_key(f): fid = f.get('format_id', '') is_requested = fid in requested_order if is_requested: # Sort requested formats by the order they were provided return (False, requested_order[fid]) else: # Sort other formats numerically by ID return (True, int(fid) if fid.isdigit() else 999) sorted_formats = sorted(formats, key=sort_key) # Check if any requested formats were found if requested_formats: found_any = any(f.get('format_id') in requested_order for f in formats) if not found_any: print("WARNING: No format from list found.", file=sys.stderr) # Header header = "{:<6} {:<7} {:<12} {:<5} {:<18} {:<18} {:<12} {:<10} {:<20} {:<17} {:<15} {:<12} {:<12} {:<12} {:<5} {:<12} {:<12} {:<12} {:<12} {:<12}".format( "ID", "EXT", "RESOLUTION", "FPS", "VCODEC", "ACODEC", "FILESIZE", "TBR", "URL (path)", "EXPIRE (UTC)", "IP", "ID_TOKEN", "SESS_TOKEN", "EI_TOKEN", "GIR", "BUI_TOKEN", "POT_TOKEN", "MT_TOKEN", "SIG", "LSIG" ) print(header, file=file) print("-" * len(header), file=file) for f in sorted_formats: format_id = f.get('format_id', 'N/A') ext = f.get('ext', 'N/A') resolution = f.get('resolution') if not resolution: if 'width' in f and f['width'] is not None: resolution = f"{f['width']}x{f['height']}" else: resolution = 'audio only' fps = f.get('fps', '') vcodec = f.get('vcodec', 'none') acodec = f.get('acodec', 'none') filesize = f.get('filesize') or f.get('filesize_approx') tbr = f.get('tbr') display_id = f"*{format_id}" if format_id in requested_order else format_id url = f.get('url', '') partial_url, expire_date, ip, id_token_short, sess_token_short, ei_token_short, gir, bui_token_short, pot_token_short, mt_token_short, sig_short, lsig_short = ('N/A',) * 12 if url: parsed = urlparse(url) query_params = parse_qs(parsed.query) path_and_query = parsed.path if parsed.query: path_and_query += '?' + parsed.query if len(path_and_query) > 18: partial_url = path_and_query[:8] + '...' + path_and_query[-7:] else: partial_url = path_and_query expire_ts = query_params.get('expire', [None])[0] if expire_ts: try: expire_date = datetime.fromtimestamp(int(expire_ts), timezone.utc).strftime('%m-%d %H:%M:%S') except (ValueError, TypeError): expire_date = 'Invalid' ip = query_params.get('ip', ['N/A'])[0] id_token = query_params.get('id', [None])[0] if id_token and len(id_token) > 12: id_token_short = id_token[:6] + '..' + id_token[-4:] elif id_token: id_token_short = id_token sess_token = query_params.get('n', [None])[0] if sess_token and len(sess_token) > 12: sess_token_short = sess_token[:6] + '..' + sess_token[-4:] elif sess_token: sess_token_short = sess_token ei_token = query_params.get('ei', [None])[0] if ei_token and len(ei_token) > 12: ei_token_short = ei_token[:6] + '..' + ei_token[-4:] elif ei_token: ei_token_short = ei_token gir = query_params.get('gir', ['N/A'])[0] bui_token = query_params.get('bui', [None])[0] if bui_token and len(bui_token) > 12: bui_token_short = bui_token[:6] + '..' + bui_token[-4:] elif bui_token: bui_token_short = bui_token pot_token = query_params.get('pot', [None])[0] if pot_token and len(pot_token) > 12: pot_token_short = pot_token[:6] + '..' + pot_token[-4:] elif pot_token: pot_token_short = pot_token mt_token = query_params.get('mt', [None])[0] # mt is often just a timestamp, don't shorten unless it's a long hash if mt_token and len(mt_token) > 12: mt_token_short = mt_token[:6] + '..' + mt_token[-4:] elif mt_token: mt_token_short = mt_token sig = query_params.get('sig', [None])[0] if sig and len(sig) > 12: sig_short = sig[:6] + '..' + sig[-4:] elif sig: sig_short = sig lsig = query_params.get('lsig', [None])[0] if lsig and len(lsig) > 12: lsig_short = lsig[:6] + '..' + lsig[-4:] elif lsig: lsig_short = lsig print("{:<6} {:<7} {:<12} {:<5} {:<18} {:<18} {:<12} {:<10} {:<20} {:<17} {:<15} {:<12} {:<12} {:<12} {:<5} {:<12} {:<12} {:<12} {:<12} {:<12}".format( str(display_id), str(ext), str(resolution), str(fps) if fps else '', str(vcodec)[:18], str(acodec)[:18], format_size(filesize), f"{tbr:.0f}k" if tbr else 'N/A', partial_url, expire_date, ip, id_token_short, sess_token_short, ei_token_short, gir, bui_token_short, pot_token_short, mt_token_short, sig_short, lsig_short ), file=file) def add_list_formats_parser(subparsers): """Add the parser for the 'list-formats' command.""" parser = subparsers.add_parser( 'list-formats', description="List available formats from a yt-dlp info.json file.", formatter_class=argparse.RawTextHelpFormatter, help="List available formats from a yt-dlp info.json file." ) parser.add_argument( '--load-info-json', type=argparse.FileType('r', encoding='utf-8'), default=sys.stdin, help="Path to the info.json file. Reads from stdin if not provided." ) parser.add_argument( '-f', '--formats', help='Comma or slash-separated list of format IDs to highlight and prioritize (e.g., "18,140,299/298").' ) parser.add_argument( '-p', '--pass-through', action='store_true', help='Pass the input JSON through to stdout, printing the format list to stderr.' ) return parser def main_list_formats(args): """Main logic for the 'list-formats' command.""" try: # Read the whole content to allow passing it through info_json_content = args.load_info_json.read() info_data = json.loads(info_json_content) # Determine output stream for the format list output_stream = sys.stderr if args.pass_through else sys.stdout list_formats(info_data, args.formats, file=output_stream) # If pass-through is enabled, print the original JSON to stdout if args.pass_through: # Use end='' because the read content likely includes a trailing newline print(info_json_content, end='') return 0 except json.JSONDecodeError: print("Error: Invalid JSON provided.", file=sys.stderr) return 1 except Exception as e: print(f"An unexpected error occurred: {e}", file=sys.stderr) return 1