yt-dlp-dags/ytops_client/stress_formats_tool.py

789 lines
38 KiB
Python

#!/usr/bin/env python3
"""
Tool to stress-test video format download URLs from an info.json.
"""
import argparse
import collections
import concurrent.futures
import json
import logging
import os
import random
import re
import shlex
import signal
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse, parse_qs
# Configure logging
logger = logging.getLogger('stress_formats_tool')
def get_video_id(url: str) -> str:
    """Extract an 11-character YouTube video ID from a URL or bare ID.

    Handles watch URLs (?v=...), short youtu.be links, and plain IDs.
    Returns the sentinel "unknown_video_id" when nothing matches.
    """
    # Try each URL pattern in turn; both capture the 11-char ID.
    for pattern in (r"v=([0-9A-Za-z_-]{11})", r"youtu\.be\/([0-9A-Za-z_-]{11})"):
        found = re.search(pattern, url)
        if found:
            return found.group(1)
    # A bare video ID is accepted as-is.
    if re.fullmatch(r'[0-9A-Za-z_-]{11}', url):
        return url
    return "unknown_video_id"
def get_display_name(path_or_url):
    """Return a concise label for log messages: a filename or a video ID."""
    if isinstance(path_or_url, Path):
        return path_or_url.name
    as_text = str(path_or_url)
    vid = get_video_id(as_text)
    if vid == "unknown_video_id":
        # Not recognizably a video URL/ID; fall back to the final path component.
        return Path(as_text).name
    return vid
def format_size(b):
    """Render a byte count as a human-readable string (B/KiB/MiB/GiB)."""
    if b is None:
        return 'N/A'
    # Largest unit first; fall through to plain bytes for small values.
    for power, unit in ((3, 'GiB'), (2, 'MiB'), (1, 'KiB')):
        threshold = 1024 ** power
        if b >= threshold:
            return f"{b / threshold:.2f}{unit}"
    return f"{b}B"
class StatsTracker:
    """Tracks and reports statistics for the stress test.

    Thread-safe: events are appended under a lock so multiple worker
    threads can log concurrently. Each event is kept in memory and, when
    a stats file is configured, also appended to disk as one JSON object
    per line (JSON Lines).
    """
    def __init__(self, stats_file=None):
        # In-memory record of every logged event; the caller also reads
        # this list to derive the process exit code.
        self.events = []
        self.start_time = time.time()
        # Guards self.events and writes to the stats file.
        self.lock = threading.Lock()
        self.stats_file_path = stats_file
        self.stats_file_handle = None
        if self.stats_file_path:
            try:
                # Append mode so consecutive runs accumulate into one file.
                self.stats_file_handle = open(self.stats_file_path, 'a', encoding='utf-8')
            except IOError as e:
                # Non-fatal: stats are still tracked in memory.
                logger.error(f"Could not open stats file {self.stats_file_path}: {e}")
    def log_event(self, event_data):
        """Log a download attempt event.

        Note: mutates event_data in place by stamping a 'timestamp' key.
        """
        with self.lock:
            event_data['timestamp'] = datetime.now().isoformat()
            self.events.append(event_data)
            if self.stats_file_handle:
                self.stats_file_handle.write(json.dumps(event_data) + '\n')
                # Flush per event so the file is usable even after a crash.
                self.stats_file_handle.flush()
    def close(self):
        """Close the stats file."""
        if self.stats_file_handle:
            self.stats_file_handle.close()
    def print_summary(self):
        """Print a summary of the test run: fetch and download sections."""
        with self.lock:
            if not self.events:
                logger.info("No events were recorded.")
                return
            duration = time.time() - self.start_time
            # Separate events by type
            fetch_events = [e for e in self.events if e.get('type') == 'fetch']
            download_events = [e for e in self.events if e.get('type') != 'fetch'] # Default to download for old events
            logger.info("\n--- Test Summary ---")
            logger.info(f"Total duration: {duration:.2f} seconds")
            if fetch_events:
                total_fetches = len(fetch_events)
                successful_fetches = sum(1 for e in fetch_events if e['success'])
                failed_fetches = total_fetches - successful_fetches
                logger.info("\n--- Fetch Summary ---")
                logger.info(f"Total info.json fetch attempts: {total_fetches}")
                logger.info(f"  - Successful: {successful_fetches}")
                logger.info(f"  - Failed: {failed_fetches}")
                if total_fetches > 0:
                    success_rate = (successful_fetches / total_fetches) * 100
                    logger.info(f"Success rate: {success_rate:.2f}%")
                if failed_fetches > 0:
                    # Group failures by their recorded error_type.
                    error_counts = collections.Counter(e.get('error_type', 'Unknown') for e in fetch_events if not e['success'])
                    logger.info("Failure breakdown:")
                    for error_type, count in sorted(error_counts.items()):
                        logger.info(f"  - {error_type}: {count}")
            if download_events:
                total_attempts = len(download_events)
                successes = sum(1 for e in download_events if e['success'])
                failures = total_attempts - successes
                logger.info("\n--- Download Summary ---")
                logger.info(f"Total download attempts: {total_attempts}")
                logger.info(f"  - Successful: {successes}")
                logger.info(f"  - Failed: {failures}")
                if total_attempts > 0:
                    success_rate = (successes / total_attempts) * 100
                    logger.info(f"Success rate: {success_rate:.2f}%")
                if duration > 1 and total_attempts > 0:
                    dpm = (total_attempts / duration) * 60
                    logger.info(f"Attempt rate: {dpm:.2f} attempts/minute")
                # Download volume stats
                total_bytes = sum(e.get('downloaded_bytes', 0) for e in download_events if e['success'])
                if total_bytes > 0:
                    logger.info(f"Total data downloaded: {format_size(total_bytes)}")
                    if duration > 1:
                        bytes_per_second = total_bytes / duration
                        gb_per_hour = (bytes_per_second * 3600) / (1024**3)
                        gb_per_day = gb_per_hour * 24
                        logger.info(f"Download rate: {gb_per_hour:.3f} GB/hour ({gb_per_day:.3f} GB/day)")
                if failures > 0:
                    error_counts = collections.Counter(e.get('error_type', 'Unknown') for e in download_events if not e['success'])
                    logger.info("Failure breakdown:")
                    for error_type, count in sorted(error_counts.items()):
                        logger.info(f"  - {error_type}: {count}")
            logger.info("--------------------")
def print_banner(args, info_jsons=None, urls=None):
    """Log a one-screen summary of the test configuration before running."""
    # Build the banner as a list of lines, then emit them in order.
    banner = ["--- Stress Test Configuration ---"]
    if args.urls_file:
        if args.fetch_only:
            banner.append("Mode: Fetch-only. Generating info.json files from URL list.")
        else:
            banner.append("Mode: Full-stack test from URL list.")
        banner.append(f"URL file: {args.urls_file} ({len(urls)} URLs)")
        banner.append(f"Workers: {args.workers}")
        banner.append(f"Info.json command: {args.info_json_gen_cmd}")
        if args.info_json_gen_cmd_alt and args.alt_cmd_every_n > 0:
            banner.append(f"Alternate command (every {args.alt_cmd_every_n} URLs): {args.info_json_gen_cmd_alt}")
        if args.profile_prefix:
            if args.profile_pool:
                banner.append(f"Profile mode: Pool of {args.profile_pool} (prefix: {args.profile_prefix})")
            elif args.profile_per_request:
                banner.append(f"Profile mode: New profile per request (prefix: {args.profile_prefix})")
    else:
        # Static info.json files mode.
        banner.append("Mode: Download-only from static info.json files.")
        if info_jsons:
            banner.append(f"Files: {', '.join(str(p.name) for p in info_jsons)}")
        banner.append(f"Workers: {args.workers}")
    banner.append(f"Format selection: {args.format}")
    banner.append(f"Sleep between cycles: {args.sleep}s")
    if args.sleep_formats > 0:
        banner.append(f"Sleep between formats: {args.sleep_formats}s")
    if args.duration > 0:
        banner.append(f"Test duration: {args.duration} minutes")
    if args.max_attempts > 0:
        banner.append(f"Max cycles: {args.max_attempts}")
    banner.append(f"Stop on failure: {args.stop_on_failure}")
    if args.stop_on_403:
        banner.append("Stop on 403 error: True")
    if args.stop_on_timeout:
        banner.append("Stop on timeout: True")
    banner.append(f"Stats file: {args.stats_file}")
    if args.stats_interval > 0:
        banner.append(f"Periodic stats interval: {args.stats_interval}s")
    if args.format_download_args:
        banner.append(f"Extra download args: {args.format_download_args}")
    banner.append("Download volume: Tracking total data downloaded")
    banner.append("---------------------------------")
    for line in banner:
        logger.info(line)
def add_stress_formats_parser(subparsers):
    """Add the parser for the 'stress-formats' command.

    Registers every flag of the simple stress tester on the given argparse
    subparsers object and returns the newly created sub-parser.
    """
    parser = subparsers.add_parser(
        'stress-formats',
        description="A simple, command-line driven stress-testing tool for basic scenarios.\nAll options are configured via flags. For more complex scenarios and advanced\nfeatures like rate limiting and client rotation, use the 'stress-policy' command.",
        formatter_class=argparse.RawTextHelpFormatter,
        help='Run simple, flag-driven stress tests.',
        epilog="""
Usage examples:
# Test a format from a static info.json every 60 seconds
ytops-client stress-formats --info-json-files my_video.json -f 18 --sleep 60
# Test with multiple info.json files in parallel using 4 workers
ytops-client stress-formats --info-json-files "file1.json,file2.json,file3.json" -f 18 --sleep 60 --workers 4
# Fetch a new info.json for a URL and test a format every 5 minutes
ytops-client stress-formats --urls-file urls.txt --info-json-gen-cmd "bin/ytops-client get-info {url}" -f "18" --sleep 300
# Run the test for exactly 10 cycles, continuing on failure
ytops-client stress-formats --info-json-files my_video.json -f 18 --sleep 10 --max-attempts 10 --no-stop-on-failure
"""
    )
    # Exactly one input source: static info.json files or a URL list.
    source_group = parser.add_mutually_exclusive_group(required=True)
    source_group.add_argument('--info-json-files', help='Comma-separated paths to static info.json files to use for testing.')
    source_group.add_argument('--urls-file', help='Path to a file with URLs/IDs to test. Can be a text file (one per line) or a JSON array of strings.')
    # NOTE: --format is validated as required (unless --fetch-only) at
    # runtime in main_stress_formats, not here.
    parser.add_argument('-f', '--format', help='The format selection string. Can be a comma-separated list of IDs (e.g., "18,137"), "all", "random:X%%" (e.g., "random:10%%"), or "random_from:ID1,ID2,..." to pick one from a list. Required unless --fetch-only is used.')
    parser.add_argument('--sleep', type=int, default=60, help='Seconds to wait between batches of download attempts. Default: 60.')
    parser.add_argument('--sleep-formats', type=int, default=0, help='Seconds to wait between format downloads within a single file/cycle. Default: 0.')
    parser.add_argument('--max-attempts', type=int, default=0, help='Maximum number of test cycles. 0 means run indefinitely. Default: 0.')
    parser.add_argument('--duration', type=int, default=0, help='Total duration to run the test in minutes. 0 means run indefinitely (or until max-attempts is reached). Default: 0.')
    # Paired on/off flags sharing one destination; default is "continue".
    parser.add_argument('--stop-on-failure', action='store_true', help='Stop the test immediately after the first download failure.')
    parser.add_argument('--no-stop-on-failure', dest='stop_on_failure', action='store_false', help='Continue testing even after a download failure. (Default)')
    parser.set_defaults(stop_on_failure=False)
    parser.add_argument('--stop-on-403', action='store_true', help='Stop the test immediately after a 403 Forbidden error.')
    parser.add_argument('--stop-on-timeout', action='store_true', help='Stop the test immediately after a read timeout error.')
    parser.add_argument('--fetch-only', action='store_true', help='When used with --urls-file, only fetch and save info.json files without performing download tests.')
    parser.add_argument('--workers', type=int, default=1, help='Number of parallel workers for multi-file mode. Default: 1.')
    parser.add_argument('--stats-file', default='stress_test_stats.jsonl', help='File to log statistics for each attempt. Default: stress_test_stats.jsonl')
    parser.add_argument('--stats-interval', type=int, default=0, help='Interval in seconds to print stats summary periodically. 0 disables. Default: 0.')
    # Arguments for info.json generation
    parser.add_argument('--info-json-gen-cmd', help='Command template to generate info.json. Use {url}, {worker_id}, {cycle}, and {profile} as placeholders. Required with --urls-file.')
    parser.add_argument('--info-json-gen-cmd-alt', help='Alternate command template for info.json generation.')
    parser.add_argument('--alt-cmd-every-n', type=int, default=0, help='Use the alternate command for every N-th URL (e.g., N=3 means URLs 3, 6, 9...). Requires --info-json-gen-cmd-alt.')
    # Profile generation options
    profile_group = parser.add_argument_group('Profile Generation Options (for --urls-file mode)')
    profile_group.add_argument('--profile-prefix', help='Base name for generated profile IDs (e.g., "test_user"). Used with --profile-pool or --profile-per-request.')
    profile_group.add_argument('--profile-pool', type=int, metavar='N', help='Use a pool of N profiles. Profile ID will be {prefix}_{worker_id %% N}. Requires --profile-prefix.')
    profile_group.add_argument('--profile-per-request', action='store_true', help='Generate a new unique profile ID for each request. Profile ID will be {prefix}_{timestamp}_{worker_id}. Requires --profile-prefix.')
    # Arguments to pass to format_download.py
    parser.add_argument('--format-download-args', nargs='+', help='Additional arguments to pass to the download tool. E.g., --proxy-rename s/old/new/ --cleanup')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose output.')
    return parser
def run_command(cmd, input_data=None):
    """Run *cmd*, capture its output, and return (returncode, stdout, stderr).

    Args:
        cmd: Command and arguments as a list (executed without a shell).
        input_data: Optional text to pipe to the child's stdin. Unlike the
            previous Popen-based version — which only opened a stdin pipe
            when input_data was truthy and so crashed on "" — an empty
            string is handled correctly here.

    Returns:
        Tuple (returncode, stdout, stderr). returncode is -1 when the
        command could not be started at all.
    """
    logger.debug(f"Running command: {' '.join(cmd)}")
    try:
        # subprocess.run opens a stdin pipe whenever input is not None and
        # performs communicate() internally; it is the idiomatic form of
        # the old Popen + communicate() pair.
        completed = subprocess.run(
            cmd,
            input=input_data,
            capture_output=True,
            text=True,
            encoding='utf-8'
        )
        return completed.returncode, completed.stdout, completed.stderr
    except FileNotFoundError:
        logger.error(f"Command not found: {cmd[0]}. Make sure it's in your PATH.")
        return -1, "", f"Command not found: {cmd[0]}"
    except Exception as e:
        logger.error(f"An error occurred while running command: {' '.join(cmd)}. Error: {e}")
        return -1, "", str(e)
def run_download_worker(info_json_path, info_json_content, format_to_download, args):
    """
    Performs a single download attempt. Designed to be run in a worker thread.

    The info.json content is piped to the 'download' sub-command over stdin.
    Returns an event dict (type/path/format/success/error_type/details/
    downloaded_bytes) suitable for StatsTracker.log_event().
    """
    # 1. Attempt download
    download_cmd = [
        sys.executable, '-m', 'ytops_client.cli', 'download',
        '-f', format_to_download
    ]
    if args.format_download_args:
        # with nargs='+', this is a list.
        # If it's one item, it might be a single quoted string of args that needs splitting.
        if len(args.format_download_args) == 1:
            download_cmd.extend(shlex.split(args.format_download_args[0]))
        else:
            # multiple items, assume they are already split by shell
            download_cmd.extend(args.format_download_args)
    display_name = get_display_name(info_json_path)
    logger.info(f"[{display_name} @ {format_to_download}] Kicking off download process...")
    retcode, stdout, stderr = run_command(download_cmd, input_data=info_json_content)
    # 2. Check result
    # Error classification is string-matched against the child's stderr.
    is_403_error = "HTTP Error 403" in stderr
    is_timeout_error = "Read timed out" in stderr
    result = {
        'type': 'download',
        'path': str(info_json_path),
        'format': format_to_download,
        'success': retcode == 0,
        'error_type': None,
        'details': '',
        'downloaded_bytes': 0
    }
    if retcode == 0:
        # Success
        downloaded_filepath = ''
        # The filename is the last non-empty line of stdout that doesn't look like a progress bar
        lines = stdout.splitlines()
        for line in reversed(lines):
            if line and not line.strip().startswith('['):
                downloaded_filepath = line.strip()
                break
        details_str = "OK"
        if downloaded_filepath:
            details_str = f"Downloaded: {Path(downloaded_filepath).name}"
        # Parse download size from stderr
        # (matches yt-dlp's "[download] 100% of ~12.34MiB" progress line)
        size_in_bytes = 0
        size_match = re.search(r'\[download\]\s+100%\s+of\s+~?([0-9.]+)(B|KiB|MiB|GiB)', stderr)
        if size_match:
            value = float(size_match.group(1))
            unit = size_match.group(2)
            multipliers = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3}
            size_in_bytes = int(value * multipliers.get(unit, 1))
            result['downloaded_bytes'] = size_in_bytes
            details_str += f" ({size_match.group(1)}{unit})"
        result['details'] = details_str
    else:
        # Failure
        # Try to get the most relevant error line
        error_lines = [line for line in stderr.strip().split('\n') if 'ERROR:' in line]
        if error_lines:
            result['details'] = error_lines[-1]
        else:
            # If no "ERROR:" line, use the last few lines of stderr for context.
            last_lines = stderr.strip().split('\n')[-3:] # Get up to last 3 lines
            result['details'] = ' | '.join(line.strip() for line in last_lines if line.strip())
            if not result['details']:
                result['details'] = "Unknown error (stderr was empty)"
        # Classify in priority order: 403, then timeout, then generic exit code.
        if is_403_error:
            result['error_type'] = 'HTTP 403'
        elif is_timeout_error:
            result['error_type'] = 'Timeout'
        else:
            result['error_type'] = f'Exit Code {retcode}'
    return result
def process_info_json_cycle(path, content, args, stats):
    """
    Processes one info.json file for one cycle, downloading selected formats sequentially.
    Logs events and returns a list of results.

    Format selection (args.format) supports: "all", "random:X%",
    "random_from:ID1,ID2,...", or a comma-separated list of IDs / selectors.
    Expired download URLs (per their 'expire' query parameter) are skipped
    and recorded as success with error_type 'Skipped'.
    """
    results = []
    # Set when a failure matches a stop-on-* flag; ends this file's cycle early.
    should_stop_file = False
    display_name = get_display_name(path)
    # Determine formats to test based on the info.json content
    try:
        info_data = json.loads(content)
        available_formats = info_data.get('formats', [])
        if not available_formats:
            logger.warning(f"[{display_name}] No formats found in info.json. Skipping.")
            return []
        available_format_ids = [f['format_id'] for f in available_formats]
        formats_to_test = []
        format_selection_mode = args.format.lower()
        if format_selection_mode == 'all':
            formats_to_test = available_format_ids
            logger.info(f"[{display_name}] Testing all {len(formats_to_test)} available formats.")
        elif format_selection_mode.startswith('random:'):
            # "random:X%": sample X percent (at least one) of all available formats.
            try:
                percent_str = format_selection_mode.split(':')[1].rstrip('%')
                percent = float(percent_str)
                if not (0 < percent <= 100):
                    raise ValueError("Percentage must be between 0 and 100.")
                count = max(1, int(len(available_format_ids) * (percent / 100.0)))
                formats_to_test = random.sample(available_format_ids, k=count)
                logger.info(f"[{display_name}] Randomly selected {len(formats_to_test)} formats ({percent}%) from all available to test: {', '.join(formats_to_test)}")
            except (ValueError, IndexError) as e:
                logger.error(f"[{display_name}] Invalid random format selection '{args.format}': {e}. Skipping.")
                return []
        elif format_selection_mode.startswith('random_from:'):
            # "random_from:A,B,...": pick one format at random from the user's list.
            try:
                choices_str = format_selection_mode.split(':', 1)[1]
                if not choices_str:
                    raise ValueError("No formats provided after 'random_from:'.")
                format_choices = [f.strip() for f in choices_str.split(',') if f.strip()]
                # Filter the choices to only those available in the current info.json
                valid_choices = [f for f in format_choices if f in available_format_ids]
                if not valid_choices:
                    logger.warning(f"[{display_name}] None of the requested formats for random selection ({', '.join(format_choices)}) are available. Skipping.")
                    return []
                formats_to_test = [random.choice(valid_choices)]
                logger.info(f"[{display_name}] Randomly selected 1 format from your list to test: {formats_to_test[0]}")
            except (ValueError, IndexError) as e:
                logger.error(f"[{display_name}] Invalid random_from format selection '{args.format}': {e}. Skipping.")
                return []
        else:
            # Standard comma-separated list
            requested_formats = [f.strip() for f in args.format.split(',') if f.strip()]
            formats_to_test = []
            for req_fmt in requested_formats:
                # Check for exact match first
                if req_fmt in available_format_ids:
                    formats_to_test.append(req_fmt)
                    continue
                # If no exact match, check for formats that start with this ID + '-'
                # e.g., req_fmt '140' should match '140-0'
                prefix_match = f"{req_fmt}-"
                first_match = next((af for af in available_format_ids if af.startswith(prefix_match)), None)
                if first_match:
                    logger.info(f"[{display_name}] Requested format '{req_fmt}' not found. Using first available match: '{first_match}'.")
                    formats_to_test.append(first_match)
                else:
                    # This could be a complex selector like 'bestvideo' or '299/298', so keep it.
                    # NOTE(review): this inner check is always true here (no exact
                    # match was found above), so the warning always fires.
                    if req_fmt not in available_format_ids:
                        logger.warning(f"[{display_name}] Requested format '{req_fmt}' not found in available formats.")
                    formats_to_test.append(req_fmt)
    except json.JSONDecodeError:
        logger.error(f"[{display_name}] Failed to parse info.json. Skipping.")
        return []
    for i, format_id in enumerate(formats_to_test):
        if should_stop_file:
            break
        # Check if the format URL is expired before attempting to download
        format_details = next((f for f in available_formats if f.get('format_id') == format_id), None)
        if format_details and 'url' in format_details:
            parsed_url = urlparse(format_details['url'])
            query_params = parse_qs(parsed_url.query)
            # Google video URLs carry a unix-epoch 'expire' query parameter.
            expire_ts_str = query_params.get('expire', [None])[0]
            if expire_ts_str and expire_ts_str.isdigit():
                expire_ts = int(expire_ts_str)
                if expire_ts < time.time():
                    logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL is expired.")
                    # Logged as success so an expired URL does not trip the
                    # stop-on-failure logic or the non-zero exit code.
                    result = {
                        'type': 'download', 'path': str(path), 'format': format_id,
                        'success': True, 'error_type': 'Skipped',
                        'details': 'Download URL is expired', 'downloaded_bytes': 0
                    }
                    stats.log_event(result)
                    results.append(result)
                    continue # Move to the next format
        result = run_download_worker(path, content, format_id, args)
        stats.log_event(result)
        results.append(result)
        status = "SUCCESS" if result['success'] else f"FAILURE ({result['error_type']})"
        logger.info(f"Result for {display_name} (format {format_id}): {status} - {result.get('details', 'OK')}")
        if not result['success']:
            # This flag stops processing more formats for THIS file in this cycle
            # The main loop will decide if all cycles should stop.
            if args.stop_on_failure or \
               (args.stop_on_403 and result['error_type'] == 'HTTP 403') or \
               (args.stop_on_timeout and result['error_type'] == 'Timeout'):
                logger.info(f"Stopping further format tests for {display_name} in this cycle due to failure.")
                should_stop_file = True
        # Sleep between formats if needed
        if args.sleep_formats > 0 and i < len(formats_to_test) - 1:
            logger.info(f"Sleeping for {args.sleep_formats}s before next format for {display_name}...")
            time.sleep(args.sleep_formats)
    return results
def main_stress_formats(args):
    """Main logic for the 'stress-formats' command.

    Validates arguments, loads the test sources (static info.json files or
    a URL list), then runs download cycles in a thread pool until a stop
    condition is hit (duration, max cycles, a stop-on-* flag, or Ctrl+C).
    Returns a process exit code: 0 if every logged event succeeded, else 1.
    """
    # The --format argument is required unless we are only fetching info.json files.
    if not args.fetch_only and not args.format:
        logger.error("Error: argument -f/--format is required when not using --fetch-only.")
        return 1
    if (args.profile_pool or args.profile_per_request) and not args.profile_prefix:
        logger.error("--profile-prefix is required when using --profile-pool or --profile-per-request.")
        return 1
    if args.urls_file and args.fetch_only and not args.info_json_gen_cmd:
        logger.error("--info-json-gen-cmd is required when using --urls-file with --fetch-only.")
        return 1
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        # Make the default logger more concise for test output
        for handler in logging.root.handlers:
            handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s', datefmt='%H:%M:%S'))
    stats = StatsTracker(args.stats_file)
    start_time = time.time()
    # 0 means "no duration limit" throughout the loop below.
    duration_seconds = args.duration * 60 if args.duration > 0 else 0
    # --- Load sources ---
    info_jsons = {}
    urls = []
    if args.info_json_files:
        info_json_files = [Path(p.strip()) for p in args.info_json_files.split(',')]
        for file_path in info_json_files:
            if not file_path.is_file():
                logger.error(f"Info.json file not found: {file_path}")
                continue
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    info_jsons[file_path] = f.read()
            except (IOError, json.JSONDecodeError) as e:
                # NOTE(review): f.read() cannot raise JSONDecodeError; parsing
                # happens later in process_info_json_cycle.
                logger.error(f"Failed to read or parse {file_path}: {e}")
        if not info_jsons:
            logger.error("No valid info.json files to process. Exiting.")
            return 1
        logger.info(f"Loaded {len(info_jsons)} info.json file(s).")
        print_banner(args, info_jsons=info_jsons)
    elif args.urls_file:
        if not args.info_json_gen_cmd:
            logger.error("--info-json-gen-cmd is required when using --urls-file.")
            return 1
        try:
            with open(args.urls_file, 'r', encoding='utf-8') as f:
                content = f.read()
                # Try parsing as JSON array first
                try:
                    data = json.loads(content)
                    if isinstance(data, list) and all(isinstance(item, str) for item in data):
                        urls = data
                        logger.info(f"Loaded {len(urls)} URLs/IDs from JSON array in {args.urls_file}.")
                    else:
                        # Valid JSON, but not a list of strings. Treat as error to avoid confusion.
                        logger.error(f"URL file '{args.urls_file}' is valid JSON but not an array of strings.")
                        return 1
                except json.JSONDecodeError:
                    # Fallback to line-by-line parsing for plain text files
                    urls = [line.strip() for line in content.splitlines() if line.strip()]
                    logger.info(f"Loaded {len(urls)} URLs/IDs from text file {args.urls_file}.")
                if not urls:
                    logger.error(f"URL file '{args.urls_file}' is empty or contains no valid URLs/IDs.")
                    return 1
        except IOError as e:
            logger.error(f"Failed to read URL file {args.urls_file}: {e}")
            return 1
        # Clean up URLs/IDs which might have extra quotes, commas, or brackets from copy-pasting
        cleaned_urls = []
        for url in urls:
            # Strip whitespace, then trailing comma, then surrounding junk, then whitespace again
            cleaned_url = url.strip().rstrip(',').strip().strip('\'"[]').strip()
            if cleaned_url:
                cleaned_urls.append(cleaned_url)
        if len(cleaned_urls) != len(urls):
            logger.info(f"Cleaned URL list, removed {len(urls) - len(cleaned_urls)} empty or invalid entries.")
        urls = cleaned_urls
        if not urls:
            logger.error("URL list is empty after cleaning. Exiting.")
            return 1
        print_banner(args, urls=urls)
    # --- Main test loop ---
    cycles = 0
    last_stats_print_time = time.time()
    try:
        # --- Worker function for URL mode ---
        def process_url_task(url, url_index, cycle_num):
            """Worker to generate info.json for a URL and then test formats."""
            # 1. Generate profile name if configured
            profile_name = None
            if args.profile_prefix:
                if args.profile_pool:
                    # Stable pool slot derived from the URL's index.
                    profile_name = f"{args.profile_prefix}_{url_index % args.profile_pool}"
                elif args.profile_per_request:
                    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
                    profile_name = f"{args.profile_prefix}_{timestamp}_{url_index}"
            # 2. Select and format the generation command
            gen_cmd_template = args.info_json_gen_cmd
            if args.alt_cmd_every_n > 0 and args.info_json_gen_cmd_alt and (url_index + 1) % args.alt_cmd_every_n == 0:
                gen_cmd_template = args.info_json_gen_cmd_alt
                logger.info(f"Using alternate command for URL #{url_index + 1}: {url}")
            try:
                # shlex.split handles quoted arguments in the template
                video_id = get_video_id(url)
                gen_cmd = []
                template_args = shlex.split(gen_cmd_template)
                # If the video ID could be mistaken for an option, and it appears to be
                # a positional argument, insert '--' to prevent misinterpretation.
                if video_id.startswith('-'):
                    try:
                        # Heuristic: if {url} is the last token, it's likely positional.
                        if template_args and template_args[-1] == '{url}':
                            template_args.insert(-1, '--')
                    except (ValueError, IndexError):
                        pass # {url} not found or list is empty.
                for arg in template_args:
                    # Replace placeholders
                    formatted_arg = arg.replace('{url}', video_id) \
                                       .replace('{worker_id}', str(url_index)) \
                                       .replace('{cycle}', str(cycle_num))
                    if profile_name:
                        formatted_arg = formatted_arg.replace('{profile}', profile_name)
                    gen_cmd.append(formatted_arg)
                # Pass verbose flag through if set
                if args.verbose and 'get_info_json_client.py' in gen_cmd_template and '--verbose' not in gen_cmd_template:
                    gen_cmd.append('--verbose')
            except Exception as e:
                logger.error(f"Failed to format --info-json-gen-cmd: {e}")
                stats.log_event({'path': url, 'success': False, 'error_type': 'BadGenCmd', 'details': 'Cmd format error'})
                return []
            # 3. Run command to get info.json
            log_msg = f"[{url}] Generating info.json"
            if profile_name:
                log_msg += f" with profile '{profile_name}'"
            log_msg += "..."
            logger.info(log_msg)
            retcode, stdout, stderr = run_command(gen_cmd)
            if retcode != 0:
                error_msg = stderr.strip().split('\n')[-1]
                logger.error(f"[{url}] Failed to generate info.json: {error_msg}")
                event = {'type': 'fetch', 'path': url, 'success': False, 'error_type': 'GetInfoJsonFail', 'details': error_msg}
                stats.log_event(event)
                return [] # Return empty list, as no formats were tested
            # Handle --fetch-only
            if args.fetch_only:
                logger.info(f"[{url}] Successfully fetched info.json. Skipping download due to --fetch-only.")
                event = {'type': 'fetch', 'path': url, 'success': True, 'details': 'OK'}
                stats.log_event(event)
                return [] # Return empty list, indicating no downloads to check for failure
            # 4. Pass to the format processing function
            return process_info_json_cycle(url, stdout, args, stats)
        while True:
            if duration_seconds and (time.time() - start_time) > duration_seconds:
                logger.info(f"Reached duration limit of {args.duration} minutes. Stopping.")
                break
            cycles += 1
            if args.max_attempts > 0 and cycles > args.max_attempts:
                logger.info(f"Reached max cycles ({args.max_attempts}). Stopping.")
                break
            logger.info(f"--- Cycle #{cycles} ---")
            with concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) as executor:
                future_to_identifier = {}
                if args.info_json_files:
                    future_to_identifier = {
                        executor.submit(process_info_json_cycle, path, content, args, stats): path
                        for path, content in info_jsons.items()
                    }
                elif args.urls_file:
                    future_to_identifier = {
                        executor.submit(process_url_task, url, i, cycles): url
                        for i, url in enumerate(urls)
                    }
                should_stop = False
                # Use a set of futures that we can modify while iterating
                futures = set(future_to_identifier.keys())
                while futures and not should_stop:
                    # Wait for the next future to complete
                    done, futures = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
                    for future in done:
                        identifier = future_to_identifier[future]
                        identifier_name = get_display_name(identifier)
                        try:
                            results = future.result()
                            # Check if any result from this file triggers a global stop
                            for result in results:
                                if not result['success']:
                                    if args.stop_on_failure:
                                        logger.info(f"Failure on {identifier_name} (format {result['format']}). Shutting down all workers due to --stop-on-failure.")
                                        should_stop = True
                                    elif args.stop_on_403 and result['error_type'] == 'HTTP 403':
                                        logger.info(f"403 error on {identifier_name} (format {result['format']}). Shutting down all workers due to --stop-on-403.")
                                        should_stop = True
                                    elif args.stop_on_timeout and result['error_type'] == 'Timeout':
                                        logger.info(f"Timeout on {identifier_name} (format {result['format']}). Shutting down all workers due to --stop-on-timeout.")
                                        should_stop = True
                        except Exception as exc:
                            # Worker raised; record it so the exit code reflects the failure.
                            logger.error(f'{identifier_name} generated an exception: {exc}')
                            stats.log_event({'path': str(identifier), 'success': False, 'error_type': 'Exception', 'details': str(exc)})
                        if should_stop:
                            break # Stop processing results from 'done' set
                    # Check for duration limit after each batch of tasks completes
                    if duration_seconds and (time.time() - start_time) > duration_seconds:
                        logger.info(f"Reached duration limit of {args.duration} minutes. Cancelling remaining tasks.")
                        should_stop = True
                # If the loop was exited, cancel any remaining tasks
                # (cancel() only affects tasks not yet started by the pool).
                if should_stop and futures:
                    logger.info(f"Cancelling {len(futures)} outstanding task(s).")
                    for future in futures:
                        future.cancel()
            if should_stop:
                break
            if args.stats_interval > 0 and (time.time() - last_stats_print_time) >= args.stats_interval:
                stats.print_summary()
                last_stats_print_time = time.time()
            if args.max_attempts > 0 and cycles >= args.max_attempts:
                break
            logger.info(f"Cycle complete. Sleeping for {args.sleep} seconds...")
            # Interruptible sleep that respects the total test duration
            sleep_end_time = time.time() + args.sleep
            should_stop_after_sleep = False
            while time.time() < sleep_end_time:
                if duration_seconds and (time.time() - start_time) >= duration_seconds:
                    logger.info(f"Reached duration limit of {args.duration} minutes during sleep. Stopping.")
                    should_stop_after_sleep = True
                    break
                time.sleep(1) # Check every second
            if should_stop_after_sleep:
                break
    except KeyboardInterrupt:
        logger.info("\nCtrl+C received, shutting down...")
    finally:
        stats.print_summary()
        stats.close()
    return 0 if not any(not e['success'] for e in stats.events) else 1