#!/usr/bin/env python3
"""
Tool to download a specified format using yt-dlp as a Python library.
"""
|
|
|
|
import argparse
|
|
import contextlib
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import shlex
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
try:
|
|
import yt_dlp
|
|
except ImportError:
|
|
print("yt-dlp is not installed. Please install it with: pip install yt-dlp", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
logger = logging.getLogger('download_native_py_tool')
|
|
|
|
# A custom logger for yt-dlp to capture output and key events
|
|
class YTDLPLogger:
    """Logger object handed to yt-dlp.

    Forwards every message to this module's logger while recording three
    pieces of state for the caller: the destination filename yt-dlp reports,
    and whether an HTTP 403 or a read timeout was seen in an error message.
    """

    # Prefix yt-dlp prints (at debug level) before the destination path.
    _DEST_PREFIX = '[download] Destination:'

    def __init__(self):
        self.final_filename = None  # path yt-dlp reports writing to, if seen
        self.is_403 = False         # saw "HTTP Error 403" in an error message
        self.is_timeout = False     # saw "Read timed out" in an error message

    def debug(self, msg):
        """Forward debug output, watching it for the destination filename."""
        if msg.startswith(self._DEST_PREFIX):
            # Everything after the first ':' (the one in "Destination:")
            # is the file path.
            self.final_filename = msg.partition(':')[2].strip()
        elif msg.startswith('[download]') and 'has already been downloaded' in msg:
            # Resumed/complete files are reported with a different message.
            m = re.search(r'\[download\]\s+(.*)\s+has already been downloaded', msg)
            if m:
                self.final_filename = m.group(1).strip()
        logger.debug(msg)

    def info(self, msg):
        logger.info(msg)

    def warning(self, msg):
        logger.warning(msg)

    def error(self, msg):
        """Forward errors, flagging 403s and timeouts for the caller."""
        if "HTTP Error 403" in msg:
            self.is_403 = True
        if "Read timed out" in msg:
            self.is_timeout = True
        logger.error(msg)
|
|
|
|
def ytdlp_progress_hook(d, ytdlp_logger):
    """yt-dlp progress hook: record the final filename once a download finishes.

    d: progress dict supplied by yt-dlp (keyed by 'status').
    ytdlp_logger: the YTDLPLogger instance collecting download state.
    """
    if d['status'] != 'finished':
        return
    ytdlp_logger.final_filename = d.get('filename')
    logger.info(f"Download finished. Final file: {ytdlp_logger.final_filename}")
|
|
|
|
def add_download_native_py_parser(subparsers):
    """Register the 'py' subcommand on *subparsers* and return its parser."""
    p = subparsers.add_parser(
        'py',
        description='Download using yt-dlp as a Python library (recommended). This method calls yt-dlp functions directly.',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Download using a direct Python call to yt-dlp (recommended).'
    )
    # --- input / output locations ---
    p.add_argument('--load-info-json', type=argparse.FileType('r', encoding='utf-8'), help="Path to the info.json file. If not provided, reads from stdin.")
    p.add_argument('-f', '--format', required=True, help='The format selection string to download (e.g., "18", "299/137", "bestvideo+bestaudio").')
    p.add_argument('--output-dir', default='.', help='Directory to save the downloaded file. Defaults to current directory.')
    p.add_argument('--save-info-json-dir', help='If specified, save the info.json received from stdin to this directory with an auto-generated name.')
    # --- network / timing ---
    p.add_argument('--proxy', help='Proxy to use for the download, e.g., "socks5://127.0.0.1:1080".')
    p.add_argument('--proxy-rename', help='Apply sed-style regex substitution to the proxy URL. Format: s/pattern/replacement/')
    p.add_argument('--temp-path', help='Directory for temporary files (e.g., fragments). Use a RAM disk for best performance.')
    p.add_argument('--pause', type=int, default=0, help='Seconds to wait before starting the download.')
    p.add_argument('--download-continue', action='store_true', help='Enable download continuation (--no-overwrites and --continue flags for yt-dlp).')
    p.add_argument('--verbose', action='store_true', help='Enable verbose output for this script and yt-dlp.')
    # --- yt-dlp configuration passthrough ---
    p.add_argument('--cli-config', help='Path to a yt-dlp configuration file to load.')
    p.add_argument('--downloader', help='Name of the external downloader backend for yt-dlp to use (e.g., "aria2c", "native").')
    p.add_argument('--downloader-args', help='Arguments to pass to the external downloader backend (e.g., "aria2c:-x 8").')
    p.add_argument('--extra-ytdlp-args', help='A string of extra command-line arguments to pass to yt-dlp.')
    # --- post-download behavior ---
    p.add_argument('--output-buffer', action='store_true', help='Download to an in-memory buffer and print raw bytes to stdout. Final filename is printed to stderr.')
    p.add_argument('--cleanup', action='store_true', help='After download, rename the file to include a timestamp and truncate it to 0 bytes.')
    p.add_argument('--merge-output-format', help='Container format to merge to (e.g., "mp4", "mkv"). Overrides config file.')
    # --- retry / timeout tuning ---
    p.add_argument('--retries', type=int, help='Number of retries for the entire download (default: 10).')
    p.add_argument('--fragment-retries', type=int, help='Number of retries for each fragment (default: 10).')
    p.add_argument('--socket-timeout', type=int, help='Timeout for socket operations in seconds (default: 20).')
    p.add_argument('--add-header', action='append', help='Add a custom HTTP header for the download. Format: "Key: Value". Can be used multiple times.')
    # Arguments to pass through to yt-dlp
    p.add_argument('--download-sections', help='yt-dlp --download-sections argument (e.g., "*0-10240").')
    p.add_argument('--test', action='store_true', help='yt-dlp --test argument (download small part).')
    return p
|
|
|
|
def main_download_native_py(args):
    """Main logic for the 'download-native-py' command.

    Reads a yt-dlp info.json (from --load-info-json or stdin), builds a
    yt-dlp options dict by layering, in increasing precedence: baseline
    defaults, options parsed from a config file and --extra-ytdlp-args,
    then this script's explicit arguments. Finally downloads the requested
    format by feeding the pre-fetched info dict to
    YoutubeDL.process_ie_result().

    Returns 0 on success, 1 on any failure (shell-style exit status).
    """
    # All logging should go to stderr to keep stdout clean for the final filename, or for binary data with --output-buffer.
    log_stream = sys.stderr
    log_level = logging.DEBUG if args.verbose else logging.INFO
    # Reconfigure root logger: drop any handlers installed by an earlier
    # basicConfig so our stream/level settings actually take effect.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(level=log_level, stream=log_stream, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    if args.pause > 0:
        logger.info(f"Pausing for {args.pause} seconds...")
        time.sleep(args.pause)

    # --- Read the info.json payload ---
    info_json_content = ""
    input_source_name = ""
    if args.load_info_json:
        info_json_content = args.load_info_json.read()
        input_source_name = args.load_info_json.name
    else:
        info_json_content = sys.stdin.read()
        input_source_name = "stdin"

    if not info_json_content.strip():
        logger.error(f"Failed to read info.json from {input_source_name}. Input is empty.")
        return 1

    try:
        info_data = json.loads(info_json_content)
        logger.info(f"Successfully loaded info.json from {input_source_name}.")
    except json.JSONDecodeError:
        logger.error(f"Failed to parse info.json from {input_source_name}. Is the input valid JSON?")
        return 1

    # Optionally persist the info.json for later reuse. Failure here is
    # deliberately non-fatal: the download itself can still proceed.
    if args.save_info_json_dir:
        try:
            video_id = info_data.get('id', 'unknown_video_id')
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"{timestamp}-{video_id}-info.json"
            output_path = os.path.join(args.save_info_json_dir, filename)
            os.makedirs(args.save_info_json_dir, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(info_data, f, indent=2)
            logger.info(f"Saved info.json to {output_path}")
        except Exception as e:
            logger.error(f"Failed to save info.json: {e}")

    # Handle proxy and proxy rename. --proxy wins; otherwise fall back to a
    # '_proxy_url' field embedded in the info.json by the orchestrator.
    proxy_url = args.proxy
    if not proxy_url:
        proxy_url = info_data.get('_proxy_url')
        if proxy_url:
            logger.info(f"Using proxy from info.json: {proxy_url}")

    if proxy_url and args.proxy_rename:
        rename_rule = args.proxy_rename.strip("'\"")
        if rename_rule.startswith('s/') and rename_rule.count('/') >= 2:
            try:
                parts = rename_rule.split('/')
                pattern, replacement = parts[1], parts[2]
                original_proxy = proxy_url
                proxy_url = re.sub(pattern, replacement, proxy_url)
                logger.info(f"Renamed proxy URL from '{original_proxy}' to '{proxy_url}' using rule '{rename_rule}'")
            except re.error as e:
                logger.error(f"Invalid regex in --proxy-rename: {e}")
                return 1
        else:
            logger.error("Invalid --proxy-rename format. Expected: s/pattern/replacement/")
            return 1

    # Build the yt-dlp options dictionary
    # Start by parsing options from config file and extra args to establish a baseline.
    base_opts_args = []
    if args.cli_config and os.path.exists(args.cli_config):
        try:
            with open(args.cli_config, 'r', encoding='utf-8') as f:
                config_content = f.read()
            base_opts_args.extend(shlex.split(config_content))
            logger.info(f"Loaded {len(base_opts_args)} arguments from config file: {args.cli_config}")
        except Exception as e:
            logger.error(f"Failed to read or parse config file {args.cli_config}: {e}")
            return 1
    elif args.cli_config:
        logger.warning(f"Config file '{args.cli_config}' not found. Ignoring.")

    if args.extra_ytdlp_args:
        extra_args_list = shlex.split(args.extra_ytdlp_args)
        logger.info(f"Adding {len(extra_args_list)} extra arguments from --extra-ytdlp-args.")
        base_opts_args.extend(extra_args_list)

    ydl_opts = {
        'noresizebuffer': True,
        # FIX: the embedded yt-dlp API expects an int number of bytes here.
        # The previous value was the string '4M' — the CLI parses such
        # suffixes, but the library does not (the suffix-parsing loop below
        # exists precisely because yt-dlp option values must be numeric).
        'buffersize': 4 * 1024 * 1024,  # 4 MiB
    }
    if base_opts_args:
        try:
            logger.info(f"Parsing {len(base_opts_args)} arguments from config/extra_args...")
            i = 0
            while i < len(base_opts_args):
                arg = base_opts_args[i]
                if not arg.startswith('--'):
                    logger.warning(f"Skipping non-option argument in extra args: {arg}")
                    i += 1
                    continue

                # CLI '--foo-bar' becomes option-dict key 'foo_bar'.
                key = arg.lstrip('-').replace('-', '_')

                # Handle flags (no value): last token, or next token is another option.
                is_flag = i + 1 >= len(base_opts_args) or base_opts_args[i + 1].startswith('--')

                # --resize-buffer / --no-resize-buffer invert to the single
                # 'noresizebuffer' option, so special-case them.
                if key == 'resize_buffer':
                    ydl_opts['noresizebuffer'] = False
                    logger.debug(f"Parsed flag: noresizebuffer = False")
                    i += 1
                    continue
                elif key == 'no_resize_buffer':
                    ydl_opts['noresizebuffer'] = True
                    logger.debug(f"Parsed flag: noresizebuffer = True")
                    i += 1
                    continue

                if is_flag:
                    if key.startswith('no_'):
                        # Handle --no-foo flags
                        ydl_opts[key[3:]] = False
                    else:
                        ydl_opts[key] = True
                    logger.debug(f"Parsed flag: {key} = {ydl_opts.get(key[3:] if key.startswith('no_') else key)}")
                    i += 1
                # Handle options with values
                else:
                    value = base_opts_args[i + 1]
                    # Try to convert values to numbers, which yt-dlp expects.
                    # This includes parsing byte suffixes like 'K', 'M', 'G'.
                    if isinstance(value, str):
                        original_value = value
                        value_upper = value.upper()
                        multipliers = {'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4}

                        if value_upper and value_upper[-1] in multipliers:
                            try:
                                num = float(value[:-1])
                                value = int(num * multipliers[value_upper[-1]])
                            except (ValueError, TypeError):
                                value = original_value  # fallback
                        else:
                            try:
                                value = int(value)
                            except (ValueError, TypeError):
                                try:
                                    value = float(value)
                                except (ValueError, TypeError):
                                    value = original_value  # fallback

                    # Special handling for keys that differ from CLI arg, e.g. --limit-rate -> ratelimit
                    if key == 'limit_rate':
                        key = 'ratelimit'
                    elif key == 'buffer_size':
                        key = 'buffersize'

                    ydl_opts[key] = value
                    logger.debug(f"Parsed option: {key} = {value}")
                    i += 2
            logger.info("Successfully parsed extra yt-dlp options.")
        except Exception as e:
            logger.error(f"Failed to parse options from config/extra_args: {e}", exc_info=True)
            return 1

    # Now, layer the script's explicit arguments on top, as they have higher precedence.
    os.makedirs(args.output_dir, exist_ok=True)
    output_template = os.path.join(args.output_dir, '%(title)s [%(id)s].f%(format_id)s.%(ext)s')

    ytdlp_logger = YTDLPLogger()

    # Use update to merge, so explicit args overwrite config/extra args.
    ydl_opts.update({
        'format': args.format,
        # outtmpl '-' makes yt-dlp write the media to stdout.
        'outtmpl': '-' if args.output_buffer else output_template,
        'logger': ytdlp_logger,
        'progress_hooks': [lambda d: ytdlp_progress_hook(d, ytdlp_logger)],
        'verbose': args.verbose,
    })

    if args.temp_path:
        ydl_opts['paths'] = {'temp': args.temp_path}
        logger.info(f"Using temporary path: {args.temp_path}")

    if args.add_header:
        if 'http_headers' not in ydl_opts:
            ydl_opts['http_headers'] = {}
        elif not isinstance(ydl_opts['http_headers'], dict):
            logger.warning(f"Overwriting non-dictionary http_headers from config with headers from command line.")
            ydl_opts['http_headers'] = {}

        for header in args.add_header:
            if ':' not in header:
                logger.error(f"Invalid header format in --add-header: '{header}'. Expected 'Key: Value'.")
                return 1
            key, value = header.split(':', 1)
            ydl_opts['http_headers'][key.strip()] = value.strip()
            logger.info(f"Adding/overwriting header: {key.strip()}: {value.strip()}")

    if args.download_continue:
        ydl_opts['continuedl'] = True
        ydl_opts['nooverwrites'] = True

    if proxy_url:
        ydl_opts['proxy'] = proxy_url

    # NOTE(review): recent yt-dlp versions expose these through the
    # 'external_downloader' / 'external_downloader_args' option keys —
    # verify 'downloader'/'downloader_args' are accepted by the pinned
    # yt-dlp version before relying on this path.
    if args.downloader:
        ydl_opts['downloader'] = {args.downloader: None}
    if args.downloader_args:
        # yt-dlp expects a dict for downloader_args
        # e.g., {'aria2c': ['-x', '8']}
        try:
            downloader_name, args_str = args.downloader_args.split(':', 1)
            ydl_opts.setdefault('downloader_args', {})[downloader_name] = shlex.split(args_str)
        except ValueError:
            logger.error(f"Invalid --downloader-args format. Expected 'downloader:args'. Got: '{args.downloader_args}'")
            return 1

    if args.merge_output_format:
        ydl_opts['merge_output_format'] = args.merge_output_format

    # NOTE(review): the embedded API historically consumes parsed section
    # ranges via 'download_ranges'; confirm a raw --download-sections string
    # is honored by the installed yt-dlp version.
    if args.download_sections:
        ydl_opts['download_sections'] = args.download_sections

    if args.test:
        ydl_opts['test'] = True

    if args.retries is not None:
        ydl_opts['retries'] = args.retries
    if args.fragment_retries is not None:
        ydl_opts['fragment_retries'] = args.fragment_retries
    if args.socket_timeout is not None:
        ydl_opts['socket_timeout'] = args.socket_timeout

    try:
        logger.info(f"Starting download for format '{args.format}' using yt-dlp library...")

        download_buffer = None
        if args.output_buffer:
            # When downloading to buffer, we redirect stdout to capture the binary data.
            download_buffer = io.BytesIO()
            ctx_mgr = contextlib.redirect_stdout(download_buffer)
        else:
            # Otherwise, use a null context manager.
            ctx_mgr = contextlib.nullcontext()

        with ctx_mgr, yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # The download() method is for URLs. For a pre-fetched info dict,
            # we must use process_ie_result to bypass the info extraction step.
            # It raises DownloadError on failure, which is caught by the outer try...except block.
            ydl.process_ie_result(info_data)
            # If process_ie_result completes without an exception, the download was successful.
            retcode = 0

        # The success path is now always taken if no exception was raised.
        if retcode == 0:
            # Some failures (expired URLs, stalls) surface only through the
            # logger rather than as exceptions; check the recorded flags.
            if ytdlp_logger.is_403:
                logger.error("Download failed: yt-dlp reported HTTP Error 403: Forbidden. The URL has likely expired.")
                return 1
            if ytdlp_logger.is_timeout:
                logger.error("Download failed: yt-dlp reported a timeout.")
                return 1

            logger.info("yt-dlp download completed successfully.")

            if args.output_buffer:
                # Write the captured binary data to the actual stdout.
                sys.stdout.buffer.write(download_buffer.getvalue())
                sys.stdout.buffer.flush()
                # Print the filename to stderr for the orchestrator.
                if ytdlp_logger.final_filename:
                    print(ytdlp_logger.final_filename, file=sys.stderr)
            else:
                # Print the filename to stdout as usual.
                if ytdlp_logger.final_filename:
                    print(ytdlp_logger.final_filename, file=sys.stdout)

            if args.cleanup:
                downloaded_filepath = ytdlp_logger.final_filename
                if downloaded_filepath and os.path.exists(downloaded_filepath):
                    try:
                        logger.info(f"Cleanup: Renaming and truncating '{downloaded_filepath}'")
                        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                        directory, original_filename = os.path.split(downloaded_filepath)
                        filename_base, filename_ext = os.path.splitext(original_filename)
                        new_filename = f"{filename_base}_{timestamp}{filename_ext}.empty"
                        new_filepath = os.path.join(directory, new_filename)
                        os.rename(downloaded_filepath, new_filepath)
                        logger.info(f"Renamed to '{new_filepath}'")
                        # Opening in 'w' mode truncates the file to 0 bytes.
                        with open(new_filepath, 'w') as f:
                            pass
                        logger.info(f"Truncated '{new_filepath}' to 0 bytes.")
                    except Exception as e:
                        logger.error(f"Cleanup failed: {e}")
                        return 1  # Treat cleanup failure as a script failure
                elif not args.output_buffer:
                    logger.warning("Cleanup requested, but no downloaded file was found. Skipping cleanup.")
            return 0
        else:
            # NOTE: retcode is always 0 when the 'with' block completes, so
            # this branch is unreachable; failures surface as exceptions and
            # are handled below. Kept for defensive symmetry.
            logger.error(f"yt-dlp download failed with internal exit code {retcode}.")
            return 1

    except yt_dlp.utils.DownloadError as e:
        # This catches download-specific errors from yt-dlp
        logger.error(f"yt-dlp DownloadError: {e}")
        return 1
    except Exception as e:
        logger.exception(f"An unexpected error occurred during yt-dlp execution: {e}")
        return 1
|