import collections
import json
import logging
import os
import random
import re
import shlex
import sys
import tempfile
import shutil
import threading
import time
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from . import utils as sp_utils
from .process_runners import run_command, run_docker_container, get_worker_id
from ..profile_manager_tool import ProfileManager
logger = logging.getLogger(__name__)
# --- Auth Profile Manager Cache ---
# Caches one ProfileManager instance per auth simulation environment so that
# download workers can decrement the pending-download counter of the correct environment.
_auth_manager_cache = {}
_auth_manager_lock = threading.Lock()
def get_auth_manager(current_manager, auth_env: str):
"""
Gets a ProfileManager instance for a specific auth simulation environment.
The auth_env value is taken from the info.json metadata of the task being processed.
"""
with _auth_manager_lock:
if not auth_env:
return None
if auth_env in _auth_manager_cache:
return _auth_manager_cache[auth_env]
logger.info(f"Creating new ProfileManager for auth simulation env: '{auth_env}'")
try:
# Re-use connection settings from the current manager
redis_conn_kwargs = current_manager.redis.connection_pool.connection_kwargs
auth_key_prefix = f"{auth_env}_profile_mgmt_"
auth_manager = ProfileManager(
redis_host=redis_conn_kwargs.get('host'),
redis_port=redis_conn_kwargs.get('port'),
redis_password=redis_conn_kwargs.get('password'),
key_prefix=auth_key_prefix
)
_auth_manager_cache[auth_env] = auth_manager
return auth_manager
except Exception as e:
logger.error(f"Failed to create ProfileManager for auth env '{auth_env}': {e}")
return None
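# Illustrative mapping for the key-prefix convention above, assuming an auth_env value of
# "sim1" read from an info.json's metadata (the environment name itself is an example):
#   get_auth_manager(current_manager, "sim1") -> ProfileManager(key_prefix="sim1_profile_mgmt_")
# Subsequent calls with "sim1" return the same cached instance.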
def _run_download_logic(source, info_json_content, policy, state_manager, args, running_processes, process_lock, profile_name=None, profile_manager_instance=None):
"""Shared download logic for a single info.json."""
proxy_url = None
if info_json_content:
try:
info_data = json.loads(info_json_content)
proxy_url = info_data.get('_proxy_url')
except (json.JSONDecodeError, AttributeError):
logger.warning(f"[{sp_utils.get_display_name(source)}] Could not parse info.json to get proxy for download controls.")
d_policy = policy.get('download_policy', {})
temp_download_dir = None
local_policy = policy
if d_policy.get('output_to_airflow_ready_dir'):
local_policy = deepcopy(policy)
temp_download_dir = tempfile.mkdtemp(prefix='stress-dl-video-')
# This modification is safe because it's on a deep copy
local_policy.setdefault('download_policy', {})['output_dir'] = temp_download_dir
logger.info(f"[{sp_utils.get_display_name(source)}] Using temporary download directory: {temp_download_dir}")
try:
if not state_manager.check_and_update_download_rate_limit(proxy_url, local_policy):
return []
state_manager.wait_for_proxy_cooldown(proxy_url, local_policy)
results = process_info_json_cycle(source, info_json_content, local_policy, state_manager, args, running_processes, process_lock, proxy_url=proxy_url, profile_name=profile_name,
profile_manager_instance=profile_manager_instance)
state_manager.update_proxy_finish_time(proxy_url)
# --- Post-download logic for Airflow dir ---
if d_policy.get('output_to_airflow_ready_dir'):
for result in results:
if result.get('success') and result.get('downloaded_filepath'):
try:
video_id = result.get('video_id')
if not video_id:
# Fallback: extract from info.json content
try:
info_data = json.loads(info_json_content)
video_id = info_data.get('id')
except (json.JSONDecodeError, AttributeError):
video_id = None
if not video_id:
logger.error(f"[{sp_utils.get_display_name(source)}] Could not find video ID in result for moving file.")
continue
now = datetime.now()
rounded_minute = (now.minute // 10) * 10
timestamp_str = now.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}"
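# Example of the 10-minute bucketing above: a run at 2025-12-26 10:37 local time gives
# rounded_minute = (37 // 10) * 10 = 30, so timestamp_str becomes "20251226T1030".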
base_path = d_policy.get('airflow_ready_dir_base_path', 'downloadfiles/videos/ready')
if not os.path.isabs(base_path):
base_path = os.path.join(sp_utils._PROJECT_ROOT, base_path)
final_dir_base = os.path.join(base_path, timestamp_str)
final_dir_path = os.path.join(final_dir_base, video_id)
os.makedirs(final_dir_path, exist_ok=True)
downloaded_file = result['downloaded_filepath']
if os.path.exists(downloaded_file):
shutil.move(downloaded_file, final_dir_path)
logger.info(f"[{sp_utils.get_display_name(source)}] Moved media file to {final_dir_path}")
# The source is the path to the task/info.json file.
if isinstance(source, Path) and source.exists():
new_info_json_name = f"info_{video_id}.json"
dest_info_json_path = os.path.join(final_dir_path, new_info_json_name)
shutil.copy(source, dest_info_json_path)
logger.info(f"[{sp_utils.get_display_name(source)}] Copied info.json to {dest_info_json_path}")
except Exception as e:
logger.error(f"[{sp_utils.get_display_name(source)}] Failed to move downloaded file to Airflow ready directory: {e}")
return results
finally:
if temp_download_dir and os.path.exists(temp_download_dir):
shutil.rmtree(temp_download_dir)
logger.info(f"[{sp_utils.get_display_name(source)}] Cleaned up temporary directory: {temp_download_dir}")
def process_profile_task(profile_name, file_list, policy, state_manager, cycle_num, args, running_processes, process_lock, profile_manager_instance=None):
"""Worker task for a profile, processing its files sequentially."""
logger.info(f"Worker {get_worker_id()} starting task for profile '{profile_name}' with {len(file_list)} files.")
all_results = []
for i, file_path in enumerate(file_list):
if state_manager.shutdown_event.is_set():
logger.info(f"Shutdown requested, stopping task for profile '{profile_name}'.")
break
try:
with open(file_path, 'r', encoding='utf-8') as f:
info_json_content = f.read()
except (IOError, FileNotFoundError) as e:
logger.error(f"[{sp_utils.get_display_name(file_path)}] Could not read info.json file: {e}")
continue # Skip this file
results_for_file = _run_download_logic(file_path, info_json_content, policy, state_manager, args, running_processes, process_lock, profile_name=profile_name, profile_manager_instance=profile_manager_instance)
all_results.extend(results_for_file)
# Mark the file as processed if configured. The in-memory tracking below applies only to
# 'continuous' scan mode, while the rename-based marking works in both 'once' and 'continuous' modes.
settings = policy.get('settings', {})
if settings.get('directory_scan_mode') == 'continuous':
state_manager.mark_file_as_processed(file_path)
if settings.get('mark_processed_files'):
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
new_path = file_path.parent / f"{file_path.name}.{timestamp}.processed"
file_path.rename(new_path)
logger.info(f"Marked '{file_path.name}' as processed by renaming to '{new_path.name}'")
except (IOError, OSError) as e:
logger.error(f"Failed to rename processed file '{file_path.name}': {e}")
# Check for stop conditions after processing each file
should_stop_profile = False
for result in results_for_file:
if not result['success']:
s_conditions = policy.get('stop_conditions', {})
if s_conditions.get('on_failure') or \
(s_conditions.get('on_http_403') and result['error_type'] == 'HTTP 403') or \
(s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
logger.info(f"Stopping further processing for profile '{profile_name}' due to failure.")
should_stop_profile = True
break
if should_stop_profile:
break
# Apply sleep between tasks for this profile
if i < len(file_list) - 1:
exec_control = policy.get('execution_control', {})
sleep_cfg = exec_control.get('sleep_between_tasks', {})
sleep_min = sleep_cfg.get('min_seconds', 0)
sleep_max_val = sleep_cfg.get('max_seconds')
if sleep_min > 0 or sleep_max_val is not None:
sleep_max = sleep_min if sleep_max_val is None else sleep_max_val
sleep_duration = 0
if sleep_max < sleep_min:
logger.warning(f"sleep_between_tasks: max_seconds ({sleep_max}s) is less than min_seconds ({sleep_min}s). Using max_seconds as fixed sleep duration.")
sleep_duration = sleep_max
elif sleep_max > sleep_min:
sleep_duration = random.uniform(sleep_min, sleep_max)
else: # equal
sleep_duration = sleep_min
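# A minimal policy fragment for the per-profile sleep above (YAML-style, values illustrative):
#   execution_control:
#     sleep_between_tasks:
#       min_seconds: 5
#       max_seconds: 15
# With this config each profile sleeps a uniform 5-15s between files; omitting
# max_seconds turns min_seconds into a fixed sleep duration.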
if sleep_duration > 0:
logger.debug(f"Profile '{profile_name}' sleeping for {sleep_duration:.2f}s before next file.")
# Interruptible sleep
sleep_end_time = time.time() + sleep_duration
while time.time() < sleep_end_time:
if state_manager.shutdown_event.is_set():
break
time.sleep(0.2)
return all_results
def run_download_worker(info_json_path, info_json_content, format_to_download, policy, args, running_processes, process_lock, state_manager, profile_name=None):
"""
Performs a single download attempt. Designed to be run in a worker thread.
"""
worker_id = get_worker_id()
display_name = sp_utils.get_display_name(info_json_path)
profile_log_part = f" [Profile: {profile_name}]" if profile_name else ""
log_prefix = f"[Worker {worker_id}]{profile_log_part} [{display_name} @ {format_to_download}]"
download_policy = policy.get('download_policy', {})
settings = policy.get('settings', {})
downloader = download_policy.get('downloader')
# Get script command from settings, with fallback to download_policy for old format.
script_cmd_str = settings.get('download_script')
if not script_cmd_str:
script_cmd_str = download_policy.get('script')
if script_cmd_str:
download_cmd = shlex.split(script_cmd_str)
elif downloader == 'aria2c_rpc':
download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'aria-rpc']
elif downloader == 'native-cli':
download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'cli']
else:
# Default to the new native-py downloader if downloader is 'native-py' or not specified.
download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'py']
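# Downloader selection, illustrated with a hedged policy fragment (YAML-style, values illustrative;
# the download_script command is hypothetical):
#   settings:
#     download_script: "python -m my_custom_downloader"   # optional override, takes precedence
#   download_policy:
#     downloader: aria2c_rpc    # or: native-cli, native-py (default when unset)
# If 'settings.download_script' is present it is shlex-split and used as the base command;
# otherwise the 'downloader' key picks one of the built-in ytops_client.cli download subcommands.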
download_cmd.extend(['-f', format_to_download])
if downloader == 'aria2c_rpc':
if download_policy.get('aria_host'):
download_cmd.extend(['--aria-host', str(download_policy['aria_host'])])
if download_policy.get('aria_port'):
download_cmd.extend(['--aria-port', str(download_policy['aria_port'])])
if download_policy.get('aria_secret'):
download_cmd.extend(['--aria-secret', str(download_policy['aria_secret'])])
if download_policy.get('output_dir'):
download_cmd.extend(['--output-dir', str(download_policy['output_dir'])])
if download_policy.get('aria_remote_dir'):
download_cmd.extend(['--remote-dir', str(download_policy['aria_remote_dir'])])
if download_policy.get('aria_fragments_dir'):
download_cmd.extend(['--fragments-dir', str(download_policy['aria_fragments_dir'])])
# For stress testing, waiting is the desired default to get a success/fail result.
# Allow disabling it by explicitly setting aria_wait: false in the policy.
if download_policy.get('aria_wait', True):
download_cmd.append('--wait')
if download_policy.get('auto_merge_fragments'):
download_cmd.append('--auto-merge-fragments')
if download_policy.get('remove_fragments_after_merge'):
download_cmd.append('--remove-fragments-after-merge')
if download_policy.get('cleanup'):
download_cmd.append('--cleanup')
if download_policy.get('purge_on_complete'):
download_cmd.append('--purge-on-complete')
downloader_args = download_policy.get('downloader_args')
proxy = download_policy.get('proxy')
if proxy:
# Note: proxy_rename is not supported for aria2c_rpc mode.
proxy_arg = f"--all-proxy {shlex.quote(str(proxy))}"
if downloader_args:
downloader_args = f"{downloader_args} {proxy_arg}"
else:
downloader_args = proxy_arg
if downloader_args:
# For aria2c_rpc, the downloader_args value is passed directly to the script's --downloader-args option.
download_cmd.extend(['--downloader-args', downloader_args])
elif downloader == 'native-cli':
# This is the logic for the legacy download_tool.py (yt-dlp CLI wrapper).
pause_seconds = download_policy.get('pause_before_download_seconds')
if pause_seconds and isinstance(pause_seconds, (int, float)) and pause_seconds > 0:
download_cmd.extend(['--pause', str(pause_seconds)])
if download_policy.get('continue_downloads'):
download_cmd.append('--download-continue')
# Add proxy if specified directly in the policy
proxy = download_policy.get('proxy')
if proxy:
download_cmd.extend(['--proxy', str(proxy)])
proxy_rename = download_policy.get('proxy_rename')
if proxy_rename:
download_cmd.extend(['--proxy-rename', str(proxy_rename)])
extra_args = download_policy.get('extra_args')
if extra_args:
download_cmd.extend(shlex.split(extra_args))
# Note: 'downloader' here refers to yt-dlp's internal downloader, not our script.
# The policy key 'external_downloader' is clearer, but we support 'downloader' for backward compatibility.
ext_downloader = download_policy.get('external_downloader') or download_policy.get('downloader')
if ext_downloader and ext_downloader not in ['native-cli', 'native-py', 'aria2c_rpc']:
download_cmd.extend(['--downloader', str(ext_downloader)])
downloader_args = download_policy.get('downloader_args')
if downloader_args:
download_cmd.extend(['--downloader-args', str(downloader_args)])
if download_policy.get('merge_output_format'):
download_cmd.extend(['--merge-output-format', str(download_policy['merge_output_format'])])
if download_policy.get('cleanup'):
download_cmd.append('--cleanup')
else:
# This is the default logic for the new native-py downloader.
if download_policy.get('output_to_buffer'):
download_cmd.append('--output-buffer')
else:
# --output-dir is only relevant if not outputting to buffer.
if download_policy.get('output_dir'):
download_cmd.extend(['--output-dir', str(download_policy['output_dir'])])
if download_policy.get('config'):
download_cmd.extend(['--config', str(download_policy['config'])])
if download_policy.get('temp_path'):
download_cmd.extend(['--temp-path', str(download_policy['temp_path'])])
if download_policy.get('continue_downloads'):
download_cmd.append('--download-continue')
pause_seconds = download_policy.get('pause_before_download_seconds')
if pause_seconds and isinstance(pause_seconds, (int, float)) and pause_seconds > 0:
download_cmd.extend(['--pause', str(pause_seconds)])
proxy = download_policy.get('proxy')
if proxy:
download_cmd.extend(['--proxy', str(proxy)])
proxy_rename = download_policy.get('proxy_rename')
if proxy_rename:
download_cmd.extend(['--proxy-rename', str(proxy_rename)])
# The 'extra_args' from the policy are for the download script itself, not for yt-dlp.
# We need to split them and add them to the command.
extra_args = download_policy.get('extra_args')
if extra_args:
download_cmd.extend(shlex.split(extra_args))
# Pass through downloader settings for yt-dlp to use
# e.g. to tell yt-dlp to use aria2c as its backend
ext_downloader = download_policy.get('external_downloader')
if ext_downloader:
download_cmd.extend(['--downloader', str(ext_downloader)])
downloader_args = download_policy.get('downloader_args')
if downloader_args:
download_cmd.extend(['--downloader-args', str(downloader_args)])
if args.dummy:
# Create a copy to add the info.json path for logging, without modifying the original
log_cmd = list(download_cmd)
if isinstance(info_json_path, Path) and info_json_path.exists():
log_cmd.extend(['--load-info-json', str(info_json_path)])
else:
log_cmd.extend(['--load-info-json', '<from_memory>'])
logger.info(f"{log_prefix} Dummy mode: simulating download...")
logger.info(f"{log_prefix} Dummy mode: Would run command: {' '.join(shlex.quote(s) for s in log_cmd)}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
min_seconds = dummy_settings.get('download_min_seconds', 1.0)
max_seconds = dummy_settings.get('download_max_seconds', 3.0)
failure_rate = dummy_settings.get('download_failure_rate', 0.0)
skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
sleep_duration = random.uniform(min_seconds, max_seconds)
logger.info(f"{log_prefix} Dummy mode: simulating download for {sleep_duration:.2f}s (from policy range {min_seconds}-{max_seconds}s).")
time.sleep(sleep_duration) # Simulate work
rand_val = random.random()
should_fail_skipped = rand_val < skipped_rate
should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
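# Illustrative probabilities for the single random draw above, assuming
# download_skipped_failure_rate = 0.1 and download_failure_rate = 0.2:
#   rand_val < 0.1          -> tolerated "skipped" failure (~10% of attempts)
#   0.1 <= rand_val < 0.3   -> fatal failure (~20% of attempts)
#   rand_val >= 0.3         -> simulated success (~70% of attempts)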
if should_fail_skipped:
logger.warning(f"{log_prefix} Dummy mode: Injecting simulated skipped download failure.")
return {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': False,
'error_type': 'DummySkippedFailure',
'details': 'FAIL (Dummy mode, skipped)',
'downloaded_bytes': 0,
'profile': profile_name,
'downloaded_filepath': None,
'is_tolerated_error': True
}
if should_fail_fatal:
logger.warning(f"{log_prefix} Dummy mode: Injecting simulated fatal download failure.")
return {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': False,
'error_type': 'DummyFailure',
'details': 'FAIL (Dummy mode, fatal)',
'downloaded_bytes': 0,
'profile': profile_name,
'downloaded_filepath': None
}
downloaded_filepath = f'/dev/null/{display_name}.mp4'
if download_policy.get('output_to_airflow_ready_dir'):
output_dir = download_policy.get('output_dir')
if output_dir and os.path.isdir(output_dir):
try:
dummy_path_obj = Path(output_dir) / f"{display_name}.mp4"
dummy_path_obj.touch()
downloaded_filepath = str(dummy_path_obj)
logger.info(f"{log_prefix} Dummy mode: created dummy file for Airflow move: {downloaded_filepath}")
except OSError as e:
logger.error(f"{log_prefix} Dummy mode: failed to create dummy file in '{output_dir}': {e}")
return {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': True,
'error_type': None,
'details': 'OK (Dummy mode)',
'downloaded_bytes': random.randint(100000, 5000000),
'profile': profile_name,
'downloaded_filepath': downloaded_filepath
}
logger.info(f"{log_prefix} Kicking off download process...")
temp_info_file_path = None
try:
if isinstance(info_json_path, Path) and info_json_path.exists():
# The info.json is already in a file, pass its path directly.
download_cmd.extend(['--load-info-json', str(info_json_path)])
else:
# The info.json content is in memory, so write it to a temporary file.
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json', encoding='utf-8') as temp_f:
temp_f.write(info_json_content)
temp_info_file_path = temp_f.name
download_cmd.extend(['--load-info-json', temp_info_file_path])
cmd_str_for_log = ' '.join(shlex.quote(s) for s in download_cmd)
logger.info(f"{log_prefix} Running download command: {cmd_str_for_log}")
output_to_buffer = download_policy.get('output_to_buffer', False)
retcode, stdout, stderr = run_command(
download_cmd,
running_processes,
process_lock,
binary_stdout=output_to_buffer,
stream_output=getattr(args, 'print_downloader_log', False),
stream_prefix=f"{log_prefix} | "
)
finally:
if temp_info_file_path and os.path.exists(temp_info_file_path):
os.unlink(temp_info_file_path)
is_403_error = "HTTP Error 403" in stderr
is_timeout_error = "Read timed out" in stderr
output_to_buffer = download_policy.get('output_to_buffer', False)
# Parse stdout to find the downloaded file path.
# The download scripts print the final path, sometimes with a prefix.
downloaded_filepath = None
if stdout and not output_to_buffer:
lines = stdout.strip().split('\n')
if lines:
last_line = lines[-1].strip()
# Handle aria-rpc output format: "Download... successful: <path>"
aria_match = re.search(r'successful: (.+)', last_line)
if aria_match:
path_from_aria = aria_match.group(1).strip()
if os.path.exists(path_from_aria):
downloaded_filepath = path_from_aria
else:
logger.warning(f"[{display_name}] Path from aria-rpc output does not exist: '{path_from_aria}'")
# Handle native-py/cli output format (just the path)
elif os.path.exists(last_line):
downloaded_filepath = last_line
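# Illustrative final stdout lines that this parser accepts (paths are hypothetical, and the exact
# wording of the aria-rpc line is an assumption; only the "successful: <path>" suffix matters):
#   aria-rpc downloader:  "Download ... successful: /data/downloads/video.mp4"
#   native-py / cli:      "/data/downloads/video.mp4"
# In both cases the extracted path is used only if it actually exists on disk.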
result = {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': retcode == 0,
'error_type': None,
'details': '',
'downloaded_bytes': 0,
'profile': profile_name,
'downloaded_filepath': downloaded_filepath
}
if retcode == 0:
details_str = "OK"
size_in_bytes = 0
if output_to_buffer:
# The most accurate size is the length of the stdout buffer.
size_in_bytes = len(stdout) # stdout is bytes
details_str += f" (Buffered {sp_utils.format_size(size_in_bytes)})"
else:
size_match = re.search(r'\[download\]\s+100%\s+of\s+~?([0-9.]+)(B|KiB|MiB|GiB)', stderr)
if size_match:
value = float(size_match.group(1))
unit = size_match.group(2)
multipliers = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3}
size_in_bytes = int(value * multipliers.get(unit, 1))
details_str += f" ({size_match.group(1)}{unit})"
result['downloaded_bytes'] = size_in_bytes
result['details'] = details_str
else:
# Check for shutdown first. This is the most likely cause for an abrupt non-zero exit.
if state_manager.shutdown_event.is_set():
result['error_type'] = 'Cancelled'
result['details'] = 'Task cancelled during shutdown.'
else:
# Check both stdout and stderr for error messages, as logging might be directed to stdout.
full_output = f"{stdout}\n{stderr}"
# Look for common error indicators from both yt-dlp and our scripts.
error_lines = [
line for line in full_output.strip().split('\n')
if 'ERROR:' in line or ' - ERROR - ' in line or 'DownloadError:' in line or 'Traceback' in line
]
if error_lines:
# Try to get the most specific part of the error.
details = error_lines[-1].strip()
# Our log format is "timestamp - logger - LEVEL - message"
if ' - ERROR - ' in details:
details = details.split(' - ERROR - ', 1)[-1]
result['details'] = details
else:
# Fallback to last non-empty line of stderr if no explicit "ERROR" line found
stderr_lines = [line for line in stderr.strip().split('\n') if line.strip()]
result['details'] = stderr_lines[-1].strip() if stderr_lines else "Unknown error"
if is_403_error:
result['error_type'] = 'HTTP 403'
elif is_timeout_error:
result['error_type'] = 'Timeout'
else:
result['error_type'] = f'Exit Code {retcode}'
return result
def process_info_json_cycle(path, content, policy, state_manager, args, running_processes, process_lock, proxy_url=None, profile_name=None, profile_manager_instance=None):
"""
Processes one info.json file for one cycle, downloading selected formats.
"""
results = []
display_name = sp_utils.get_display_name(path)
d_policy = policy.get('download_policy', {})
s_conditions = policy.get('stop_conditions', {})
try:
info_data = json.loads(content)
# If the task file specifies a format, use it instead of the policy's format list.
# This is for granular tasks generated by the task-generator tool.
if '_ytops_download_format' in info_data:
format_selection = info_data['_ytops_download_format']
logger.info(f"[{display_name}] Using format '{format_selection}' from task file.")
else:
format_selection = d_policy.get('formats', '')
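# Accepted shapes for the format selection string handled below (format IDs are illustrative):
#   "all"                  -> test every format listed in the info.json
#   "random:25%"           -> random sample of ~25% of the available format IDs (at least one)
#   "random_from:140,251"  -> pick one format at random from the listed IDs that are available
#   "137+140"              -> contains selector characters, passed through to yt-dlp as-is
#   "140,251"              -> comma-separated format IDs, each tested individually if present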
available_formats = [f['format_id'] for f in info_data.get('formats', [])]
if not available_formats:
logger.warning(f"[{display_name}] No formats found in info.json. Skipping.")
return []
formats_to_test = []
if format_selection == 'all':
formats_to_test = available_formats
elif format_selection.startswith('random:'):
percent = float(format_selection.split(':')[1].rstrip('%'))
count = max(1, int(len(available_formats) * (percent / 100.0)))
formats_to_test = random.sample(available_formats, k=count)
elif format_selection.startswith('random_from:'):
choices = [f.strip() for f in format_selection.split(':', 1)[1].split(',')]
valid_choices = [f for f in choices if f in available_formats]
if valid_choices:
formats_to_test = [random.choice(valid_choices)]
else:
# If the format selection contains complex selector characters (other than comma),
# treat the entire string as a single format selector for yt-dlp to interpret.
# Otherwise, split by comma to test each specified format ID individually.
if any(c in format_selection for c in '/+[]()'):
requested_formats = [format_selection]
else:
requested_formats = [f.strip() for f in format_selection.split(',') if f.strip()]
formats_to_test = []
selector_keywords = ('best', 'worst', 'bestvideo', 'bestaudio')
for req_fmt in requested_formats:
# Treat as a selector and pass through if it contains special characters
# or starts with a known selector keyword.
if any(c in req_fmt for c in '/+[]()') or req_fmt.startswith(selector_keywords):
formats_to_test.append(req_fmt)
continue
# Otherwise, treat as a specific format ID that must exist.
# Check for exact match first.
if req_fmt in available_formats:
formats_to_test.append(req_fmt)
continue
# If no exact match, check for formats that start with this ID + '-' and then digits
# e.g., req_fmt '140' should match '140-0' but not '140-something'.
prefix_match_re = re.compile(rf'^{re.escape(req_fmt)}-\d+$')
first_match = next((af for af in available_formats if prefix_match_re.match(af)), None)
if first_match:
logger.info(f"[{display_name}] Requested format '{req_fmt}' not found. Using first available match: '{first_match}'.")
formats_to_test.append(first_match)
else:
logger.warning(f"[{display_name}] Requested format '{req_fmt}' not found in available formats and is not a recognized selector. Skipping this format.")
except json.JSONDecodeError:
logger.error(f"[{display_name}] Failed to parse info.json. Skipping.")
return []
for i, format_id in enumerate(formats_to_test):
if state_manager.shutdown_event.is_set():
logger.info(f"Shutdown requested, stopping further format tests for {display_name}.")
break
# Check if the format URL is expired before attempting to download
format_details = next((f for f in info_data.get('formats', []) if f.get('format_id') == format_id), None)
# If format_id is a complex selector, it won't be found directly. As a heuristic,
# check the expiration of the first format URL, as they typically share the same expiration.
if not format_details and any(c in format_id for c in '/+[]()'):
available_formats_list = info_data.get('formats', [])
if available_formats_list:
format_details = available_formats_list[0]
# The check is enabled by default and can be disabled via policy.
if d_policy.get('check_url_expiration', True) and format_details and 'url' in format_details:
url_to_check = format_details['url']
time_shift_minutes = d_policy.get('expire_time_shift_minutes', 0)
status, time_left_seconds = sp_utils.check_url_expiry(url_to_check, time_shift_minutes)
logger.debug(f"[{display_name}] URL expiration check for format '{format_id}': status={status}, time_left={time_left_seconds:.0f}s")
if status == 'expired':
details = "Download URL is expired"
if time_shift_minutes > 0 and time_left_seconds > 0:
logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL will expire in {time_left_seconds/60:.1f}m (within {time_shift_minutes}m time-shift).")
details = f"URL will expire within {time_shift_minutes}m time-shift"
else:
logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL is expired.")
result = {
'type': 'download', 'path': str(path), 'format': format_id,
'success': False, 'error_type': 'Skipped (Expired URL)',
'details': details, 'downloaded_bytes': 0, 'is_tolerated_error': True
}
if proxy_url:
result['proxy_url'] = proxy_url
if profile_manager_instance and profile_name:
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
state_manager.log_event(result)
results.append(result)
continue # Move to the next format
elif status == 'no_expiry_info':
logger.debug(f"[{display_name}] No valid 'expire' parameter found in format URL for '{format_id}'. Skipping expiration check.")
result = run_download_worker(path, content, format_id, policy, args, running_processes, process_lock, state_manager, profile_name=profile_name)
if 'id' in info_data:
result['video_id'] = info_data['id']
if proxy_url:
result['proxy_url'] = proxy_url
# Record download attempt/error if a profile is being used.
if profile_manager_instance and profile_name:
if result.get('success'):
profile_manager_instance.record_activity(profile_name, 'download')
elif result.get('error_type') == 'Cancelled':
pass # Do not record cancellations
elif result.get('is_tolerated_error'):
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
else:
profile_manager_instance.record_activity(profile_name, 'download_error')
state_manager.log_event(result)
results.append(result)
worker_id = get_worker_id()
status = "SUCCESS" if result['success'] else f"FAILURE ({result['error_type']})"
profile_log_part = f" [Profile: {profile_name}]" if profile_name else ""
logger.info(f"[Worker {worker_id}]{profile_log_part} Result for {display_name} (format {format_id}): {status} - {result.get('details', 'OK')}")
if not result['success']:
if s_conditions.get('on_failure') or \
(s_conditions.get('on_http_403') and result['error_type'] == 'HTTP 403') or \
(s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
logger.info(f"Stopping further format tests for {display_name} in this cycle due to failure.")
break
sleep_cfg = d_policy.get('sleep_between_formats', {})
sleep_min = sleep_cfg.get('min_seconds', 0)
if sleep_min > 0 and i < len(formats_to_test) - 1:
sleep_max = sleep_cfg.get('max_seconds') or sleep_min
if sleep_max > sleep_min:
sleep_duration = random.uniform(sleep_min, sleep_max)
else:
sleep_duration = sleep_min
logger.debug(f"Sleeping for {sleep_duration:.2f}s between formats for {display_name}.")
# Interruptible sleep
sleep_end_time = time.time() + sleep_duration
while time.time() < sleep_end_time:
if state_manager.shutdown_event.is_set():
break
time.sleep(0.2)
return results
def run_throughput_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock):
"""A persistent worker for the 'throughput' orchestration mode."""
owner_id = f"throughput-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
d_policy = policy.get('download_policy', {})
profile_prefix = d_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Throughput mode requires 'download_policy.profile_prefix'. Worker exiting.")
return []
no_task_streak = 0
while not state_manager.shutdown_event.is_set():
locked_profile = None
claimed_task_path = None
try:
# 0. If no tasks were found previously, pause briefly.
if no_task_streak > 0:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
logger.info(f"[Worker {worker_id}] No tasks found in previous attempt(s). Pausing for {polling_interval}s. (Streak: {no_task_streak})")
time.sleep(polling_interval)
if state_manager.shutdown_event.is_set(): continue
# 1. Find a task and lock its associated profile
locked_profile, claimed_task_path = find_task_and_lock_profile(
profile_manager_instance, owner_id, profile_prefix, policy, worker_id
)
if not locked_profile:
# No task/profile combo was available.
no_task_streak += 1
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
logger.info(f"[Worker {worker_id}] No available tasks found for any active profiles. Pausing for {polling_interval}s.")
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
# We have a task and a lock.
if claimed_task_path:
no_task_streak = 0 # Reset streak
# 3. Process the task
try:
with open(claimed_task_path, 'r', encoding='utf-8') as f:
info_json_content = f.read()
except (IOError, FileNotFoundError) as e:
logger.error(f"[{sp_utils.get_display_name(claimed_task_path)}] Could not read claimed task file: {e}")
# Unlock profile and continue, file might be corrupted
profile_manager_instance.unlock_profile(profile_name, owner=owner_id)
locked_profile = None
# Clean up the bad file
try: claimed_task_path.unlink()
except OSError: pass
continue
# The locked profile's proxy MUST be used for the download.
local_policy = deepcopy(policy)
local_policy.setdefault('download_policy', {})['proxy'] = locked_profile['proxy']
_run_download_logic(
source=claimed_task_path,
info_json_content=info_json_content,
policy=local_policy,
state_manager=state_manager,
args=args,
running_processes=running_processes,
process_lock=process_lock,
profile_name=profile_name,
profile_manager_instance=profile_manager_instance
)
# 4. Clean up the processed task file
try:
os.remove(claimed_task_path)
logger.debug(f"[{sp_utils.get_display_name(claimed_task_path)}] Removed processed task file.")
except OSError as e:
logger.error(f"Failed to remove processed task file '{claimed_task_path}': {e}")
else:
# This case should not be reached with the new task-first locking logic.
# If it is, it means find_task_and_lock_profile returned a profile but no task.
logger.warning(f"[Worker {worker_id}] Inconsistent state: locked profile '{profile_name}' but no task was claimed. Unlocking and continuing.")
except Exception as e:
logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
time.sleep(5) # Pause before retrying to avoid spamming errors
finally:
if locked_profile:
# 5. Unlock the profile. Only apply cooldown if a task was processed.
cooldown = None
if claimed_task_path:
# The enforcer is the single place where this behavior is configured: the enforcer can be
# restarted cheaply, whereas the stress-policy workers handling auth and downloads keep running.
# In effect, one policy change propagates to every worker/machine without restarting any of them.
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if cooldown_config.isdigit():
cooldown = int(cooldown_config)
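# Illustrative values for the 'unlock_cooldown_seconds' config key read above:
#   "120"        -> fixed 120s cooldown after each processed task
#   "[60, 180]"  -> random cooldown between 60 and 180 seconds (inclusive)
# Anything else (or an absent key) leaves cooldown unset and the profile is unlocked immediately.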
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
locked_profile = None
# 6. Throughput is now controlled by the enforcer via the profile's
# 'unlock_cooldown_seconds' policy, which puts the profile into a
# RESTING state. The worker does not need to sleep here and can
# immediately try to lock a new profile to maximize throughput.
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return [] # This function doesn't return results directly
def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=None):
"""Helper to post-process a single info.json file and move it to the final directory."""
direct_policy = policy.get('direct_docker_cli_policy', {})
settings = policy.get('settings', {})
save_dir = settings.get('save_info_json_dir')
if not save_dir:
return False
video_id = "unknown"
try:
# Use a short delay and retry mechanism to handle cases where the file is not yet fully written.
for attempt in range(3):
try:
with open(file_path, 'r+', encoding='utf-8') as f:
info_data = json.load(f)
video_id = info_data.get('id', 'unknown')
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '') if profile_manager_instance else 'unknown'
info_data['_ytops_metadata'] = {
'profile_name': profile_name,
'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
f.seek(0)
json.dump(info_data, f, indent=2)
f.truncate()
break # Success
except (json.JSONDecodeError, IOError) as e:
if attempt < 2:
time.sleep(0.2)
else:
raise e
final_path = Path(save_dir) / file_path.name
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(
video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy
)
final_path = Path(save_dir) / new_name
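# Hypothetical template and result for the renaming above (the real template comes from
# direct_docker_cli_policy.rename_file_template and is deployment-specific):
#   template: "{video_id}__{profile_name}__{proxy}.info.json"
#   proxy "http://1.2.3.4:8080" is sanitized to "http___1.2.3.4_8080" before substitution.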
# Use rename for an atomic move (requires source and destination to be on the same filesystem)
os.rename(str(file_path), str(final_path))
logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'")
return True
except (IOError, json.JSONDecodeError, OSError) as e:
logger.error(f"[Worker {worker_id}] Error post-processing '{file_path.name}' (video: {video_id}): {e}")
return False
def find_task_and_lock_profile(profile_manager, owner_id, profile_prefix, policy, worker_id):
"""
Scans for an available task and locks the specific ACTIVE profile that generated it.
This preserves a 1-to-1 relationship between a profile and its tasks.
Returns a tuple of (locked_profile_dict, claimed_task_path_obj) or (None, None).
"""
settings = policy.get('settings', {})
info_json_dir = settings.get('info_json_dir')
if not info_json_dir:
return None, None
# 1. Get ACTIVE profiles from Redis.
active_profiles = profile_manager.list_profiles(state_filter='ACTIVE')
active_profile_names = {p['name'] for p in active_profiles if p['name'].startswith(profile_prefix)}
if not active_profile_names:
return None, None
# 2. Get all available task files.
try:
task_files = list(Path(info_json_dir).glob('*.json'))
except FileNotFoundError:
logger.warning(f"Info JSON directory not found during scan: {info_json_dir}")
return None, None
if not task_files:
return None, None
profile_regex_str = settings.get('profile_extraction_regex')
if not profile_regex_str:
logger.error(f"[Worker {worker_id}] The task-locking strategy requires 'settings.profile_extraction_regex' to be defined in the policy.")
return None, None
try:
profile_regex = re.compile(profile_regex_str)
except re.error as e:
logger.error(f"Invalid profile_extraction_regex in policy: '{profile_regex_str}'. Error: {e}")
return None, None
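# Hypothetical regex and task filename for the extraction below (the real pattern depends on how
# the auth side names its info.json files, e.g. via rename_file_template):
#   settings.profile_extraction_regex: "__(profile_[A-Za-z0-9]+)__"
#   task file "dQw4w9WgXcQ__profile_user01__http___1.2.3.4_8080.info.json" -> group(1) == "profile_user01"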
# 3. Shuffle tasks to distribute load if multiple workers are looking.
random.shuffle(task_files)
# 4. Iterate through tasks and try to lock their corresponding ACTIVE profile.
for task_path in task_files:
match = profile_regex.search(task_path.name)
if not (match and match.groups()):
continue
profile_name = match.group(1)
if profile_name in active_profile_names:
# Found a task for an active profile. Try to lock it.
locked_profile = profile_manager.lock_profile(owner=owner_id, specific_profile_name=profile_name)
if locked_profile:
# Success! Claim the file.
locked_path = task_path.with_suffix(f"{task_path.suffix}.LOCKED.{worker_id}")
try:
task_path.rename(locked_path)
logger.info(f"[Worker {worker_id}] Locked profile '{profile_name}' and claimed its task '{task_path.name}'.")
return locked_profile, locked_path
except FileNotFoundError:
logger.warning(f"[Worker {worker_id}] Task '{task_path.name}' was claimed by another worker. Unlocking '{profile_name}'.")
profile_manager.unlock_profile(profile_name, owner=owner_id)
continue # Try next task
except OSError as e:
logger.error(f"[Worker {worker_id}] Error claiming task file '{task_path.name}': {e}")
profile_manager.unlock_profile(profile_name, owner=owner_id)
continue
# No suitable task/profile combo found.
logger.debug("Found task files, but none correspond to any currently ACTIVE profiles.")
return None, None
def run_direct_batch_worker(worker_id, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock):
"""A worker for the 'direct_batch_cli' orchestration mode."""
owner_id = f"direct-batch-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
gen_policy = policy.get('info_json_generation_policy', {})
direct_policy = policy.get('direct_batch_cli_policy', {})
profile_prefix = gen_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'info_json_generation_policy.profile_prefix'. Worker exiting.")
return []
batch_size = direct_policy.get('batch_size')
if not batch_size:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'direct_batch_cli_policy.batch_size'. Worker exiting.")
return []
save_dir = settings.get('save_info_json_dir')
if not save_dir:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'settings.save_info_json_dir'. Worker exiting.")
return []
os.makedirs(save_dir, exist_ok=True)
last_used_profile_name = None
while not state_manager.shutdown_event.is_set():
locked_profile = None
temp_batch_file = None
# --- Variables for robust finalization ---
files_created = 0
url_batch_len = 0
batch_started = False
# ---
try:
# 1. Lock a profile
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
# --- New logic to avoid immediate reuse ---
avoid_reuse = direct_policy.get('avoid_immediate_profile_reuse', False)
if avoid_reuse and locked_profile and last_used_profile_name and locked_profile['name'] == last_used_profile_name:
logger.info(f"[Worker {worker_id}] Re-locked same profile '{locked_profile['name']}'. Unlocking and pausing to allow for rotation.")
profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id)
wait_seconds = direct_policy.get('avoid_reuse_max_wait_seconds', 5)
time.sleep(wait_seconds)
# After waiting, try to lock again.
logger.info(f"[Worker {worker_id}] Attempting to lock a new profile after waiting.")
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
if locked_profile and locked_profile['name'] == last_used_profile_name:
logger.warning(f"[Worker {worker_id}] Still locking the same profile '{locked_profile['name']}' after waiting. Proceeding to use it to avoid getting stuck.")
elif locked_profile:
logger.info(f"[Worker {worker_id}] Switched to a different profile after waiting: '{locked_profile['name']}'.")
# --- End new logic ---
if not locked_profile:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s.")
else:
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s.")
# --- End diagnostic logging ---
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
proxy_url = locked_profile['proxy']
# 2. Get a batch of URLs from the shared list
url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list)
if not url_batch:
logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.")
break # Exit the while loop
url_batch_len = len(url_batch)
batch_started = True
# Preemptively increment the counter to avoid race conditions with download workers.
profile_manager_instance.increment_pending_downloads(profile_name, url_batch_len)
logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {url_batch_len} for the upcoming batch.")
end_idx = start_idx + len(url_batch)
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).")
video_ids_in_batch = {sp_utils.get_video_id(u) for u in url_batch}
# 3. Write URLs to a temporary batch file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as f:
temp_batch_file = f.name
f.write('\n'.join(url_batch))
# 4. Construct and run the command
ytdlp_cmd_str = direct_policy.get('ytdlp_command')
if not ytdlp_cmd_str:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'direct_batch_cli_policy.ytdlp_command'.")
break
cmd = shlex.split(ytdlp_cmd_str)
cmd.extend(['--batch-file', temp_batch_file])
cmd.extend(['--proxy', proxy_url])
# The output template should not include the .info.json extension, as
# yt-dlp adds it automatically when --write-info-json is used.
output_template_str = direct_policy.get('ytdlp_output_template', '%(id)s')
ytdlp_args = direct_policy.get('ytdlp_args')
custom_env = direct_policy.get('env_vars', {}).copy()
# --- PYTHONPATH for custom yt-dlp module ---
ytdlp_module_path = direct_policy.get('ytdlp_module_path')
if ytdlp_module_path:
existing_pythonpath = custom_env.get('PYTHONPATH', os.environ.get('PYTHONPATH', ''))
# Prepend the custom path to PYTHONPATH to give it precedence
custom_env['PYTHONPATH'] = f"{ytdlp_module_path}{os.pathsep}{existing_pythonpath}".strip(os.pathsep)
logger.debug(f"[Worker {worker_id}] Using custom PYTHONPATH: {custom_env['PYTHONPATH']}")
custom_env['YTDLP_PROFILE_NAME'] = profile_name
custom_env['YTDLP_PROXY_URL'] = proxy_url
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
custom_env['YTDLP_SIM_MODE'] = env_name
# Create a per-profile cache directory and set XDG_CACHE_HOME
cache_dir_base = direct_policy.get('cache_dir_base', '.cache')
profile_cache_dir = os.path.join(cache_dir_base, profile_name)
try:
os.makedirs(profile_cache_dir, exist_ok=True)
custom_env['XDG_CACHE_HOME'] = profile_cache_dir
except OSError as e:
logger.error(f"[Worker {worker_id}] Failed to create cache directory '{profile_cache_dir}': {e}")
# --- Manage User-Agent ---
# Use a consistent User-Agent per profile, storing it in the profile's cache directory.
user_agent = None
user_agent_file = os.path.join(profile_cache_dir, 'user_agent.txt')
try:
if os.path.exists(user_agent_file):
with open(user_agent_file, 'r', encoding='utf-8') as f:
user_agent = f.read().strip()
if not user_agent: # File doesn't exist or is empty
user_agent = sp_utils.generate_user_agent_from_policy(policy)
with open(user_agent_file, 'w', encoding='utf-8') as f:
f.write(user_agent)
logger.info(f"[{profile_name}] Generated and saved new User-Agent: '{user_agent}'")
else:
logger.info(f"[{profile_name}] Using existing User-Agent from cache: '{user_agent}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] Error accessing User-Agent file '{user_agent_file}': {e}. Using generated UA for this run.")
user_agent = sp_utils.generate_user_agent_from_policy(policy) # fallback
# Add proxy rename from policy if specified, for custom yt-dlp forks
proxy_rename = direct_policy.get('ytdlp_proxy_rename')
if proxy_rename:
custom_env['YTDLP_PROXY_RENAME'] = proxy_rename
if user_agent:
cmd.extend(['--user-agent', user_agent])
if ytdlp_args:
cmd.extend(shlex.split(ytdlp_args))
if args.verbose and '--verbose' not in cmd:
cmd.append('--verbose')
if args.dummy_batch:
# In dummy batch mode, we simulate the entire batch process directly.
log_cmd = list(cmd)
log_cmd.extend(['-o', os.path.join('temp_dir', output_template_str)])
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Simulating batch of {len(url_batch)} URLs.")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Would run real command: {' '.join(shlex.quote(s) for s in log_cmd)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: With environment: {custom_env}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
auth_failure_rate = dummy_settings.get('auth_failure_rate', 0.0)
auth_skipped_rate = dummy_settings.get('auth_skipped_failure_rate', 0.0)
min_seconds = dummy_settings.get('auth_min_seconds', 0.1)
max_seconds = dummy_settings.get('auth_max_seconds', 0.5)
for url in url_batch:
time.sleep(random.uniform(min_seconds, max_seconds))
video_id = sp_utils.get_video_id(url) or f"dummy_{random.randint(1000, 9999)}"
rand_val = random.random()
if rand_val < auth_skipped_rate:
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating tolerated failure for {video_id}.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
elif rand_val < (auth_skipped_rate + auth_failure_rate):
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating fatal failure for {video_id}.")
profile_manager_instance.record_activity(profile_name, 'failure')
else:
# Success - create dummy info.json
profile_manager_instance.record_activity(profile_name, 'success')
files_created += 1
info_data = {'id': video_id, 'title': f'Dummy Video {video_id}', '_dummy': True}
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
info_data['_ytops_metadata'] = {
'profile_name': profile_name, 'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
final_path = Path(save_dir) / f"{video_id}.info.json"
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy)
final_path = Path(save_dir) / new_name
try:
with open(final_path, 'w', encoding='utf-8') as f:
json.dump(info_data, f, indent=2)
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY: Created dummy info.json: '{final_path}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] [{profile_name}] DUMMY: Failed to write dummy info.json: {e}")
success = (files_created > 0)
state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)
event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': f"Dummy batch completed. Files created: {files_created}/{len(url_batch)}.", 'video_count': len(url_batch) }
state_manager.log_event(event)
else:
with tempfile.TemporaryDirectory(prefix=f"ytdlp-batch-{worker_id}-") as temp_output_dir:
output_template = os.path.join(temp_output_dir, output_template_str)
cmd.extend(['-o', output_template])
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs...")
logger.info(f"[Worker {worker_id}] [{profile_name}] Running command: {' '.join(shlex.quote(s) for s in cmd)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] With environment: {custom_env}")
retcode, stdout, stderr = run_command(
cmd, running_processes, process_lock, env=custom_env, stream_output=args.verbose,
stream_prefix=f"[Worker {worker_id} | yt-dlp] "
)
is_bot_error = "Sign in to confirm you're not a bot" in stderr
if is_bot_error:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during batch. Marking as failure.")
processed_files = list(Path(temp_output_dir).glob('*.json'))
for temp_path in processed_files:
files_created += 1
video_id = "unknown"
try:
with open(temp_path, 'r+', encoding='utf-8') as f:
info_data = json.load(f)
video_id = info_data.get('id', 'unknown')
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
info_data['_ytops_metadata'] = {
'profile_name': profile_name,
'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
f.seek(0)
json.dump(info_data, f, indent=2)
f.truncate()
final_path = Path(save_dir) / temp_path.name
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(
video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy
)
final_path = Path(save_dir) / new_name
shutil.move(str(temp_path), str(final_path))
logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'")
except (IOError, json.JSONDecodeError, OSError) as e:
logger.error(f"[Worker {worker_id}] Error post-processing '{temp_path.name}' (video: {video_id}): {e}")
# The orchestrator records per-URL success/failure for the profile.
# A batch is considered an overall success for logging if it had no fatal errors
# and produced at least one file.
success = (files_created > 0 and not is_bot_error)
if not success:
reason = "bot detection occurred" if is_bot_error else f"0 files created out of {len(url_batch)}"
logger.warning(f"[Worker {worker_id}] [{profile_name}] Marking batch as FAILED. Reason: {reason}.")
# Record batch stats for overall orchestrator health
state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)
# In this mode, the custom yt-dlp script is responsible for recording
# per-URL activity ('success', 'failure', 'tolerated_error') directly into Redis.
# The orchestrator does not record activity here to avoid double-counting.
logger.info(f"[Worker {worker_id}] [{profile_name}] Batch finished. Per-URL activity was recorded by the yt-dlp script.")
event_details = f"Batch completed. Exit: {retcode}. Files created: {files_created}/{len(url_batch)}."
if not success and stderr:
if is_bot_error:
event_details += " Stderr: Bot detection occurred."
else:
event_details += f" Stderr: {stderr.strip().splitlines()[-1]}"
event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details, 'video_count': len(url_batch) }
state_manager.log_event(event)
except Exception as e:
logger.error(f"[Worker {worker_id}] Unexpected error in worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure')
finally:
if locked_profile and batch_started:
# --- Reconcile pending downloads counter ---
# This is in the finally block to guarantee it runs even if post-processing fails.
adjustment = files_created - url_batch_len
if adjustment != 0:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {files_created}/{url_batch_len} files. Adjusting by {adjustment}.")
profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment)
if locked_profile:
last_used_profile_name = locked_profile['name']
cooldown = None
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if cooldown_config.isdigit():
cooldown = int(cooldown_config)
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
if temp_batch_file and os.path.exists(temp_batch_file):
os.unlink(temp_batch_file)
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []
def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock):
"""A worker for the 'direct_docker_cli' orchestration mode (fetch_only)."""
owner_id = f"direct-docker-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
gen_policy = policy.get('info_json_generation_policy', {})
direct_policy = policy.get('direct_docker_cli_policy', {})
profile_prefix = gen_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'info_json_generation_policy.profile_prefix'. Worker exiting.")
return []
batch_size = direct_policy.get('batch_size')
if not batch_size:
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'direct_docker_cli_policy.batch_size'. Worker exiting.")
return []
save_dir = settings.get('save_info_json_dir')
if not save_dir:
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'settings.save_info_json_dir'. Worker exiting.")
return []
os.makedirs(save_dir, exist_ok=True)
# --- Docker specific config ---
image_name = direct_policy.get('docker_image_name')
host_mount_path = os.path.abspath(direct_policy.get('docker_host_mount_path'))
container_mount_path = direct_policy.get('docker_container_mount_path')
host_cache_path = direct_policy.get('docker_host_cache_path')
if host_cache_path: host_cache_path = os.path.abspath(host_cache_path)
container_cache_path = direct_policy.get('docker_container_cache_path')
network_name = direct_policy.get('docker_network_name')
if not all([image_name, host_mount_path, container_mount_path]):
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'docker_image_name', 'docker_host_mount_path', and 'docker_container_mount_path'. Worker exiting.")
return []
try:
os.makedirs(host_mount_path, exist_ok=True)
except OSError as e:
logger.error(f"[Worker {worker_id}] Could not create docker_host_mount_path '{host_mount_path}': {e}. Worker exiting.")
return []
last_used_profile_name = None
while not state_manager.shutdown_event.is_set():
locked_profile = None
temp_task_dir_host = None
# --- Variables for robust finalization ---
live_success_count = 0
url_batch_len = 0
batch_started = False
# ---
try:
# 1. Lock a profile
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
# --- New logic to avoid immediate reuse ---
avoid_reuse = direct_policy.get('avoid_immediate_profile_reuse', False)
if avoid_reuse and locked_profile and last_used_profile_name and locked_profile['name'] == last_used_profile_name:
logger.info(f"[Worker {worker_id}] Re-locked same profile '{locked_profile['name']}'. Unlocking and pausing to allow for rotation.")
profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id)
wait_seconds = direct_policy.get('avoid_reuse_max_wait_seconds', 5)
time.sleep(wait_seconds)
# After waiting, try to lock again.
logger.info(f"[Worker {worker_id}] Attempting to lock a new profile after waiting.")
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
if locked_profile and locked_profile['name'] == last_used_profile_name:
logger.warning(f"[Worker {worker_id}] Still locking the same profile '{locked_profile['name']}' after waiting. Proceeding to use it to avoid getting stuck.")
elif locked_profile:
logger.info(f"[Worker {worker_id}] Switched to a different profile after waiting: '{locked_profile['name']}'.")
# --- End new logic ---
if not locked_profile:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s.")
else:
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s.")
# --- End diagnostic logging ---
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
proxy_url = locked_profile['proxy']
# --- Manage User-Agent, Visitor ID, and Cache Directory ---
user_agent = None
visitor_id = None
environment = {}
profile_cache_dir_host = None
if host_cache_path:
profile_cache_dir_host = os.path.join(host_cache_path, profile_name)
try:
os.makedirs(profile_cache_dir_host, exist_ok=True)
if container_cache_path:
environment['XDG_CACHE_HOME'] = container_cache_path
# --- User-Agent ---
user_agent_file = os.path.join(profile_cache_dir_host, 'user_agent.txt')
if os.path.exists(user_agent_file):
with open(user_agent_file, 'r', encoding='utf-8') as f:
user_agent = f.read().strip()
if not user_agent:
user_agent = sp_utils.generate_user_agent_from_policy(policy)
with open(user_agent_file, 'w', encoding='utf-8') as f:
f.write(user_agent)
logger.info(f"[{profile_name}] Generated and saved new User-Agent: '{user_agent}'")
else:
logger.info(f"[{profile_name}] Using existing User-Agent from cache: '{user_agent}'")
# --- Visitor ID ---
if direct_policy.get('track_visitor_id'):
visitor_id_file = os.path.join(profile_cache_dir_host, 'visitor_id.txt')
if os.path.exists(visitor_id_file):
with open(visitor_id_file, 'r', encoding='utf-8') as f:
visitor_id = f.read().strip()
if visitor_id:
logger.info(f"[{profile_name}] Using existing Visitor ID from cache: '{visitor_id}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] Error accessing cache file in '{profile_cache_dir_host}': {e}. Using generated UA for this run.")
user_agent = sp_utils.generate_user_agent_from_policy(policy) # Fallback for UA
else:
# Fallback if no cache is configured for auth simulation
user_agent = sp_utils.generate_user_agent_from_policy(policy)
# 2. Get a batch of URLs
url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list)
if not url_batch:
logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.")
break
url_batch_len = len(url_batch)
batch_started = True
# Preemptively increment the counter to avoid race conditions with download workers.
profile_manager_instance.increment_pending_downloads(profile_name, url_batch_len)
logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {url_batch_len} for the upcoming batch.")
end_idx = start_idx + len(url_batch)
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).")
# 3. Prepare files on the host
temp_task_dir_host = tempfile.mkdtemp(prefix=f"docker-task-{worker_id}-", dir=host_mount_path)
task_dir_name = os.path.basename(temp_task_dir_host)
task_dir_container = os.path.join(container_mount_path, task_dir_name)
# Set XDG_CONFIG_HOME for yt-dlp to find the config automatically
environment['XDG_CONFIG_HOME'] = task_dir_container
# Write batch file
temp_batch_file_host = os.path.join(temp_task_dir_host, 'batch.txt')
with open(temp_batch_file_host, 'w', encoding='utf-8') as f:
f.write('\n'.join(url_batch))
# Write yt-dlp config file
base_config_content = ""
base_config_file = direct_policy.get('ytdlp_config_file')
if base_config_file:
# Try path as-is first, then relative to project root.
config_path_to_read = Path(base_config_file)
if not config_path_to_read.exists():
config_path_to_read = Path(sp_utils._PROJECT_ROOT) / base_config_file
if config_path_to_read.exists():
try:
with open(config_path_to_read, 'r', encoding='utf-8') as f:
base_config_content = f.read()
logger.info(f"[Worker {worker_id}] [{profile_name}] Loaded base config from '{config_path_to_read}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] Could not read ytdlp_config_file '{config_path_to_read}': {e}")
else:
logger.error(f"[Worker {worker_id}] Could not find ytdlp_config_file: '{base_config_file}'")
config_overrides = direct_policy.get('ytdlp_config_overrides', {}).copy()
if direct_policy.get('use_cookies') and host_cache_path and container_cache_path:
try:
cookie_file_host = os.path.join(profile_cache_dir_host, 'cookies.txt')
# Ensure the file exists and has the Netscape header if it's empty.
if not os.path.exists(cookie_file_host) or os.path.getsize(cookie_file_host) == 0:
with open(cookie_file_host, 'w', encoding='utf-8') as f:
f.write("# Netscape HTTP Cookie File\n")
logger.info(f"[{profile_name}] Created/initialized cookie file with header: {cookie_file_host}")
cookie_file_container = os.path.join(container_cache_path, 'cookies.txt')
config_overrides['cookies'] = cookie_file_container
logger.info(f"[{profile_name}] Using persistent cookie jar: {cookie_file_host}")
except (IOError, OSError) as e:
logger.error(f"[Worker {worker_id}] Could not create cookie file in '{profile_cache_dir_host}': {e}")
# Inject per-task values into overrides
config_overrides['proxy'] = proxy_url
config_overrides['batch-file'] = os.path.join(task_dir_container, 'batch.txt')
# The output template should not include the .info.json extension, as
# yt-dlp adds it automatically when --write-info-json is used.
config_overrides['output'] = os.path.join(task_dir_container, '%(id)s')
if user_agent:
config_overrides['user-agent'] = user_agent
overrides_content = sp_utils._config_dict_to_flags_file_content(config_overrides)
raw_args_from_policy = direct_policy.get('ytdlp_raw_args', [])
# --- Inject visitor_id into raw args if available ---
if visitor_id:
# Start with a copy of the raw args from policy
new_raw_args = list(raw_args_from_policy)
# --- Handle youtube extractor args ---
youtube_arg_index = -1
original_youtube_value = None
for i, arg in enumerate(new_raw_args):
if arg.startswith('--extractor-args') and 'youtube:' in arg:
youtube_arg_index = i
try:
parts = shlex.split(arg)
if len(parts) == 2 and parts[1].startswith('youtube:'):
original_youtube_value = parts[1]
except ValueError:
logger.warning(f"Could not parse extractor-arg, will not modify: {arg}")
break # Found it, stop searching
if youtube_arg_index != -1 and original_youtube_value:
# Modify existing youtube arg
new_value = f'{original_youtube_value.rstrip()};visitor_data={visitor_id}'
if 'skip=' in new_value:
new_value = re.sub(r'skip=([^;\'"]*)', r'skip=\1,webpage,configs', new_value)
else:
new_value += ';skip=webpage,configs'
new_raw_args[youtube_arg_index] = f'--extractor-args "{new_value}"'
else:
# Add new youtube arg
logger.warning(f"[{profile_name}] No existing '--extractor-args youtube:...' found. Adding a new one for visitor_id.")
new_raw_args.append(f'--extractor-args "youtube:visitor_data={visitor_id};skip=webpage,configs"')
# --- Handle youtubetab extractor args ---
youtubetab_arg_index = -1
for i, arg in enumerate(new_raw_args):
if arg.startswith('--extractor-args') and 'youtubetab:' in arg:
youtubetab_arg_index = i
break
# The request is to set/replace this argument
new_youtubetab_arg = '--extractor-args "youtubetab:skip=webpage"'
if youtubetab_arg_index != -1:
# Replace existing
new_raw_args[youtubetab_arg_index] = new_youtubetab_arg
else:
# Add new
new_raw_args.append(new_youtubetab_arg)
raw_args_from_policy = new_raw_args
# --- End visitor_id injection ---
raw_args_content = '\n'.join(raw_args_from_policy)
config_content = f"{base_config_content.strip()}\n\n# --- Overrides from policy ---\n{overrides_content}"
if raw_args_content:
config_content += f"\n\n# --- Raw args from policy ---\n{raw_args_content}"
logger.info(f"[Worker {worker_id}] [{profile_name}] Generated yt-dlp config file content:\n---config---\n{config_content}\n------------")
# Create the directory structure yt-dlp expects inside the temp task dir
ytdlp_config_dir_host = os.path.join(temp_task_dir_host, 'yt-dlp')
os.makedirs(ytdlp_config_dir_host, exist_ok=True)
temp_config_file_host = os.path.join(ytdlp_config_dir_host, 'config')
with open(temp_config_file_host, 'w', encoding='utf-8') as f:
f.write(config_content)
# 4. Construct and run the 'docker run' command
volumes = {
host_mount_path: {
'bind': container_mount_path,
'mode': 'rw'
}
}
if host_cache_path and container_cache_path:
profile_cache_dir_host = os.path.join(host_cache_path, profile_name)
os.makedirs(profile_cache_dir_host, exist_ok=True)
volumes[profile_cache_dir_host] = {
'bind': container_cache_path,
'mode': 'rw'
}
# The command tells yt-dlp where to find the config file we created.
# We still set XDG_CONFIG_HOME for any other config it might look for.
command = ['yt-dlp', '--config-locations', os.path.join(task_dir_container, 'yt-dlp/config')]
logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}")
# For logging purposes, construct the full equivalent command line with host paths
log_config_overrides_for_host = config_overrides.copy()
log_config_overrides_for_host['batch-file'] = temp_batch_file_host
log_config_overrides_for_host['output'] = os.path.join(temp_task_dir_host, '%(id)s')
if 'cookies' in log_config_overrides_for_host and host_cache_path:
log_config_overrides_for_host['cookies'] = os.path.join(profile_cache_dir_host, 'cookies.txt')
log_command_override = ['yt-dlp']
if base_config_content:
log_command_override.extend(sp_utils._parse_config_file_to_cli_args(base_config_content))
log_command_override.extend(sp_utils._config_dict_to_cli_flags(log_config_overrides_for_host))
for raw_arg in raw_args_from_policy:
log_command_override.extend(shlex.split(raw_arg))
# --- Live log parsing and activity recording ---
live_failure_count = 0
live_tolerated_count = 0
activity_lock = threading.Lock()
# Get error patterns from policy for live parsing
tolerated_error_patterns = direct_policy.get('tolerated_error_patterns', [])
fatal_error_patterns = direct_policy.get('fatal_error_patterns', [])
def log_parser_callback(line):
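# Called for each log line streamed from the container. Returning True signals
# run_docker_container to stop the container early (used when a fatal error bans
# the profile); returning False keeps the container running.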
nonlocal live_success_count, live_failure_count, live_tolerated_count
# --- Visitor ID Extraction ---
if direct_policy.get('track_visitor_id') and profile_cache_dir_host:
# e.g., [debug] [youtube] [pot:cache] TRACE: Retrieved cache spec PoTokenCacheSpec(key_bindings={'t': 'webpo', 'cb': '...', 'cbt': 'visitor_id', ...
match = re.search(r"'cb': '([^']*)', 'cbt': 'visitor_id'", line)
if match:
new_visitor_id = match.group(1)
logger.info(f"[Worker {worker_id}] [{profile_name}] Detected new Visitor ID: {new_visitor_id}")
try:
visitor_id_file = os.path.join(profile_cache_dir_host, 'visitor_id.txt')
with open(visitor_id_file, 'w', encoding='utf-8') as f:
f.write(new_visitor_id)
logger.info(f"[{profile_name}] Saved new Visitor ID to cache.")
except IOError as e:
logger.error(f"[{profile_name}] Failed to save new Visitor ID to cache: {e}")
# Success is the highest priority check
if '[info] Writing video metadata as JSON to:' in line:
with activity_lock:
live_success_count += 1
logger.info(f"[Worker {worker_id}] [{profile_name}] Live success #{live_success_count} detected from log.")
profile_manager_instance.record_activity(profile_name, 'success')
# --- Immediate post-processing ---
try:
path_match = re.search(r"Writing video metadata as JSON to: '?([^']+)'?$", line)
if not path_match:
path_match = re.search(r"Writing video metadata as JSON to: (.*)$", line)
if path_match:
container_file_path = path_match.group(1).strip()
if container_file_path.startswith(container_mount_path):
relative_path = os.path.relpath(container_file_path, container_mount_path)
host_file_path = os.path.join(host_mount_path, relative_path)
# The file might not exist immediately.
for _ in range(5): # Retry for up to 0.5s
if os.path.exists(host_file_path):
break
time.sleep(0.1)
if os.path.exists(host_file_path):
_post_process_and_move_info_json(
Path(host_file_path), profile_name, proxy_url, policy, worker_id,
profile_manager_instance=profile_manager_instance
)
else:
logger.warning(f"File from log not found on host for immediate processing: {host_file_path}")
except Exception as e:
logger.error(f"Error during immediate post-processing from log line: {e}")
# --- End immediate post-processing ---
return False
# Check for fatal patterns (e.g., bot detection) which might not start with ERROR:
for pattern in fatal_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_failure_count += 1
logger.error(f"[Worker {worker_id}] [{profile_name}] Live FATAL error #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'failure')
if direct_policy.get('ban_on_fatal_error_in_batch'):
logger.warning(f"Banning profile '{profile_name}' immediately due to fatal error to stop container.")
profile_manager_instance.update_profile_state(profile_name, 'BANNED', 'Fatal error during batch')
return True # Signal to stop container
return False # Do not stop if ban_on_fatal_error_in_batch is false
# Only process lines that contain ERROR: for tolerated/generic failures
if 'ERROR:' not in line:
return False
# Check if it's a tolerated error
for pattern in tolerated_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_tolerated_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live TOLERATED error #{live_tolerated_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
return False
# If it's an ERROR: line and not tolerated, it's a failure
with activity_lock:
live_failure_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live failure #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'failure')
return False
if args.dummy_batch:
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Simulating Docker batch of {len(url_batch)} URLs.")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Would run docker command: {' '.join(shlex.quote(s) for s in command)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: With environment: {environment}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
auth_failure_rate = dummy_settings.get('auth_failure_rate', 0.0)
auth_skipped_rate = dummy_settings.get('auth_skipped_failure_rate', 0.0)
min_seconds = dummy_settings.get('auth_min_seconds', 0.1)
max_seconds = dummy_settings.get('auth_max_seconds', 0.5)
for url in url_batch:
time.sleep(random.uniform(min_seconds, max_seconds))
video_id = sp_utils.get_video_id(url) or f"dummy_{random.randint(1000, 9999)}"
rand_val = random.random()
if rand_val < auth_skipped_rate:
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating tolerated failure for {video_id}.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
live_tolerated_count += 1
elif rand_val < (auth_skipped_rate + auth_failure_rate):
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating fatal failure for {video_id}.")
profile_manager_instance.record_activity(profile_name, 'failure')
live_failure_count += 1
else:
# Success - create dummy info.json
profile_manager_instance.record_activity(profile_name, 'success')
live_success_count += 1
info_data = {'id': video_id, 'title': f'Dummy Video {video_id}', '_dummy': True}
# Create a dummy file in the temp task dir for post-processing to find
dummy_file_path = Path(temp_task_dir_host) / f"{video_id}.info.json"
try:
with open(dummy_file_path, 'w', encoding='utf-8') as f:
json.dump(info_data, f)
except IOError as e:
logger.error(f"[Worker {worker_id}] [{profile_name}] DUMMY: Failed to write dummy info.json for post-processing: {e}")
retcode = 0
stdout, stderr, stop_reason = "", "", None
else:
retcode, stdout, stderr, stop_reason = run_docker_container(
image_name=image_name,
command=command,
volumes=volumes,
stream_prefix=f"[Worker {worker_id} | docker-ytdlp] ",
network_name=network_name,
log_callback=log_parser_callback,
profile_manager=profile_manager_instance,
profile_name=profile_name,
environment=environment,
log_command_override=log_command_override
)
# 5. Post-process results
logger.info(f"[Worker {worker_id}] [{profile_name}] Docker container finished. Post-processing results...")
full_output = f"{stdout}\n{stderr}"
is_bot_error = "Sign in to confirm you're not a bot" in full_output
if is_bot_error:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during batch. Marking as failure.")
# Fallback post-processing for any files missed by the live parser.
# The live parser moves files, so this loop should only find leftovers.
processed_files = list(Path(temp_task_dir_host).glob('*.json'))
if processed_files:
logger.info(f"[Worker {worker_id}] Found {len(processed_files)} leftover file(s) to process after live parsing.")
for temp_path in processed_files:
_post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=profile_manager_instance)
# A batch is considered an overall success for logging if it had no fatal errors.
# The per-URL activity has already been recorded live.
# We use live_success_count for a more accurate success metric.
success = (live_success_count > 0 and not is_bot_error)
if not success:
reason = "bot detection occurred" if is_bot_error else f"0 successful files created out of {len(url_batch)}"
logger.warning(f"[Worker {worker_id}] [{profile_name}] Marking batch as FAILED. Reason: {reason}.")
# Record batch stats for overall orchestrator health
state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)
# If live parsing didn't catch all activity (e.g., yt-dlp exits before printing logs),
# we reconcile the counts here based on files created.
with activity_lock:
processed_count = live_success_count + live_failure_count + live_tolerated_count
# Failures are harder to reconcile from file counts alone.
# We assume live parsing caught them. The total number of failures is
# (batch_size - files_created), but we don't know if they were already recorded.
# The current live parsing is the most reliable source for failures.
unaccounted_failures = len(url_batch) - processed_count
if unaccounted_failures > 0:
logger.info(f"[Worker {worker_id}] [{profile_name}] Reconciling activity: {unaccounted_failures} unaccounted failure(s).")
for _ in range(unaccounted_failures):
profile_manager_instance.record_activity(profile_name, 'failure')
if stop_reason:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Batch aborted due to: {stop_reason}. Adjusting URL index.")
# The batch was from start_idx to end_idx.
# We processed `processed_count` URLs.
# The next batch should start from `start_idx + processed_count`.
# `get_next_url_batch` updated `last_url_index` to `end_idx`. We need to rewind it.
with activity_lock:
processed_count = live_success_count + live_failure_count + live_tolerated_count
next_start_index = start_idx + processed_count
state_manager.update_last_url_index(next_start_index, force=True)
logger.info(f"[Worker {worker_id}] Rewound URL index to {next_start_index} for next worker.")
event_details = f"Docker batch completed. Exit: {retcode}. Files created: {live_success_count}/{len(url_batch)}. (Live successes: {live_success_count}, Live failures: {live_failure_count}, Live tolerated: {live_tolerated_count})"
if not success and stderr:
event_details += f" Stderr: {stderr.strip().splitlines()[-1] if stderr.strip() else 'N/A'}"
if stop_reason:
event_details += f" Aborted: {stop_reason}."
event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details, 'video_count': len(url_batch) }
state_manager.log_event(event)
logger.info(f"[Worker {worker_id}] [{profile_name}] Batch processing complete. Worker will now unlock profile and attempt next batch.")
except Exception as e:
logger.error(f"[Worker {worker_id}] Unexpected error in worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure')
finally:
if locked_profile and batch_started:
# --- Reconcile pending downloads counter ---
# This is in the finally block to guarantee it runs even if post-processing fails.
adjustment = live_success_count - url_batch_len
if adjustment != 0:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {live_success_count}/{url_batch_len} files. Adjusting by {adjustment}.")
profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment)
if locked_profile:
last_used_profile_name = locked_profile['name']
profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id)
if temp_task_dir_host and os.path.exists(temp_task_dir_host):
# If shutdown is requested, a batch might have been interrupted after files were
# created but before they were post-processed. We preserve the temp directory
# to allow for manual recovery of the info.json files.
if state_manager.shutdown_event.is_set() and any(Path(temp_task_dir_host).iterdir()):
logger.warning(f"Shutdown requested. Preserving temporary task directory for manual recovery: {temp_task_dir_host}")
else:
shutil.rmtree(temp_task_dir_host)
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []
def run_direct_docker_download_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock):
"""A worker for the 'direct_docker_cli' orchestration mode with `mode: download_only`."""
owner_id = f"direct-docker-dl-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
d_policy = policy.get('download_policy', {})
direct_policy = policy.get('direct_docker_cli_policy', {})
profile_prefix = d_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Direct docker download mode requires 'download_policy.profile_prefix'. Worker exiting.")
return []
# --- Docker specific config ---
image_name = direct_policy.get('docker_image_name')
host_mount_path = direct_policy.get('docker_host_mount_path')
container_mount_path = direct_policy.get('docker_container_mount_path')
host_download_path = direct_policy.get('docker_host_download_path')
container_download_path = direct_policy.get('docker_container_download_path')
network_name = direct_policy.get('docker_network_name')
if not all([image_name, host_mount_path, container_mount_path, host_download_path, container_download_path]):
logger.error(f"[Worker {worker_id}] Direct docker download mode requires all docker_* keys in 'direct_docker_cli_policy'. Worker exiting.")
return []
try:
os.makedirs(host_mount_path, exist_ok=True)
os.makedirs(host_download_path, exist_ok=True)
except OSError as e:
logger.error(f"[Worker {worker_id}] Could not create required host directories: {e}. Worker exiting.")
return []
no_task_streak = 0
last_used_profile_name = None
while not state_manager.shutdown_event.is_set():
locked_profile = None
claimed_task_path_host = None
temp_config_dir_host = None
was_banned_by_parser = False
try:
if no_task_streak > 0:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No tasks found or profiles available. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
else:
logger.info(f"[Worker {worker_id}] No tasks found or profiles available. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
# --- End diagnostic logging ---
time.sleep(polling_interval)
if state_manager.shutdown_event.is_set(): continue
# 1. Find a task and lock its associated profile
locked_profile, claimed_task_path_host = find_task_and_lock_profile(
profile_manager_instance, owner_id, profile_prefix, policy, worker_id
)
if not locked_profile:
no_task_streak += 1
# The main loop will pause if the streak continues.
continue
profile_name = locked_profile['name']
# We have a task and a lock.
# User-Agent is not used for download simulation.
user_agent = None
if claimed_task_path_host:
no_task_streak = 0
auth_profile_name, auth_env = None, None
info_data = None
# --- Read info.json content and metadata first ---
try:
with open(claimed_task_path_host, 'r', encoding='utf-8') as f:
info_data = json.load(f)
# This is critical for decrementing the counter in the finally block
metadata = info_data.get('_ytops_metadata', {})
auth_profile_name = metadata.get('profile_name')
auth_env = metadata.get('auth_env')
except (IOError, json.JSONDecodeError) as e:
logger.error(f"CRITICAL: Could not read or parse task file '{claimed_task_path_host.name}': {e}. This task will be skipped, but the pending downloads counter CANNOT be decremented.")
continue # Skip to finally block to unlock profile
if args.dummy or args.dummy_batch:
logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY DOCKER DOWNLOAD ==========")
logger.info(f"[Worker {worker_id}] Profile: {profile_name} | Task: {claimed_task_path_host.name}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
min_seconds = dummy_settings.get('download_min_seconds', 1.0)
max_seconds = dummy_settings.get('download_max_seconds', 3.0)
failure_rate = dummy_settings.get('download_failure_rate', 0.0)
skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
time.sleep(random.uniform(min_seconds, max_seconds))
rand_val = random.random()
should_fail_skipped = rand_val < skipped_rate
should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
if should_fail_skipped:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
elif should_fail_fatal:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure.")
profile_manager_instance.record_activity(profile_name, 'download_error')
else:
logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success.")
profile_manager_instance.record_activity(profile_name, 'download')
logger.info(f"========== [Worker {worker_id}] END DUMMY DOCKER DOWNLOAD ==========")
# In dummy mode, we just rename the file to processed and continue to the finally block.
try:
base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
processed_path = Path(f"{base_path_str}.processed")
claimed_task_path_host.rename(processed_path)
logger.debug(f"DUMMY MODE: Renamed processed task file to '{processed_path.name}'.")
except (OSError, IndexError) as e:
logger.error(f"DUMMY MODE: Failed to rename processed task file '{claimed_task_path_host}': {e}")
continue # Skip to finally block
# --- Check for URL expiration before running Docker ---
if d_policy.get('check_url_expiration', True):
# Heuristic: check the first available format URL
first_format = next((f for f in info_data.get('formats', []) if 'url' in f), None)
if first_format:
url_to_check = first_format['url']
time_shift_minutes = d_policy.get('expire_time_shift_minutes', 0)
status, time_left_seconds = sp_utils.check_url_expiry(url_to_check, time_shift_minutes)
logger.debug(f"[Worker {worker_id}] [{profile_name}] URL expiration check for task '{claimed_task_path_host.name}': status={status}, time_left={time_left_seconds:.0f}s")
if status == 'expired':
details = "Download URL is expired"
if time_shift_minutes > 0 and time_left_seconds > 0:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Skipping task '{claimed_task_path_host.name}' because its URL will expire in {time_left_seconds/60:.1f}m (within {time_shift_minutes}m time-shift).")
details = f"URL will expire within {time_shift_minutes}m time-shift"
else:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Skipping task '{claimed_task_path_host.name}' because its URL is expired.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
event = {
'type': 'direct_docker_download', 'profile': profile_name,
'proxy_url': locked_profile['proxy'], 'success': False,
'error_type': 'Skipped (Expired URL)', 'details': details,
'is_tolerated_error': True
}
state_manager.log_event(event)
try:
base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
processed_path = Path(f"{base_path_str}.processed")
claimed_task_path_host.rename(processed_path)
logger.debug(f"Renamed expired task file to '{processed_path.name}'.")
except (OSError, IndexError) as e:
logger.error(f"Failed to rename expired task file '{claimed_task_path_host}': {e}")
continue # Skip to the finally block
# The path to the task file inside the container needs to be relative to the host mount root.
# We must make the task path absolute first to correctly calculate the relative path from the absolute mount path.
relative_task_path = os.path.relpath(os.path.abspath(claimed_task_path_host), host_mount_path)
task_path_container = os.path.join(container_mount_path, relative_task_path)
# 3. Prepare config file on host in a temporary directory
temp_config_dir_host = tempfile.mkdtemp(prefix=f"docker-dl-config-{worker_id}-", dir=host_mount_path)
config_dir_name = os.path.basename(temp_config_dir_host)
config_dir_container = os.path.join(container_mount_path, config_dir_name)
environment = {'XDG_CONFIG_HOME': config_dir_container}
base_config_content = ""
base_config_file = direct_policy.get('ytdlp_config_file')
if base_config_file:
config_path_to_read = Path(base_config_file)
if not config_path_to_read.exists():
config_path_to_read = Path(sp_utils._PROJECT_ROOT) / base_config_file
if config_path_to_read.exists():
try:
with open(config_path_to_read, 'r', encoding='utf-8') as base_f:
base_config_content = base_f.read()
except IOError as e:
logger.error(f"[Worker {worker_id}] Could not read ytdlp_config_file '{config_path_to_read}': {e}")
config_overrides = direct_policy.get('ytdlp_config_overrides', {}).copy()
config_overrides['proxy'] = locked_profile['proxy']
config_overrides['load-info-json'] = task_path_container
config_overrides['output'] = os.path.join(container_download_path, '%(id)s.f%(format_id)s.%(ext)s')
# Prevent yt-dlp from using a cache directory.
config_overrides['no-cache-dir'] = True
overrides_content = sp_utils._config_dict_to_flags_file_content(config_overrides)
raw_args_from_policy = direct_policy.get('ytdlp_raw_args', [])
raw_args_content = '\n'.join(raw_args_from_policy)
config_content = f"{base_config_content.strip()}\n\n# --- Overrides from policy ---\n{overrides_content}"
if raw_args_content:
config_content += f"\n\n# --- Raw args from policy ---\n{raw_args_content}"
logger.info(f"[Worker {worker_id}] [{profile_name}] Generated yt-dlp config:\n---config---\n{config_content}\n------------")
ytdlp_config_dir_host = os.path.join(temp_config_dir_host, 'yt-dlp')
os.makedirs(ytdlp_config_dir_host, exist_ok=True)
temp_config_file_host = os.path.join(ytdlp_config_dir_host, 'config')
with open(temp_config_file_host, 'w', encoding='utf-8') as f:
f.write(config_content)
# 4. Construct and run docker run command
volumes = {
os.path.abspath(host_mount_path): {'bind': container_mount_path, 'mode': 'ro'},
os.path.abspath(host_download_path): {'bind': container_download_path, 'mode': 'rw'}
}
# The command tells yt-dlp where to find the config file we created.
# We still set XDG_CONFIG_HOME for any other config it might look for.
command = ['yt-dlp', '--config-locations', os.path.join(config_dir_container, 'yt-dlp/config')]
logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}")
# For logging purposes, construct the full equivalent command line with host paths
log_config_overrides_for_host = config_overrides.copy()
log_config_overrides_for_host['load-info-json'] = str(claimed_task_path_host)
log_config_overrides_for_host['output'] = os.path.join(host_download_path, '%(id)s.f%(format_id)s.%(ext)s')
log_command_override = ['yt-dlp']
if base_config_content:
log_command_override.extend(sp_utils._parse_config_file_to_cli_args(base_config_content))
log_command_override.extend(sp_utils._config_dict_to_cli_flags(log_config_overrides_for_host))
raw_args_from_policy = direct_policy.get('ytdlp_raw_args', [])
for raw_arg in raw_args_from_policy:
log_command_override.extend(shlex.split(raw_arg))
# --- Live log parsing and activity recording ---
live_success_count = 0
live_failure_count = 0
live_tolerated_count = 0
activity_lock = threading.Lock()
tolerated_error_patterns = direct_policy.get('tolerated_error_patterns', [])
fatal_error_patterns = direct_policy.get('fatal_error_patterns', [])
def log_parser_callback(line):
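# Called per streamed log line. Returning True stops the container early (fatal error
# plus ban policy); returning False lets the download continue. At most one 'download'
# success is recorded per task.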
nonlocal live_success_count, live_failure_count, live_tolerated_count, was_banned_by_parser
# Success is a high-priority check. Only record one success per task.
if '[download] 100% of' in line or 'has already been downloaded' in line:
with activity_lock:
# Only count one success per task
if live_success_count == 0:
live_success_count += 1
logger.info(f"[Worker {worker_id}] [{profile_name}] Live download success detected from log.")
profile_manager_instance.record_activity(profile_name, 'download')
return False
# Check for fatal patterns
for pattern in fatal_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_failure_count += 1
logger.error(f"[Worker {worker_id}] [{profile_name}] Live FATAL download error #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'download_error')
if direct_policy.get('ban_on_fatal_error_in_batch'):
logger.warning(f"Banning profile '{profile_name}' immediately due to fatal download error to stop container.")
profile_manager_instance.update_profile_state(profile_name, 'BANNED', 'Fatal error during download')
was_banned_by_parser = True
return True # Signal to stop container
return False # Do not stop if ban_on_fatal_error_in_batch is false
# Only process lines that contain ERROR: for tolerated/generic failures
if 'ERROR:' not in line:
return False
# Check if it's a tolerated error
for pattern in tolerated_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_tolerated_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live TOLERATED download error #{live_tolerated_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
return False
# If it's an ERROR: line and not tolerated, it's a failure
with activity_lock:
live_failure_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live download failure #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'download_error')
return False
retcode, stdout, stderr, stop_reason = run_docker_container(
image_name=image_name,
command=command,
volumes=volumes,
stream_prefix=f"[Worker {worker_id} | docker-ytdlp] ",
network_name=network_name,
log_callback=log_parser_callback,
profile_manager=profile_manager_instance,
profile_name=profile_name,
environment=environment,
log_command_override=log_command_override
)
# 5. Post-process and record activity
full_output = f"{stdout}\n{stderr}"
is_bot_error = "Sign in to confirm you're not a bot" in full_output
if is_bot_error:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during download. Marking as failure.")
# --- Final Outcome Determination ---
# Activity is now recorded live by the log parser. This block just determines
# the overall success/failure for logging and event reporting.
success = False
final_outcome = "unknown"
with activity_lock:
if live_success_count > 0:
success = True
final_outcome = "download"
elif live_failure_count > 0 or is_bot_error:
final_outcome = "download_error"
elif live_tolerated_count > 0:
final_outcome = "tolerated_error"
elif retcode == 0:
# Fallback if no logs were matched but exit was clean.
success = True
final_outcome = "download"
logger.warning(f"[Worker {worker_id}] [{profile_name}] No specific success/error log line matched, but exit code is 0. Assuming success, but this may indicate a parsing issue.")
# We record a success here as a fallback, in case the log parser missed it.
profile_manager_instance.record_activity(profile_name, 'download')
else:
final_outcome = "download_error"
logger.warning(f"[Worker {worker_id}] [{profile_name}] No specific error log line matched, but exit code was {retcode}. Recording a generic download_error.")
profile_manager_instance.record_activity(profile_name, 'download_error')
# --- Airflow Directory Logic ---
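# Layout produced below: <airflow_ready_dir_base_path>/<YYYYMMDDTHHMM, minutes rounded
# down to 10>/<video_id>/ containing the downloaded media file and a copy of the
# info.json renamed to info_<video_id>.json.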
if success and d_policy.get('output_to_airflow_ready_dir'):
# Find the downloaded file path from yt-dlp's output
downloaded_filename = None
# Order of checks is important: Merger -> VideoConvertor -> Destination
merge_match = re.search(r'\[Merger\] Merging formats into "([^"]+)"', stdout)
if merge_match:
downloaded_filename = os.path.basename(merge_match.group(1))
else:
convertor_match = re.search(r'\[VideoConvertor\].*?; Destination: (.*)', stdout)
if convertor_match:
downloaded_filename = os.path.basename(convertor_match.group(1).strip())
else:
dest_match = re.search(r'\[download\] Destination: (.*)', stdout)
if dest_match:
downloaded_filename = os.path.basename(dest_match.group(1).strip())
if downloaded_filename:
try:
# Get video_id from the info.json
with open(claimed_task_path_host, 'r', encoding='utf-8') as f:
info_data = json.load(f)
video_id = info_data.get('id')
if not video_id:
logger.error(f"[{profile_name}] Could not find video ID in '{claimed_task_path_host.name}' for moving file.")
else:
now = datetime.now()
rounded_minute = (now.minute // 10) * 10
timestamp_str = now.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}"
base_path = d_policy.get('airflow_ready_dir_base_path', 'downloadfiles/videos/ready')
if not os.path.isabs(base_path):
base_path = os.path.join(sp_utils._PROJECT_ROOT, base_path)
final_dir_base = os.path.join(base_path, timestamp_str)
final_dir_path = os.path.join(final_dir_base, video_id)
os.makedirs(final_dir_path, exist_ok=True)
downloaded_file_host_path = os.path.join(host_download_path, downloaded_filename)
if os.path.exists(downloaded_file_host_path):
shutil.move(downloaded_file_host_path, final_dir_path)
logger.info(f"[{profile_name}] Moved media file to {final_dir_path}")
new_info_json_name = f"info_{video_id}.json"
dest_info_json_path = os.path.join(final_dir_path, new_info_json_name)
shutil.copy(claimed_task_path_host, dest_info_json_path)
logger.info(f"[{profile_name}] Copied info.json to {dest_info_json_path}")
except Exception as e:
logger.error(f"[{profile_name}] Failed to move downloaded file to Airflow ready directory: {e}")
else:
logger.warning(f"[{profile_name}] Download succeeded, but could not parse final filename from output to move to Airflow dir.")
event_details = f"Docker download finished. Exit: {retcode}. Final Outcome: {final_outcome}. (Live successes: {live_success_count}, Live failures: {live_failure_count}, Live tolerated: {live_tolerated_count})"
if not success and stderr:
event_details += f" Stderr: {stderr.strip().splitlines()[-1] if stderr.strip() else 'N/A'}"
if stop_reason:
event_details += f" Aborted: {stop_reason}."
event = { 'type': 'direct_docker_download', 'profile': profile_name, 'proxy_url': locked_profile['proxy'], 'success': success, 'details': event_details }
state_manager.log_event(event)
logger.info(f"[Worker {worker_id}] [{profile_name}] Task processing complete. Worker will now unlock profile and attempt next task.")
# 6. Clean up task file by renaming to .processed
try:
# The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed
base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
processed_path = Path(f"{base_path_str}.processed")
claimed_task_path_host.rename(processed_path)
logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.")
except (OSError, IndexError) as e:
logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}")
# After this point, claimed_task_path_host is no longer valid.
# The metadata has already been read into auth_profile_name and auth_env.
else:
# This case should not be reached with the new task-first locking logic.
logger.warning(f"[Worker {worker_id}] Inconsistent state: locked profile '{profile_name}' but no task was claimed. Unlocking and continuing.")
except Exception as e:
logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure') # Generic failure
time.sleep(5)
finally:
if locked_profile:
if claimed_task_path_host:
# The auth_profile_name and auth_env variables were populated in the `try` block
# before the task file was renamed or deleted.
if auth_profile_name and auth_env:
auth_manager = get_auth_manager(profile_manager_instance, auth_env)
if auth_manager:
auth_manager.decrement_pending_downloads(auth_profile_name)
else:
logger.error(f"Could not get auth profile manager for env '{auth_env}'. Pending downloads counter will not be decremented.")
else:
logger.warning(f"Could not find auth profile name and/or auth_env in info.json metadata. Pending downloads counter will not be decremented. (Profile: {auth_profile_name}, Env: {auth_env})")
if was_banned_by_parser:
logger.info(f"[Worker {worker_id}] Profile '{locked_profile['name']}' was already banned by the log parser. Skipping unlock/cooldown.")
else:
last_used_profile_name = locked_profile['name']
cooldown = None
# Only apply cooldown if a task was actually claimed and processed.
if claimed_task_path_host:
# The enforcer is the single place where this policy is configured: the enforcer can be
# restarted freely, whereas the stress-policy workers handling auth and downloads run
# continuously. Reading the value from Redis applies one policy across all workers and
# machines without restarting any of them.
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
cooldown_source_value = profile_manager_instance.get_config('unlock_cooldown_seconds')
source_description = "Redis config"
if cooldown_source_value is None:
cooldown_source_value = d_policy.get('default_unlock_cooldown_seconds')
source_description = "local policy"
if cooldown_source_value is not None:
try:
# If from Redis, it's a string that needs parsing.
# If from local policy, it's already an int or list.
val = cooldown_source_value
if isinstance(val, str):
val = json.loads(val)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
if cooldown is not None:
logger.debug(f"Determined cooldown from {source_description}: {cooldown_source_value}")
except (json.JSONDecodeError, TypeError):
if isinstance(cooldown_source_value, str) and cooldown_source_value.isdigit():
cooldown = int(cooldown_source_value)
logger.debug(f"Determined cooldown from {source_description}: {cooldown_source_value}")
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
if claimed_task_path_host and os.path.exists(claimed_task_path_host):
try: os.remove(claimed_task_path_host)
except OSError: pass
if temp_config_dir_host and os.path.exists(temp_config_dir_host):
try:
shutil.rmtree(temp_config_dir_host)
except OSError: pass
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []
def run_direct_download_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock):
"""A persistent worker for the 'direct_download_cli' orchestration mode."""
owner_id = f"direct-dl-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
d_policy = policy.get('download_policy', {})
direct_policy = policy.get('direct_download_cli_policy', {})
profile_prefix = d_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Direct download mode requires 'download_policy.profile_prefix'. Worker exiting.")
return []
output_dir = direct_policy.get('output_dir')
if not output_dir:
logger.error(f"[Worker {worker_id}] Direct download mode requires 'direct_download_cli_policy.output_dir'. Worker exiting.")
return []
os.makedirs(output_dir, exist_ok=True)
no_task_streak = 0
while not state_manager.shutdown_event.is_set():
locked_profile = None
claimed_task_path = None
try:
# 0. If no tasks were found, pause briefly.
if no_task_streak > 0:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No tasks found for available profiles. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
else:
logger.info(f"[Worker {worker_id}] No tasks found for available profiles. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
# --- End diagnostic logging ---
time.sleep(polling_interval)
if state_manager.shutdown_event.is_set(): continue
# 1. Find a task and lock its associated profile
locked_profile, claimed_task_path = find_task_and_lock_profile(
profile_manager_instance, owner_id, profile_prefix, policy, worker_id
)
if not locked_profile:
no_task_streak += 1
# The main loop will pause if the streak continues.
continue
profile_name = locked_profile['name']
# We have a task and a lock.
if claimed_task_path:
no_task_streak = 0 # Reset streak
auth_profile_name, auth_env = None, None
# --- Read metadata before processing/deleting file ---
try:
with open(claimed_task_path, 'r', encoding='utf-8') as f:
info_data = json.load(f)
metadata = info_data.get('_ytops_metadata', {})
auth_profile_name = metadata.get('profile_name')
auth_env = metadata.get('auth_env')
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Could not read info.json to get auth profile for decrementing counter: {e}")
# 3. Construct and run the command
ytdlp_cmd_str = direct_policy.get('ytdlp_command')
if not ytdlp_cmd_str:
logger.error(f"[Worker {worker_id}] Direct download mode requires 'direct_download_cli_policy.ytdlp_command'.")
break
proxy_url = locked_profile['proxy']
proxy_rename = direct_policy.get('proxy_rename')
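# 'proxy_rename' is expected to be a sed-style rule such as s/<regex>/<replacement>/,
# applied to the proxy URL with re.sub before it is passed to yt-dlp.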
if proxy_rename:
rename_rule = proxy_rename.strip("'\"")
if rename_rule.startswith('s/') and rename_rule.count('/') >= 2:
try:
parts = rename_rule.split('/')
proxy_url = re.sub(parts[1], parts[2], proxy_url)
except (re.error, IndexError):
logger.error(f"[Worker {worker_id}] Invalid proxy_rename rule: {proxy_rename}")
output_template = os.path.join(output_dir, '%(title)s - %(id)s.%(ext)s')
cmd = shlex.split(ytdlp_cmd_str)
cmd.extend(['--load-info-json', str(claimed_task_path)])
cmd.extend(['--proxy', proxy_url])
cmd.extend(['-o', output_template])
ytdlp_args = direct_policy.get('ytdlp_args')
if ytdlp_args:
cmd.extend(shlex.split(ytdlp_args))
if args.verbose and '--verbose' not in cmd:
cmd.append('--verbose')
custom_env = direct_policy.get('env_vars', {}).copy()
# --- PYTHONPATH for custom yt-dlp module ---
ytdlp_module_path = direct_policy.get('ytdlp_module_path')
if ytdlp_module_path:
existing_pythonpath = custom_env.get('PYTHONPATH', os.environ.get('PYTHONPATH', ''))
custom_env['PYTHONPATH'] = f"{ytdlp_module_path}{os.pathsep}{existing_pythonpath}".strip(os.pathsep)
logger.debug(f"[Worker {worker_id}] Using custom PYTHONPATH: {custom_env['PYTHONPATH']}")
# Pass profile info to the custom yt-dlp process
custom_env['YTDLP_PROFILE_NAME'] = profile_name
custom_env['YTDLP_PROXY_URL'] = locked_profile['proxy'] # Original proxy
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
custom_env['YTDLP_SIM_MODE'] = env_name
# Create a per-profile cache directory and set XDG_CACHE_HOME
cache_dir_base = direct_policy.get('cache_dir_base', '.cache')
profile_cache_dir = os.path.join(cache_dir_base, profile_name)
try:
os.makedirs(profile_cache_dir, exist_ok=True)
custom_env['XDG_CACHE_HOME'] = profile_cache_dir
except OSError as e:
logger.error(f"[Worker {worker_id}] Failed to create cache directory '{profile_cache_dir}': {e}")
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing task '{claimed_task_path.name}'...")
if args.dummy or args.dummy_batch:
logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY DIRECT DOWNLOAD ==========")
logger.info(f"[Worker {worker_id}] Profile: {profile_name} | Task: {claimed_task_path.name}")
logger.info(f"[Worker {worker_id}] Would run command: {' '.join(shlex.quote(s) for s in cmd)}")
logger.info(f"[Worker {worker_id}] With environment: {custom_env}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
min_seconds = dummy_settings.get('download_min_seconds', 0.5)
max_seconds = dummy_settings.get('download_max_seconds', 1.5)
failure_rate = dummy_settings.get('download_failure_rate', 0.0)
skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
time.sleep(random.uniform(min_seconds, max_seconds))
rand_val = random.random()
should_fail_skipped = rand_val < skipped_rate
should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
if should_fail_skipped:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure.")
retcode = 0
stderr = "Dummy skipped failure"
elif should_fail_fatal:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure.")
retcode = 1
stderr = "Dummy fatal failure"
else:
logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success.")
retcode = 0
stderr = ""
logger.info(f"========== [Worker {worker_id}] END DUMMY DIRECT DOWNLOAD ==========")
else:
logger.info(f"[Worker {worker_id}] [{profile_name}] Running command: {' '.join(shlex.quote(s) for s in cmd)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] With environment: {custom_env}")
retcode, stdout, stderr = run_command(
cmd, running_processes, process_lock, env=custom_env, stream_output=args.verbose,
stream_prefix=f"[Worker {worker_id} | yt-dlp] "
)
# 4. Record activity
success = (retcode == 0)
activity_type = 'download' if success else 'download_error'
logger.info(f"[Worker {worker_id}] Recording '{activity_type}' for profile '{profile_name}'.")
profile_manager_instance.record_activity(profile_name, activity_type)
event_details = f"Download finished. Exit code: {retcode}."
if not success and stderr:
event_details += f" Stderr: {stderr.strip().splitlines()[-1]}"
event = {'type': 'direct_download', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details}
state_manager.log_event(event)
# 5. Clean up the processed task file
try:
os.remove(claimed_task_path)
logger.debug(f"[{sp_utils.get_display_name(claimed_task_path)}] Removed processed task file.")
except OSError as e:
logger.error(f"Failed to remove processed task file '{claimed_task_path}': {e}")
else:
no_task_streak += 1
logger.info(f"[Worker {worker_id}] No tasks found for profile '{profile_name}'.")
except Exception as e:
logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure') # Generic failure
time.sleep(5)
finally:
if locked_profile:
if claimed_task_path:
# The auth_profile_name and auth_env variables were populated in the `try` block
# before the task file was deleted.
if auth_profile_name and auth_env:
auth_manager = get_auth_manager(profile_manager_instance, auth_env)
if auth_manager:
auth_manager.decrement_pending_downloads(auth_profile_name)
else:
logger.error(f"Could not get auth profile manager for env '{auth_env}'. Pending downloads counter will not be decremented.")
else:
logger.warning(f"Could not find auth profile name and/or auth_env in info.json metadata. Pending downloads counter will not be decremented. (Profile: {auth_profile_name}, Env: {auth_env})")
cooldown = None
if claimed_task_path:
# The enforcer is the single place where this policy is configured: the enforcer can be
# restarted freely, whereas the stress-policy workers handling auth and downloads run
# continuously. Reading the value from Redis applies one policy across all workers and
# machines without restarting any of them.
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if cooldown_config.isdigit():
cooldown = int(cooldown_config)
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
locked_profile = None
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []