
import collections
import json
import logging
import os
import random
import re
import shlex
import sys
import tempfile
import shutil
import subprocess
import threading
import time
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from . import utils as sp_utils
from .process_runners import run_command, run_docker_container, get_worker_id
from ..profile_manager_tool import ProfileManager
logger = logging.getLogger(__name__)
# --- Auth Profile Manager Cache ---
# This is a cache to hold a ProfileManager instance for the auth simulation.
# It's needed so that download workers can decrement the correct pending download counter.
_auth_manager_cache = {}
_auth_manager_lock = threading.Lock()
def get_auth_manager(current_manager, auth_env: str):
"""
Gets a ProfileManager instance for a specific auth simulation environment.
It uses the auth_env provided from the info.json metadata.
"""
with _auth_manager_lock:
if not auth_env:
return None
if auth_env in _auth_manager_cache:
return _auth_manager_cache[auth_env]
logger.info(f"Creating new ProfileManager for auth simulation env: '{auth_env}'")
try:
# Re-use connection settings from the current manager
redis_conn_kwargs = current_manager.redis.connection_pool.connection_kwargs
auth_key_prefix = f"{auth_env}_profile_mgmt_"
auth_manager = ProfileManager(
redis_host=redis_conn_kwargs.get('host'),
redis_port=redis_conn_kwargs.get('port'),
redis_password=redis_conn_kwargs.get('password'),
key_prefix=auth_key_prefix
)
_auth_manager_cache[auth_env] = auth_manager
return auth_manager
except Exception as e:
logger.error(f"Failed to create ProfileManager for auth env '{auth_env}': {e}")
return None
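# Illustrative usage (a sketch, not part of the production flow): a download
# worker that has read 'auth_env' from an info.json's '_ytops_metadata' block
# can resolve the matching manager and adjust that environment's pending-download
# counter. The profile name below is a placeholder.
#
#   auth_mgr = get_auth_manager(profile_manager_instance, metadata.get('auth_env'))
#   if auth_mgr:
#       auth_mgr.increment_pending_downloads('some_profile', -1)  # decrement by one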
def _run_ffprobe(media_path, output_path):
"""Runs ffprobe on the media file and saves the JSON output."""
try:
ffprobe_cmd = [
'ffprobe',
'-v', 'quiet',
'-print_format', 'json',
'-show_format',
'-show_streams',
str(media_path)
]
result = subprocess.run(ffprobe_cmd, check=True, capture_output=True, text=True, encoding='utf-8')
with open(output_path, 'w', encoding='utf-8') as f:
f.write(result.stdout)
logger.info(f"Successfully generated ffprobe JSON for '{media_path.name}' at '{output_path}'")
return True
except FileNotFoundError:
logger.error("ffprobe command not found. Please ensure ffprobe is installed and in your PATH.")
return False
except subprocess.CalledProcessError as e:
logger.error(f"ffprobe failed for '{media_path.name}': {e.stderr}")
return False
except Exception as e:
logger.error(f"An error occurred while running ffprobe for '{media_path.name}': {e}")
return False
def _cleanup_media_file(media_path):
"""Replaces the media file with an empty file ending in .empty."""
try:
empty_path = Path(str(media_path) + '.empty')
empty_path.touch()
os.remove(media_path)
logger.info(f"Cleaned up media file '{media_path.name}', replaced with '{empty_path.name}'")
except Exception as e:
logger.error(f"Failed to cleanup media file '{media_path.name}': {e}")
def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=None):
"""Helper to post-process a single info.json file and move it to the final directory."""
direct_policy = policy.get('direct_docker_cli_policy', {})
settings = policy.get('settings', {})
save_dir = settings.get('save_info_json_dir')
if not save_dir:
return False
video_id = "unknown"
try:
# Use a short delay and retry mechanism to handle cases where the file is not yet fully written.
for attempt in range(3):
try:
with open(file_path, 'r+', encoding='utf-8') as f:
info_data = json.load(f)
video_id = info_data.get('id', 'unknown')
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '') if profile_manager_instance else 'unknown'
info_data['_ytops_metadata'] = {
'profile_name': profile_name,
'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
f.seek(0)
json.dump(info_data, f, indent=2)
f.truncate()
break # Success
except (json.JSONDecodeError, IOError) as e:
if attempt < 2:
time.sleep(0.2)
else:
raise e
final_path = Path(save_dir) / file_path.name
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(
video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy
)
final_path = Path(save_dir) / new_name
# Use shutil.move, which can handle cross-device moves (e.g., to an S3 mount)
# by falling back to a copy-and-delete operation.
shutil.move(str(file_path), str(final_path))
logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'")
# --- Create download task if in queue mode ---
queue_policy = policy.get('queue_policy', {})
formats_to_download = queue_policy.get('formats_to_download')
if formats_to_download and state_manager and state_manager.queue_provider:
try:
url = info_data.get('original_url') or info_data.get('webpage_url')
download_task = {
'info_json_path': str(final_path),
'video_id': video_id,
'url': url,
'auth_profile_name': profile_name,
'proxy_url': proxy_url,
'auth_env': env_name,
}
added_count = state_manager.add_download_tasks_batch([download_task])
if added_count > 0:
logger.info(f"[Worker {worker_id}] [{profile_name}] Added {added_count} download task(s) to queue for {video_id}")
else:
logger.error(f"[Worker {worker_id}] [{profile_name}] Failed to add download task to queue for {video_id}")
return False
except Exception as e:
logger.error(f"[Worker {worker_id}] [{profile_name}] Failed to create download task for {video_id}: {e}", exc_info=True)
return False
return True
except (IOError, json.JSONDecodeError, OSError) as e:
logger.error(f"[Worker {worker_id}] Error post-processing '{file_path.name}' (video: {video_id}): {e}")
return False
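# Illustrative policy fragment for the queue handoff above (shown as YAML for
# readability; key names match what this helper reads, values are examples):
#
#   settings:
#     save_info_json_dir: /data/info_json/ready
#   queue_policy:
#     formats_to_download: "140,137"   # presence of this key triggers task creation
#
# When a queue provider is configured, each moved info.json yields one task dict
# with the keys 'info_json_path', 'video_id', 'url', 'auth_profile_name',
# 'proxy_url' and 'auth_env', enqueued via state_manager.add_download_tasks_batch().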
def find_task_and_lock_profile(profile_manager, owner_id, profile_prefix, policy, worker_id):
"""
Scans for an available task and locks the specific ACTIVE profile that generated it.
This preserves a 1-to-1 relationship between a profile and its tasks.
Returns a tuple of (locked_profile_dict, claimed_task_path_obj) or (None, None).
"""
settings = policy.get('settings', {})
info_json_dir = settings.get('info_json_dir')
if not info_json_dir:
return None, None
logger.info(f"[Worker {worker_id}] Scanning for tasks in '{info_json_dir}'...")
# 1. Get all available task files and group by profile
try:
task_files = list(Path(info_json_dir).rglob('*.json'))
except FileNotFoundError:
logger.warning(f"Info JSON directory not found during scan: {info_json_dir}")
return None, None
if not task_files:
return None, None
profile_regex_str = settings.get('profile_extraction_regex')
if not profile_regex_str:
logger.error(f"[Worker {worker_id}] The task-locking strategy requires 'settings.profile_extraction_regex' to be defined in the policy.")
return None, None
try:
profile_regex = re.compile(profile_regex_str)
except re.error as e:
logger.error(f"Invalid profile_extraction_regex in policy: '{profile_regex_str}'. Error: {e}")
return None, None
tasks_by_profile = collections.defaultdict(list)
for task_path in task_files:
match = profile_regex.search(task_path.name)
if match and match.groups():
profile_name = match.group(1)
tasks_by_profile[profile_name].append(task_path)
if not tasks_by_profile:
logger.debug(f"[Worker {worker_id}] Found task files, but could not extract any profile names from them.")
return None, None
# 2. Get ACTIVE profiles from Redis.
active_profiles = profile_manager.list_profiles(state_filter='ACTIVE')
active_profile_names = {p['name'] for p in active_profiles if p['name'].startswith(profile_prefix or '')}
# 3. Find profiles that are both ACTIVE and have tasks.
candidate_profiles = list(active_profile_names.intersection(tasks_by_profile.keys()))
if not candidate_profiles:
logger.debug(f"[Worker {worker_id}] Found tasks for profiles: {list(tasks_by_profile.keys())}, but none are currently ACTIVE.")
return None, None
# 4. Shuffle candidates and try to lock one.
random.shuffle(candidate_profiles)
for profile_name in candidate_profiles:
# Try to lock the profile.
# The owner_id already contains the worker_id, so the log message from profile_manager will be informative enough.
locked_profile = profile_manager.lock_profile(owner=owner_id, specific_profile_name=profile_name)
if locked_profile:
# Success! Claim one of its task files.
# Shuffle the tasks for this profile to avoid workers grabbing the same one.
profile_tasks = tasks_by_profile[profile_name]
random.shuffle(profile_tasks)
for task_path in profile_tasks:
locked_path = task_path.with_suffix(f"{task_path.suffix}.LOCKED.{worker_id}")
try:
task_path.rename(locked_path)
logger.info(f"[Worker {worker_id}] Locked profile '{profile_name}' and claimed its task '{task_path.name}'.")
return locked_profile, locked_path
except FileNotFoundError:
# This specific task was claimed by another worker between our directory scan and now
# (e.g. it locked the profile, claimed the task, and unlocked before we acquired the lock).
# This is rare but possible; just try the next task for this profile.
logger.warning(f"[Worker {worker_id}] Task '{task_path.name}' was claimed by another worker. Trying another task for '{profile_name}'.")
continue
except OSError as e:
logger.error(f"[Worker {worker_id}] Error claiming task file '{task_path.name}': {e}")
# Something is wrong with this file, try the next one.
continue
# We locked a profile but failed to claim any of its tasks (all were taken in the meantime).
# This is an unexpected state; unlock the profile and move on.
logger.warning(f"[Worker {worker_id}] Locked profile '{profile_name}' but failed to claim any of its tasks. Unlocking.")
profile_manager.unlock_profile(profile_name, owner=owner_id)
# No suitable task/profile combo found.
logger.debug(f"[Worker {worker_id}] Could not lock any of the {len(candidate_profiles)} candidate profiles (all may have been locked by other workers).")
return None, None
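# Illustrative settings for the task-locking strategy above (example values only;
# the real regex and naming scheme come from the deployed policy):
#
#   settings:
#     info_json_dir: /data/tasks
#     profile_extraction_regex: '__([A-Za-z0-9_-]+)__'   # group(1) is the profile name
#
# With a rename_file_template such as '{video_id}__{profile_name}__{proxy}.json',
# a task file named 'abc123__user01__http___1.2.3.4_3128.json' maps to profile
# 'user01'; that profile must also be ACTIVE in Redis before it can be locked here.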
def _run_download_logic(source, info_json_content, policy, state_manager, args, running_processes, process_lock, profile_name=None, profile_manager_instance=None):
"""Shared download logic for a single info.json."""
proxy_url = None
if info_json_content:
try:
info_data = json.loads(info_json_content)
proxy_url = info_data.get('_proxy_url')
except (json.JSONDecodeError, AttributeError):
logger.warning(f"[{sp_utils.get_display_name(source)}] Could not parse info.json to get proxy for download controls.")
d_policy = policy.get('download_policy', {})
temp_download_dir = None
local_policy = policy
if d_policy.get('output_to_airflow_ready_dir'):
local_policy = deepcopy(policy)
temp_download_dir = tempfile.mkdtemp(prefix='stress-dl-video-')
# This modification is safe because it's on a deep copy
local_policy.setdefault('download_policy', {})['output_dir'] = temp_download_dir
logger.info(f"[{sp_utils.get_display_name(source)}] Using temporary download directory: {temp_download_dir}")
try:
if not state_manager.check_and_update_download_rate_limit(proxy_url, local_policy):
return []
state_manager.wait_for_proxy_cooldown(proxy_url, local_policy)
results = process_info_json_cycle(source, info_json_content, local_policy, state_manager, args, running_processes, process_lock, proxy_url=proxy_url, profile_name=profile_name,
profile_manager_instance=profile_manager_instance)
state_manager.update_proxy_finish_time(proxy_url)
# --- Post-download logic for ffprobe, Airflow dir, and cleanup ---
for result in results:
if result.get('success') and result.get('downloaded_filepath'):
downloaded_filepath_str = result.get('downloaded_filepath')
if not downloaded_filepath_str or not os.path.exists(downloaded_filepath_str):
continue
media_path = Path(downloaded_filepath_str)
# 1. Run ffprobe on the file at its current location
if d_policy.get('run_ffprobe'):
ffprobe_output_path = None
# The source is the path to the task/info.json file.
if isinstance(source, Path):
# Name it like the info.json but with .ffprobe.json
ffprobe_output_path = source.with_name(f"{source.stem}.ffprobe.json")
else:
# Fallback if source is not a path (e.g. from memory)
video_id = result.get('video_id', 'unknown_video')
# Save it next to the media file
ffprobe_output_path = media_path.with_name(f"{video_id}.ffprobe.json")
_run_ffprobe(media_path, ffprobe_output_path)
# 2. Move to Airflow directory if configured. This updates media_path.
if d_policy.get('output_to_airflow_ready_dir'):
try:
video_id = result.get('video_id')
if not video_id:
try:
info_data = json.loads(info_json_content)
video_id = info_data.get('id')
except (json.JSONDecodeError, AttributeError):
video_id = None
if not video_id:
logger.error(f"[{sp_utils.get_display_name(source)}] Could not find video ID in result for moving file.")
else:
now = datetime.now()
rounded_minute = (now.minute // 10) * 10
timestamp_str = now.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}"
base_path = d_policy.get('airflow_ready_dir_base_path', 'downloadfiles/videos/ready')
if not os.path.isabs(base_path):
base_path = os.path.join(sp_utils._PROJECT_ROOT, base_path)
final_dir_base = os.path.join(base_path, timestamp_str)
final_dir_path = os.path.join(final_dir_base, video_id)
os.makedirs(final_dir_path, exist_ok=True)
if os.path.exists(media_path):
final_media_path = Path(final_dir_path) / media_path.name
shutil.move(str(media_path), str(final_media_path))
logger.info(f"[{sp_utils.get_display_name(source)}] Moved media file to {final_media_path}")
media_path = final_media_path
if isinstance(source, Path) and source.exists():
new_info_json_name = f"info_{video_id}.json"
dest_info_json_path = os.path.join(final_dir_path, new_info_json_name)
shutil.copy(source, dest_info_json_path)
logger.info(f"[{sp_utils.get_display_name(source)}] Copied info.json to {dest_info_json_path}")
except Exception as e:
logger.error(f"[{sp_utils.get_display_name(source)}] Failed to move downloaded file to Airflow ready directory: {e}")
# 3. Cleanup the file at its final location
if d_policy.get('cleanup'):
if os.path.exists(media_path):
_cleanup_media_file(media_path)
return results
finally:
if temp_download_dir and os.path.exists(temp_download_dir):
shutil.rmtree(temp_download_dir)
logger.info(f"[{sp_utils.get_display_name(source)}] Cleaned up temporary directory: {temp_download_dir}")
def process_profile_task(profile_name, file_list, policy, state_manager, cycle_num, args, running_processes, process_lock, profile_manager_instance=None):
"""Worker task for a profile, processing its files sequentially."""
logger.info(f"Worker {get_worker_id()} starting task for profile '{profile_name}' with {len(file_list)} files.")
all_results = []
for i, file_path in enumerate(file_list):
if state_manager.shutdown_event.is_set():
logger.info(f"Shutdown requested, stopping task for profile '{profile_name}'.")
break
try:
with open(file_path, 'r', encoding='utf-8') as f:
info_json_content = f.read()
except (IOError, FileNotFoundError) as e:
logger.error(f"[{sp_utils.get_display_name(file_path)}] Could not read info.json file: {e}")
continue # Skip this file
results_for_file = _run_download_logic(file_path, info_json_content, policy, state_manager, args, running_processes, process_lock, profile_name=profile_name, profile_manager_instance=profile_manager_instance)
all_results.extend(results_for_file)
# Record progress for this file: in 'continuous' scan mode the state manager remembers it;
# if 'mark_processed_files' is set, the file is also renamed (applies in both 'once' and 'continuous' modes).
settings = policy.get('settings', {})
if settings.get('directory_scan_mode') == 'continuous':
state_manager.mark_file_as_processed(file_path)
if settings.get('mark_processed_files'):
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
new_path = file_path.parent / f"{file_path.name}.{timestamp}.processed"
file_path.rename(new_path)
logger.info(f"Marked '{file_path.name}' as processed by renaming to '{new_path.name}'")
except (IOError, OSError) as e:
logger.error(f"Failed to rename processed file '{file_path.name}': {e}")
# Check for stop conditions after processing each file
should_stop_profile = False
for result in results_for_file:
if not result['success']:
s_conditions = policy.get('stop_conditions', {})
if s_conditions.get('on_failure') or \
(s_conditions.get('on_http_403') and result['error_type'] == 'HTTP 403') or \
(s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
logger.info(f"Stopping further processing for profile '{profile_name}' due to failure.")
should_stop_profile = True
break
if should_stop_profile:
break
# Apply sleep between tasks for this profile
if i < len(file_list) - 1:
exec_control = policy.get('execution_control', {})
sleep_cfg = exec_control.get('sleep_between_tasks', {})
sleep_min = sleep_cfg.get('min_seconds', 0)
sleep_max_val = sleep_cfg.get('max_seconds')
if sleep_min > 0 or sleep_max_val is not None:
sleep_max = sleep_min if sleep_max_val is None else sleep_max_val
sleep_duration = 0
if sleep_max < sleep_min:
logger.warning(f"sleep_between_tasks: max_seconds ({sleep_max}s) is less than min_seconds ({sleep_min}s). Using max_seconds as fixed sleep duration.")
sleep_duration = sleep_max
elif sleep_max > sleep_min:
sleep_duration = random.uniform(sleep_min, sleep_max)
else: # equal
sleep_duration = sleep_min
if sleep_duration > 0:
logger.debug(f"Profile '{profile_name}' sleeping for {sleep_duration:.2f}s before next file.")
# Interruptible sleep
sleep_end_time = time.time() + sleep_duration
while time.time() < sleep_end_time:
if state_manager.shutdown_event.is_set():
break
time.sleep(0.2)
return all_results
def run_download_worker(info_json_path, info_json_content, format_to_download, policy, args, running_processes, process_lock, state_manager, profile_name=None):
"""
Performs a single download attempt. Designed to be run in a worker thread.
"""
worker_id = get_worker_id()
display_name = sp_utils.get_display_name(info_json_path)
profile_log_part = f" [Profile: {profile_name}]" if profile_name else ""
log_prefix = f"[Worker {worker_id}]{profile_log_part} [{display_name} @ {format_to_download}]"
download_policy = policy.get('download_policy', {})
settings = policy.get('settings', {})
downloader = download_policy.get('downloader')
# Get script command from settings, with fallback to download_policy for old format.
script_cmd_str = settings.get('download_script')
if not script_cmd_str:
script_cmd_str = download_policy.get('script')
if script_cmd_str:
download_cmd = shlex.split(script_cmd_str)
elif downloader == 'aria2c_rpc':
download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'aria-rpc']
elif downloader == 'native-cli':
download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'cli']
else:
# Default to the new native-py downloader if downloader is 'native-py' or not specified.
download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'py']
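# Summary of how the base command is chosen above (a sketch; the policy may also
# provide 'settings.download_script', which always wins and is split with shlex):
#
#   downloader: aria2c_rpc   ->  <python> -m ytops_client.cli download aria-rpc
#   downloader: native-cli   ->  <python> -m ytops_client.cli download cli
#   downloader: native-py    ->  <python> -m ytops_client.cli download py   (also the default)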
download_cmd.extend(['-f', format_to_download])
if downloader == 'aria2c_rpc':
if download_policy.get('aria_host'):
download_cmd.extend(['--aria-host', str(download_policy['aria_host'])])
if download_policy.get('aria_port'):
download_cmd.extend(['--aria-port', str(download_policy['aria_port'])])
if download_policy.get('aria_secret'):
download_cmd.extend(['--aria-secret', str(download_policy['aria_secret'])])
if download_policy.get('output_dir'):
download_cmd.extend(['--output-dir', str(download_policy['output_dir'])])
if download_policy.get('aria_remote_dir'):
download_cmd.extend(['--remote-dir', str(download_policy['aria_remote_dir'])])
if download_policy.get('aria_fragments_dir'):
download_cmd.extend(['--fragments-dir', str(download_policy['aria_fragments_dir'])])
# For stress testing, waiting is the desired default to get a success/fail result.
# Allow disabling it by explicitly setting aria_wait: false in the policy.
if download_policy.get('aria_wait', True):
download_cmd.append('--wait')
if download_policy.get('auto_merge_fragments'):
download_cmd.append('--auto-merge-fragments')
if download_policy.get('remove_fragments_after_merge'):
download_cmd.append('--remove-fragments-after-merge')
if download_policy.get('purge_on_complete'):
download_cmd.append('--purge-on-complete')
downloader_args = download_policy.get('downloader_args')
proxy = download_policy.get('proxy')
if proxy:
# Note: proxy_rename is not supported for aria2c_rpc mode.
proxy_arg = f"--all-proxy {shlex.quote(str(proxy))}"
if downloader_args:
downloader_args = f"{downloader_args} {proxy_arg}"
else:
downloader_args = proxy_arg
if downloader_args:
# For aria2c_rpc, the downloader_args value is passed directly to the script's --downloader-args option.
download_cmd.extend(['--downloader-args', downloader_args])
elif downloader == 'native-cli':
# This is the logic for the legacy download_tool.py (yt-dlp CLI wrapper).
pause_seconds = download_policy.get('pause_before_download_seconds')
if pause_seconds and isinstance(pause_seconds, (int, float)) and pause_seconds > 0:
download_cmd.extend(['--pause', str(pause_seconds)])
if download_policy.get('continue_downloads'):
download_cmd.append('--download-continue')
# Add proxy if specified directly in the policy
proxy = download_policy.get('proxy')
if proxy:
download_cmd.extend(['--proxy', str(proxy)])
proxy_rename = download_policy.get('proxy_rename')
if proxy_rename:
download_cmd.extend(['--proxy-rename', str(proxy_rename)])
extra_args = download_policy.get('extra_args')
if extra_args:
download_cmd.extend(shlex.split(extra_args))
# Note: 'downloader' here refers to the downloader yt-dlp itself uses, not our wrapper script.
# The policy key 'external_downloader' is clearer, but 'downloader' is supported for backward compatibility.
ext_downloader = download_policy.get('external_downloader') or download_policy.get('downloader')
if ext_downloader and ext_downloader not in ['native-cli', 'native-py', 'aria2c_rpc']:
download_cmd.extend(['--downloader', str(ext_downloader)])
downloader_args = download_policy.get('downloader_args')
if downloader_args:
download_cmd.extend(['--downloader-args', str(downloader_args)])
if download_policy.get('merge_output_format'):
download_cmd.extend(['--merge-output-format', str(download_policy['merge_output_format'])])
else:
# This is the default logic for the new native-py downloader.
if download_policy.get('output_to_buffer'):
download_cmd.append('--output-buffer')
else:
# --output-dir is only relevant if not outputting to buffer.
if download_policy.get('output_dir'):
download_cmd.extend(['--output-dir', str(download_policy['output_dir'])])
if download_policy.get('config'):
download_cmd.extend(['--config', str(download_policy['config'])])
if download_policy.get('temp_path'):
download_cmd.extend(['--temp-path', str(download_policy['temp_path'])])
if download_policy.get('continue_downloads'):
download_cmd.append('--download-continue')
pause_seconds = download_policy.get('pause_before_download_seconds')
if pause_seconds and isinstance(pause_seconds, (int, float)) and pause_seconds > 0:
download_cmd.extend(['--pause', str(pause_seconds)])
proxy = download_policy.get('proxy')
if proxy:
download_cmd.extend(['--proxy', str(proxy)])
proxy_rename = download_policy.get('proxy_rename')
if proxy_rename:
download_cmd.extend(['--proxy-rename', str(proxy_rename)])
# The 'extra_args' from the policy are for the download script itself, not for yt-dlp.
# We need to split them and add them to the command.
extra_args = download_policy.get('extra_args')
if extra_args:
download_cmd.extend(shlex.split(extra_args))
# Pass through downloader settings for yt-dlp to use
# e.g. to tell yt-dlp to use aria2c as its backend
ext_downloader = download_policy.get('external_downloader')
if ext_downloader:
download_cmd.extend(['--downloader', str(ext_downloader)])
downloader_args = download_policy.get('downloader_args')
if downloader_args:
download_cmd.extend(['--downloader-args', str(downloader_args)])
if args.dummy:
# Create a copy to add the info.json path for logging, without modifying the original
log_cmd = list(download_cmd)
if isinstance(info_json_path, Path) and info_json_path.exists():
log_cmd.extend(['--load-info-json', str(info_json_path)])
else:
log_cmd.extend(['--load-info-json', '<from_memory>'])
logger.info(f"{log_prefix} Dummy mode: simulating download...")
logger.info(f"{log_prefix} Dummy mode: Would run command: {' '.join(shlex.quote(s) for s in log_cmd)}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
min_seconds = dummy_settings.get('download_min_seconds', 1.0)
max_seconds = dummy_settings.get('download_max_seconds', 3.0)
failure_rate = dummy_settings.get('download_failure_rate', 0.0)
skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
sleep_duration = random.uniform(min_seconds, max_seconds)
logger.info(f"{log_prefix} Dummy mode: simulating download for {sleep_duration:.2f}s (from policy range {min_seconds}-{max_seconds}s).")
time.sleep(sleep_duration) # Simulate work
rand_val = random.random()
should_fail_skipped = rand_val < skipped_rate
should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
if should_fail_skipped:
logger.warning(f"{log_prefix} Dummy mode: Injecting simulated skipped download failure.")
return {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': False,
'error_type': 'DummySkippedFailure',
'details': 'FAIL (Dummy mode, skipped)',
'downloaded_bytes': 0,
'profile': profile_name,
'downloaded_filepath': None,
'is_tolerated_error': True
}
if should_fail_fatal:
logger.warning(f"{log_prefix} Dummy mode: Injecting simulated fatal download failure.")
return {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': False,
'error_type': 'DummyFailure',
'details': 'FAIL (Dummy mode, fatal)',
'downloaded_bytes': 0,
'profile': profile_name,
'downloaded_filepath': None
}
downloaded_filepath = f'/dev/null/{display_name}.mp4'
if download_policy.get('output_to_airflow_ready_dir'):
output_dir = download_policy.get('output_dir')
if output_dir and os.path.isdir(output_dir):
try:
dummy_path_obj = Path(output_dir) / f"{display_name}.mp4"
dummy_path_obj.touch()
downloaded_filepath = str(dummy_path_obj)
logger.info(f"{log_prefix} Dummy mode: created dummy file for Airflow move: {downloaded_filepath}")
except OSError as e:
logger.error(f"{log_prefix} Dummy mode: failed to create dummy file in '{output_dir}': {e}")
return {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': True,
'error_type': None,
'details': 'OK (Dummy mode)',
'downloaded_bytes': random.randint(100000, 5000000),
'profile': profile_name,
'downloaded_filepath': downloaded_filepath
}
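# Illustrative 'dummy_simulation_settings' block consumed by the dummy branch
# above (example values; all keys are optional and default to 1.0 / 3.0 / 0.0 / 0.0):
#
#   settings:
#     dummy_simulation_settings:
#       download_min_seconds: 1.0
#       download_max_seconds: 3.0
#       download_failure_rate: 0.05          # fraction of simulated fatal failures
#       download_skipped_failure_rate: 0.10  # fraction of simulated tolerated/skipped failures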
logger.info(f"{log_prefix} Kicking off download process...")
temp_info_file_path = None
try:
if isinstance(info_json_path, Path) and info_json_path.exists():
# The info.json is already in a file, pass its path directly.
download_cmd.extend(['--load-info-json', str(info_json_path)])
else:
# The info.json content is in memory, so write it to a temporary file.
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json', encoding='utf-8') as temp_f:
temp_f.write(info_json_content)
temp_info_file_path = temp_f.name
download_cmd.extend(['--load-info-json', temp_info_file_path])
cmd_str_for_log = ' '.join(shlex.quote(s) for s in download_cmd)
logger.info(f"{log_prefix} Running download command: {cmd_str_for_log}")
output_to_buffer = download_policy.get('output_to_buffer', False)
retcode, stdout, stderr = run_command(
download_cmd,
running_processes,
process_lock,
binary_stdout=output_to_buffer,
stream_output=getattr(args, 'print_downloader_log', False),
stream_prefix=f"{log_prefix} | "
)
finally:
if temp_info_file_path and os.path.exists(temp_info_file_path):
os.unlink(temp_info_file_path)
is_403_error = "HTTP Error 403" in stderr
is_timeout_error = "Read timed out" in stderr
output_to_buffer = download_policy.get('output_to_buffer', False)
# Parse stdout to find the downloaded file path.
# The download scripts print the final path, sometimes with a prefix.
downloaded_filepath = None
if stdout and not output_to_buffer:
lines = stdout.strip().split('\n')
if lines:
last_line = lines[-1].strip()
# Handle aria-rpc output format: "Download... successful: <path>"
aria_match = re.search(r'successful: (.+)', last_line)
if aria_match:
path_from_aria = aria_match.group(1).strip()
if os.path.exists(path_from_aria):
downloaded_filepath = path_from_aria
else:
logger.warning(f"[{display_name}] Path from aria-rpc output does not exist: '{path_from_aria}'")
# Handle native-py/cli output format (just the path)
elif os.path.exists(last_line):
downloaded_filepath = last_line
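# Illustrative last lines this parser expects (examples; exact wording may vary):
#   aria-rpc downloader : "Download of abc123.mp4 successful: /data/out/abc123.mp4"
#   native-py / cli     : "/data/out/abc123.mp4"
# In both cases the extracted path is only kept if it exists on disk.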
result = {
'type': 'download',
'path': str(info_json_path),
'format': format_to_download,
'success': retcode == 0,
'error_type': None,
'details': '',
'downloaded_bytes': 0,
'profile': profile_name,
'downloaded_filepath': downloaded_filepath
}
if retcode == 0:
details_str = "OK"
size_in_bytes = 0
if output_to_buffer:
# The most accurate size is the length of the stdout buffer.
size_in_bytes = len(stdout) # stdout is bytes
details_str += f" (Buffered {sp_utils.format_size(size_in_bytes)})"
else:
size_match = re.search(r'\[download\]\s+100%\s+of\s+~?([0-9.]+)(B|KiB|MiB|GiB)', stderr)
if size_match:
value = float(size_match.group(1))
unit = size_match.group(2)
multipliers = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3}
size_in_bytes = int(value * multipliers.get(unit, 1))
details_str += f" ({size_match.group(1)}{unit})"
result['downloaded_bytes'] = size_in_bytes
result['details'] = details_str
else:
# Check for shutdown first. This is the most likely cause for an abrupt non-zero exit.
if state_manager.shutdown_event.is_set():
result['error_type'] = 'Cancelled'
result['details'] = 'Task cancelled during shutdown.'
else:
# Check both stdout and stderr for error messages, as logging might be directed to stdout.
full_output = f"{stdout}\n{stderr}"
# Look for common error indicators from both yt-dlp and our scripts.
error_lines = [
line for line in full_output.strip().split('\n')
if 'ERROR:' in line or ' - ERROR - ' in line or 'DownloadError:' in line or 'Traceback' in line
]
if error_lines:
# Try to get the most specific part of the error.
details = error_lines[-1].strip()
# Our log format is "timestamp - logger - LEVEL - message"
if ' - ERROR - ' in details:
details = details.split(' - ERROR - ', 1)[-1]
result['details'] = details
else:
# Fallback to last non-empty line of stderr if no explicit "ERROR" line found
stderr_lines = [line for line in stderr.strip().split('\n') if line.strip()]
result['details'] = stderr_lines[-1].strip() if stderr_lines else "Unknown error"
if is_403_error:
result['error_type'] = 'HTTP 403'
elif is_timeout_error:
result['error_type'] = 'Timeout'
else:
result['error_type'] = f'Exit Code {retcode}'
return result
def process_info_json_cycle(path, content, policy, state_manager, args, running_processes, process_lock, proxy_url=None, profile_name=None, profile_manager_instance=None):
"""
Processes one info.json file for one cycle, downloading selected formats.
"""
results = []
display_name = sp_utils.get_display_name(path)
d_policy = policy.get('download_policy', {})
s_conditions = policy.get('stop_conditions', {})
try:
info_data = json.loads(content)
# If the task file specifies a format, use it instead of the policy's format list.
# This is for granular tasks generated by the task-generator tool.
if '_ytops_download_format' in info_data:
format_selection = info_data['_ytops_download_format']
logger.info(f"[{display_name}] Using format '{format_selection}' from task file.")
else:
format_selection = d_policy.get('formats', '')
available_formats = [f['format_id'] for f in info_data.get('formats', [])]
if not available_formats:
logger.warning(f"[{display_name}] No formats found in info.json. Skipping.")
return []
formats_to_test = []
if format_selection == 'all':
formats_to_test = available_formats
elif format_selection.startswith('random:'):
percent = float(format_selection.split(':')[1].rstrip('%'))
count = max(1, int(len(available_formats) * (percent / 100.0)))
formats_to_test = random.sample(available_formats, k=count)
elif format_selection.startswith('random_from:'):
choices = [f.strip() for f in format_selection.split(':', 1)[1].split(',')]
valid_choices = [f for f in choices if f in available_formats]
if valid_choices:
formats_to_test = [random.choice(valid_choices)]
else:
# If the format selection contains complex selector characters (other than comma),
# treat the entire string as a single format selector for yt-dlp to interpret.
# Otherwise, split by comma to test each specified format ID individually.
if any(c in format_selection for c in '/+[]()'):
requested_formats = [format_selection]
else:
requested_formats = [f.strip() for f in format_selection.split(',') if f.strip()]
formats_to_test = []
selector_keywords = ('best', 'worst', 'bestvideo', 'bestaudio')
for req_fmt in requested_formats:
# Treat as a selector and pass through if it contains special characters
# or starts with a known selector keyword.
if any(c in req_fmt for c in '/+[]()') or req_fmt.startswith(selector_keywords):
formats_to_test.append(req_fmt)
continue
# Otherwise, treat as a specific format ID that must exist.
# Check for exact match first.
if req_fmt in available_formats:
formats_to_test.append(req_fmt)
continue
# If no exact match, check for formats that start with this ID + '-' and then digits
# e.g., req_fmt '140' should match '140-0' but not '140-something'.
prefix_match_re = re.compile(rf'^{re.escape(req_fmt)}-\d+$')
first_match = next((af for af in available_formats if prefix_match_re.match(af)), None)
if first_match:
logger.info(f"[{display_name}] Requested format '{req_fmt}' not found. Using first available match: '{first_match}'.")
formats_to_test.append(first_match)
else:
logger.warning(f"[{display_name}] Requested format '{req_fmt}' not found in available formats and is not a recognized selector. Skipping this format.")
except json.JSONDecodeError:
logger.error(f"[{display_name}] Failed to parse info.json. Skipping.")
return []
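# Illustrative 'formats' / '_ytops_download_format' values handled above (examples only):
#   'all'                      -> every format_id present in the info.json
#   'random:25%'               -> a random 25% sample of the available format_ids
#   'random_from:140,137'      -> one random pick from the listed ids that exist
#   '140,137'                  -> each listed id, with '140' falling back to e.g. '140-0'
#                                 when only a suffixed variant exists
#   'bestvideo+bestaudio/best' -> passed through verbatim as a yt-dlp selector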
for i, format_id in enumerate(formats_to_test):
if state_manager.shutdown_event.is_set():
logger.info(f"Shutdown requested, stopping further format tests for {display_name}.")
break
# Check if the format URL is expired before attempting to download
format_details = next((f for f in info_data.get('formats', []) if f.get('format_id') == format_id), None)
# If format_id is a complex selector, it won't be found directly. As a heuristic,
# check the expiration of the first format URL, as they typically share the same expiration.
if not format_details and any(c in format_id for c in '/+[]()'):
available_formats_list = info_data.get('formats', [])
if available_formats_list:
format_details = available_formats_list[0]
# The check is enabled by default and can be disabled via policy.
if d_policy.get('check_url_expiration', True) and format_details and 'url' in format_details:
url_to_check = format_details['url']
time_shift_minutes = d_policy.get('expire_time_shift_minutes', 0)
status, time_left_seconds = sp_utils.check_url_expiry(url_to_check, time_shift_minutes)
logger.debug(f"[{display_name}] URL expiration check for format '{format_id}': status={status}, time_left={time_left_seconds:.0f}s")
if status == 'expired':
details = "Download URL is expired"
if time_shift_minutes > 0 and time_left_seconds > 0:
logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL will expire in {time_left_seconds/60:.1f}m (within {time_shift_minutes}m time-shift).")
details = f"URL will expire within {time_shift_minutes}m time-shift"
else:
logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL is expired.")
result = {
'type': 'download', 'path': str(path), 'format': format_id,
'success': False, 'error_type': 'Skipped (Expired URL)',
'details': details, 'downloaded_bytes': 0, 'is_tolerated_error': True
}
if proxy_url:
result['proxy_url'] = proxy_url
if profile_manager_instance and profile_name:
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
state_manager.log_event(result)
results.append(result)
continue # Move to the next format
elif status == 'no_expiry_info':
logger.debug(f"[{display_name}] No valid 'expire' parameter found in format URL for '{format_id}'. Skipping expiration check.")
result = run_download_worker(path, content, format_id, policy, args, running_processes, process_lock, state_manager, profile_name=profile_name)
if 'id' in info_data:
result['video_id'] = info_data['id']
if proxy_url:
result['proxy_url'] = proxy_url
# Record download attempt/error if a profile is being used.
if profile_manager_instance and profile_name:
if result.get('success'):
profile_manager_instance.record_activity(profile_name, 'download')
elif result.get('error_type') == 'Cancelled':
pass # Do not record cancellations
elif result.get('is_tolerated_error'):
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
else:
profile_manager_instance.record_activity(profile_name, 'download_error')
state_manager.log_event(result)
results.append(result)
worker_id = get_worker_id()
status = "SUCCESS" if result['success'] else f"FAILURE ({result['error_type']})"
profile_log_part = f" [Profile: {profile_name}]" if profile_name else ""
logger.info(f"[Worker {worker_id}]{profile_log_part} Result for {display_name} (format {format_id}): {status} - {result.get('details', 'OK')}")
if not result['success']:
if s_conditions.get('on_failure') or \
(s_conditions.get('on_http_403') and result['error_type'] == 'HTTP 403') or \
(s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
logger.info(f"Stopping further format tests for {display_name} in this cycle due to failure.")
break
sleep_cfg = d_policy.get('sleep_between_formats', {})
sleep_min = sleep_cfg.get('min_seconds', 0)
if sleep_min > 0 and i < len(formats_to_test) - 1:
sleep_max = sleep_cfg.get('max_seconds') or sleep_min
if sleep_max > sleep_min:
sleep_duration = random.uniform(sleep_min, sleep_max)
else:
sleep_duration = sleep_min
logger.debug(f"Sleeping for {sleep_duration:.2f}s between formats for {display_name}.")
# Interruptible sleep
sleep_end_time = time.time() + sleep_duration
while time.time() < sleep_end_time:
if state_manager.shutdown_event.is_set():
break
time.sleep(0.2)
return results
def run_throughput_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock, profile_prefix=None):
"""A persistent worker for the 'throughput' orchestration mode."""
owner_id = f"throughput-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
# Prioritize the passed-in profile_prefix for worker pool compatibility.
if not profile_prefix:
d_policy = policy.get('download_policy', {})
profile_prefix = d_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Throughput mode requires a 'profile_prefix' from the worker pool or 'download_policy'. Worker exiting.")
return []
no_task_streak = 0
while not state_manager.shutdown_event.is_set():
locked_profile = None
claimed_task_path = None
try:
# 0. If no tasks were found previously, pause briefly.
if no_task_streak > 0:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
logger.info(f"[Worker {worker_id}] No tasks found in previous attempt(s). Pausing for {polling_interval}s. (Streak: {no_task_streak})")
time.sleep(polling_interval)
if state_manager.shutdown_event.is_set(): continue
# 1. Find a task and lock its associated profile
locked_profile, claimed_task_path = find_task_and_lock_profile(
profile_manager_instance, owner_id, profile_prefix, policy, worker_id
)
if not locked_profile:
# No task/profile combo was available.
no_task_streak += 1
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
logger.info(f"[Worker {worker_id}] No available tasks found for any active profiles. Pausing for {polling_interval}s.")
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
# We have a task and a lock.
if claimed_task_path:
no_task_streak = 0 # Reset streak
# 3. Process the task
try:
with open(claimed_task_path, 'r', encoding='utf-8') as f:
info_json_content = f.read()
except (IOError, FileNotFoundError) as e:
logger.error(f"[{sp_utils.get_display_name(claimed_task_path)}] Could not read claimed task file: {e}")
# Unlock profile and continue, file might be corrupted
profile_manager_instance.unlock_profile(profile_name, owner=owner_id)
locked_profile = None
# Clean up the bad file
try: claimed_task_path.unlink()
except OSError: pass
continue
# The locked profile's proxy MUST be used for the download.
local_policy = deepcopy(policy)
local_policy.setdefault('download_policy', {})['proxy'] = locked_profile['proxy']
_run_download_logic(
source=claimed_task_path,
info_json_content=info_json_content,
policy=local_policy,
state_manager=state_manager,
args=args,
running_processes=running_processes,
process_lock=process_lock,
profile_name=profile_name,
profile_manager_instance=profile_manager_instance
)
# 4. Clean up the processed task file
try:
os.remove(claimed_task_path)
logger.debug(f"[{sp_utils.get_display_name(claimed_task_path)}] Removed processed task file.")
except OSError as e:
logger.error(f"Failed to remove processed task file '{claimed_task_path}': {e}")
else:
# This case should not be reached with the new task-first locking logic.
# If it is, it means find_task_and_lock_profile returned a profile but no task.
logger.warning(f"[Worker {worker_id}] Inconsistent state: locked profile '{profile_name}' but no task was claimed. Unlocking and continuing.")
except Exception as e:
logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
time.sleep(5) # Pause before retrying to avoid spamming errors
finally:
if locked_profile:
# 5. Unlock the profile. Only apply cooldown if a task was processed.
cooldown = None
if claimed_task_path:
# The enforcer is the single place where this behaviour is configured: the enforcer can be
# restarted to change policy, whereas the stress-policy workers handling auth and downloads
# keep running. In effect, one policy is applied across many workers/machines without
# restarting each of them.
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
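# Illustrative values for the 'unlock_cooldown_seconds' config key (examples):
#   "45"        -> fixed 45-second cooldown
#   "[30, 90]"  -> random cooldown between 30 and 90 seconds (inclusive)
# The value is read as a string from Redis and parsed as JSON below; a plain
# digit string is also accepted as a fallback.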
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if cooldown_config.isdigit():
cooldown = int(cooldown_config)
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
locked_profile = None
# 6. Throughput is now controlled by the enforcer via the profile's
# 'unlock_cooldown_seconds' policy, which puts the profile into a
# RESTING state. The worker does not need to sleep here and can
# immediately try to lock a new profile to maximize throughput.
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return [] # This function doesn't return results directly
def run_direct_batch_worker(worker_id, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock, profile_prefix=None):
"""A worker for the 'direct_batch_cli' orchestration mode."""
owner_id = f"direct-batch-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
gen_policy = policy.get('info_json_generation_policy', {})
direct_policy = policy.get('direct_batch_cli_policy', {})
queue_policy = policy.get('queue_policy')
# Prioritize the passed-in profile_prefix for worker pool compatibility.
if not profile_prefix:
profile_prefix = gen_policy.get('profile_prefix')
if not profile_prefix:
logger.error(f"[Worker {worker_id}] Direct batch mode requires a 'profile_prefix' from the worker pool or 'info_json_generation_policy'. Worker exiting.")
return []
batch_size = direct_policy.get('batch_size')
if not batch_size:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'direct_batch_cli_policy.batch_size'. Worker exiting.")
return []
save_dir = settings.get('save_info_json_dir')
if not save_dir:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'settings.save_info_json_dir'. Worker exiting.")
return []
os.makedirs(save_dir, exist_ok=True)
last_used_profile_name = None
while not state_manager.shutdown_event.is_set():
locked_profile = None
temp_batch_file = None
# --- Variables for robust finalization ---
files_created = 0
url_batch_len = 0
batch_started = False
downloads_per_url = 0 # Default to 0, meaning no increment unless configured
# ---
try:
# 1. Lock a profile
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
# --- New logic to avoid immediate reuse ---
avoid_reuse = direct_policy.get('avoid_immediate_profile_reuse', False)
if avoid_reuse and locked_profile and last_used_profile_name and locked_profile['name'] == last_used_profile_name:
logger.info(f"[Worker {worker_id}] Re-locked same profile '{locked_profile['name']}'. Unlocking and pausing to allow for rotation.")
profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id)
wait_seconds = direct_policy.get('avoid_reuse_max_wait_seconds', 5)
time.sleep(wait_seconds)
# After waiting, try to lock again.
logger.info(f"[Worker {worker_id}] Attempting to lock a new profile after waiting.")
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
if locked_profile and locked_profile['name'] == last_used_profile_name:
logger.warning(f"[Worker {worker_id}] Still locking the same profile '{locked_profile['name']}' after waiting. Proceeding to use it to avoid getting stuck.")
elif locked_profile:
logger.info(f"[Worker {worker_id}] Switched to a different profile after waiting: '{locked_profile['name']}'.")
# --- End new logic ---
if not locked_profile:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s.")
else:
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s.")
# --- End diagnostic logging ---
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
proxy_url = locked_profile['proxy']
# 2. Get a batch of URLs from the shared list
url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list)
if not url_batch:
logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.")
break # Exit the while loop
url_batch_len = len(url_batch)
batch_started = True
# --- Calculate how many download tasks will be generated ---
downloads_per_url = 0 # Default to 0, meaning no increment unless configured
downloads_per_url_config = gen_policy.get('downloads_per_url')
if downloads_per_url_config:
if isinstance(downloads_per_url_config, int):
downloads_per_url = downloads_per_url_config
elif downloads_per_url_config == 'from_download_policy':
download_policy = policy.get('download_policy', {})
formats_str = download_policy.get('formats', '')
if formats_str:
# Use smarter parsing to handle complex yt-dlp format selectors
if any(c in formats_str for c in '/+[]()'):
num_formats = 1
else:
num_formats = len([f for f in formats_str.split(',') if f.strip()])
if num_formats > 0:
downloads_per_url = num_formats
if downloads_per_url > 0:
downloads_to_increment = url_batch_len * downloads_per_url
profile_manager_instance.increment_pending_downloads(profile_name, downloads_to_increment)
logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {downloads_to_increment} for the upcoming batch ({url_batch_len} URLs * {downloads_per_url} formats).")
else:
logger.warning(f"[Worker {worker_id}] [{profile_name}] 'downloads_per_url' is not configured or resolves to 0. Pending downloads counter will not be incremented for this batch.")
end_idx = start_idx + len(url_batch)
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).")
video_ids_in_batch = {sp_utils.get_video_id(u) for u in url_batch}
# 3. Write URLs to a temporary batch file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as f:
temp_batch_file = f.name
f.write('\n'.join(url_batch))
# 4. Construct and run the command
ytdlp_cmd_str = direct_policy.get('ytdlp_command')
if not ytdlp_cmd_str:
logger.error(f"[Worker {worker_id}] Direct batch mode requires 'direct_batch_cli_policy.ytdlp_command'.")
break
cmd = shlex.split(ytdlp_cmd_str)
cmd.extend(['--batch-file', temp_batch_file])
cmd.extend(['--proxy', proxy_url])
# The output template should not include the .info.json extension, as
# yt-dlp adds it automatically when --write-info-json is used.
output_template_str = direct_policy.get('ytdlp_output_template', '%(id)s')
ytdlp_args = direct_policy.get('ytdlp_args')
custom_env = direct_policy.get('env_vars', {}).copy()
# --- PYTHONPATH for custom yt-dlp module ---
ytdlp_module_path = direct_policy.get('ytdlp_module_path')
if ytdlp_module_path:
existing_pythonpath = custom_env.get('PYTHONPATH', os.environ.get('PYTHONPATH', ''))
# Prepend the custom path to PYTHONPATH to give it precedence
custom_env['PYTHONPATH'] = f"{ytdlp_module_path}{os.pathsep}{existing_pythonpath}".strip(os.pathsep)
logger.debug(f"[Worker {worker_id}] Using custom PYTHONPATH: {custom_env['PYTHONPATH']}")
custom_env['YTDLP_PROFILE_NAME'] = profile_name
custom_env['YTDLP_PROXY_URL'] = proxy_url
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
custom_env['YTDLP_SIM_MODE'] = env_name
# Create a per-profile cache directory and set XDG_CACHE_HOME
cache_dir_base = direct_policy.get('cache_dir_base', '.cache')
profile_cache_dir = os.path.join(cache_dir_base, profile_name)
try:
os.makedirs(profile_cache_dir, exist_ok=True)
custom_env['XDG_CACHE_HOME'] = profile_cache_dir
except OSError as e:
logger.error(f"[Worker {worker_id}] Failed to create cache directory '{profile_cache_dir}': {e}")
# --- Manage User-Agent ---
# Use a consistent User-Agent per profile, storing it in the profile's cache directory.
user_agent = None
user_agent_file = os.path.join(profile_cache_dir, 'user_agent.txt')
try:
if os.path.exists(user_agent_file):
with open(user_agent_file, 'r', encoding='utf-8') as f:
user_agent = f.read().strip()
if not user_agent: # File doesn't exist or is empty
user_agent = sp_utils.generate_user_agent_from_policy(policy)
with open(user_agent_file, 'w', encoding='utf-8') as f:
f.write(user_agent)
logger.info(f"[{profile_name}] Generated and saved new User-Agent: '{user_agent}'")
else:
logger.info(f"[{profile_name}] Using existing User-Agent from cache: '{user_agent}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] Error accessing User-Agent file '{user_agent_file}': {e}. Using generated UA for this run.")
user_agent = sp_utils.generate_user_agent_from_policy(policy) # fallback
# Add proxy rename from policy if specified, for custom yt-dlp forks
proxy_rename = direct_policy.get('ytdlp_proxy_rename')
if proxy_rename:
custom_env['YTDLP_PROXY_RENAME'] = proxy_rename
if user_agent:
cmd.extend(['--user-agent', user_agent])
if ytdlp_args:
cmd.extend(shlex.split(ytdlp_args))
if args.verbose and '--verbose' not in cmd:
cmd.append('--verbose')
if args.dummy_batch:
# In dummy batch mode, we simulate the entire batch process directly.
log_cmd = list(cmd)
log_cmd.extend(['-o', os.path.join('temp_dir', output_template_str)])
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Simulating batch of {len(url_batch)} URLs.")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Would run real command: {' '.join(shlex.quote(s) for s in log_cmd)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: With environment: {custom_env}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
auth_failure_rate = dummy_settings.get('auth_failure_rate', 0.0)
auth_skipped_rate = dummy_settings.get('auth_skipped_failure_rate', 0.0)
min_seconds = dummy_settings.get('auth_min_seconds', 0.1)
max_seconds = dummy_settings.get('auth_max_seconds', 0.5)
for url in url_batch:
time.sleep(random.uniform(min_seconds, max_seconds))
video_id = sp_utils.get_video_id(url) or f"dummy_{random.randint(1000, 9999)}"
rand_val = random.random()
if rand_val < auth_skipped_rate:
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating tolerated failure for {video_id}.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
elif rand_val < (auth_skipped_rate + auth_failure_rate):
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating fatal failure for {video_id}.")
profile_manager_instance.record_activity(profile_name, 'failure')
else:
# Success - create dummy info.json
files_created += 1
profile_manager_instance.record_activity(profile_name, 'success')
info_data = {'id': video_id, 'title': f'Dummy Video {video_id}', '_dummy': True}
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
info_data['_ytops_metadata'] = {
'profile_name': profile_name, 'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
final_path = Path(save_dir) / f"{video_id}.info.json"
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy)
final_path = Path(save_dir) / new_name
try:
with open(final_path, 'w', encoding='utf-8') as f:
json.dump(info_data, f, indent=2)
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY: Created dummy info.json: '{final_path}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] [{profile_name}] DUMMY: Failed to write dummy info.json: {e}")
success = (files_created > 0)
state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)
event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': f"Dummy batch completed. Files created: {files_created}/{len(url_batch)}.", 'video_count': len(url_batch) }
state_manager.log_event(event)
else:
with tempfile.TemporaryDirectory(prefix=f"ytdlp-batch-{worker_id}-") as temp_output_dir:
output_template = os.path.join(temp_output_dir, output_template_str)
cmd.extend(['-o', output_template])
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs...")
logger.info(f"[Worker {worker_id}] [{profile_name}] Running command: {' '.join(shlex.quote(s) for s in cmd)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] With environment: {custom_env}")
retcode, stdout, stderr = run_command(
cmd, running_processes, process_lock, env=custom_env, stream_output=args.verbose,
stream_prefix=f"[Worker {worker_id} | yt-dlp] "
)
is_bot_error = "Sign in to confirm you're not a bot" in stderr
if is_bot_error:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during batch. Marking as failure.")
processed_files = list(Path(temp_output_dir).glob('*.json'))
for temp_path in processed_files:
files_created += 1
video_id = "unknown"
try:
with open(temp_path, 'r+', encoding='utf-8') as f:
info_data = json.load(f)
video_id = info_data.get('id', 'unknown')
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
info_data['_ytops_metadata'] = {
'profile_name': profile_name,
'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
f.seek(0)
json.dump(info_data, f, indent=2)
f.truncate()
final_path = Path(save_dir) / temp_path.name
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(
video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy
)
final_path = Path(save_dir) / new_name
shutil.move(str(temp_path), str(final_path))
logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'")
except (IOError, json.JSONDecodeError, OSError) as e:
logger.error(f"[Worker {worker_id}] Error post-processing '{temp_path.name}' (video: {video_id}): {e}")
                    # A batch is considered an overall success for logging if it produced
                    # at least one file and had no fatal (bot detection) errors. Per-URL
                    # activity for the profile is recorded by the yt-dlp script itself,
                    # not by the orchestrator (see the note below).
success = (files_created > 0 and not is_bot_error)
if not success:
reason = "bot detection occurred" if is_bot_error else f"0 files created out of {len(url_batch)}"
logger.warning(f"[Worker {worker_id}] [{profile_name}] Marking batch as FAILED. Reason: {reason}.")
# Record batch stats for overall orchestrator health
state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)
# In this mode, the custom yt-dlp script is responsible for recording
# per-URL activity ('success', 'failure', 'tolerated_error') directly into Redis.
# The orchestrator does not record activity here to avoid double-counting.
logger.info(f"[Worker {worker_id}] [{profile_name}] Batch finished. Per-URL activity was recorded by the yt-dlp script.")
event_details = f"Batch completed. Exit: {retcode}. Files created: {files_created}/{len(url_batch)}."
if not success and stderr:
if is_bot_error:
event_details += " Stderr: Bot detection occurred."
else:
event_details += f" Stderr: {stderr.strip().splitlines()[-1]}"
event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details, 'video_count': len(url_batch) }
state_manager.log_event(event)
except Exception as e:
logger.error(f"[Worker {worker_id}] Unexpected error in worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure')
finally:
if locked_profile and batch_started:
# --- Reconcile pending downloads counter ---
if downloads_per_url > 0:
initial_increment = url_batch_len * downloads_per_url
actual_downloads = files_created * downloads_per_url
adjustment = actual_downloads - initial_increment
if adjustment != 0:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {files_created}/{url_batch_len} successful info.json(s). Adjusting counter by {adjustment}.")
profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment)
if locked_profile:
last_used_profile_name = locked_profile['name']
cooldown = None
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
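                # Accepted value formats (examples, set by the enforcer):
                #   "45"       -> fixed 45s cooldown
                #   "[30, 90]" -> random cooldown between 30 and 90 seconds (inclusive)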
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if cooldown_config.isdigit():
cooldown = int(cooldown_config)
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
if temp_batch_file and os.path.exists(temp_batch_file):
os.unlink(temp_batch_file)
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []
def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock, profile_prefix=None):
"""A worker for the 'direct_docker_cli' orchestration mode (fetch_only)."""
owner_id = f"direct-docker-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
gen_policy = policy.get('info_json_generation_policy', {})
direct_policy = policy.get('direct_docker_cli_policy', {})
queue_policy = policy.get('queue_policy')
# Prioritize the passed-in profile_prefix for worker pool compatibility.
# If it's not passed (e.g. legacy 'workers' mode), fall back to policy.
if not profile_prefix:
profile_prefix = gen_policy.get('profile_prefix')
    # Unlike other modes, this worker can function without a prefix (it will try to
    # lock any active profile), so a missing profile_prefix is not treated as an error.
batch_size = direct_policy.get('batch_size')
if not batch_size and queue_policy:
batch_size = queue_policy.get('batch_size')
if not batch_size:
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'batch_size' in 'direct_docker_cli_policy' or 'queue_policy'. Worker exiting.")
return []
save_dir = settings.get('save_info_json_dir')
if not save_dir:
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'settings.save_info_json_dir'. Worker exiting.")
return []
os.makedirs(save_dir, exist_ok=True)
# --- Docker specific config ---
image_name = direct_policy.get('docker_image_name')
host_mount_path = os.path.abspath(direct_policy.get('docker_host_mount_path'))
container_mount_path = direct_policy.get('docker_container_mount_path')
host_cache_path = direct_policy.get('docker_host_cache_path')
if host_cache_path: host_cache_path = os.path.abspath(host_cache_path)
container_cache_path = direct_policy.get('docker_container_cache_path')
network_name = direct_policy.get('docker_network_name')
if not all([image_name, host_mount_path, container_mount_path]):
logger.error(f"[Worker {worker_id}] Direct docker mode requires 'docker_image_name', 'docker_host_mount_path', and 'docker_container_mount_path'. Worker exiting.")
return []
try:
os.makedirs(host_mount_path, exist_ok=True)
except OSError as e:
logger.error(f"[Worker {worker_id}] Could not create docker_host_mount_path '{host_mount_path}': {e}. Worker exiting.")
return []
last_used_profile_name = None
while not state_manager.shutdown_event.is_set():
locked_profile = None
temp_task_dir_host = None
task_batch = []
# --- Variables for robust finalization ---
live_success_count = 0
url_batch_len = 0
batch_started = False
downloads_per_url = 0 # Default to 0, meaning no increment unless configured
# ---
try:
# 1. Lock a profile
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
# --- New logic to avoid immediate reuse ---
avoid_reuse = direct_policy.get('avoid_immediate_profile_reuse', False)
if avoid_reuse and locked_profile and last_used_profile_name and locked_profile['name'] == last_used_profile_name:
logger.info(f"[Worker {worker_id}] Re-locked same profile '{locked_profile['name']}'. Unlocking and pausing to allow for rotation.")
profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id)
wait_seconds = direct_policy.get('avoid_reuse_max_wait_seconds', 5)
time.sleep(wait_seconds)
# After waiting, try to lock again.
logger.info(f"[Worker {worker_id}] Attempting to lock a new profile after waiting.")
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
if locked_profile and locked_profile['name'] == last_used_profile_name:
logger.warning(f"[Worker {worker_id}] Still locking the same profile '{locked_profile['name']}' after waiting. Proceeding to use it to avoid getting stuck.")
elif locked_profile:
logger.info(f"[Worker {worker_id}] Switched to a different profile after waiting: '{locked_profile['name']}'.")
# --- End new logic ---
if not locked_profile:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix or '')]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. Pool status ({profile_prefix or '*'}*): {states_summary}. Pausing for {polling_interval}s.")
else:
logger.info(f"[Worker {worker_id}] No auth profiles available to lock. No profiles found with prefix '{profile_prefix or '*'}'. Pausing for {polling_interval}s.")
# --- End diagnostic logging ---
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
proxy_url = locked_profile['proxy']
# --- Manage User-Agent, Visitor ID, and Cache Directory ---
user_agent = None
visitor_id = None
environment = {}
profile_cache_dir_host = None
if host_cache_path:
profile_cache_dir_host = os.path.join(host_cache_path, profile_name)
try:
os.makedirs(profile_cache_dir_host, exist_ok=True)
if container_cache_path:
environment['XDG_CACHE_HOME'] = container_cache_path
# --- User-Agent ---
user_agent_file = os.path.join(profile_cache_dir_host, 'user_agent.txt')
if os.path.exists(user_agent_file):
with open(user_agent_file, 'r', encoding='utf-8') as f:
user_agent = f.read().strip()
if not user_agent:
user_agent = sp_utils.generate_user_agent_from_policy(policy)
with open(user_agent_file, 'w', encoding='utf-8') as f:
f.write(user_agent)
logger.info(f"[{profile_name}] Generated and saved new User-Agent: '{user_agent}'")
else:
logger.info(f"[{profile_name}] Using existing User-Agent from cache: '{user_agent}'")
# --- Visitor ID ---
if direct_policy.get('track_visitor_id'):
visitor_id_file = os.path.join(profile_cache_dir_host, 'visitor_id.txt')
if os.path.exists(visitor_id_file):
with open(visitor_id_file, 'r', encoding='utf-8') as f:
visitor_id = f.read().strip()
if visitor_id:
logger.info(f"[{profile_name}] Using existing Visitor ID from cache: '{visitor_id}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] Error accessing cache file in '{profile_cache_dir_host}': {e}. Using generated UA for this run.")
user_agent = sp_utils.generate_user_agent_from_policy(policy) # Fallback for UA
else:
# Fallback if no cache is configured for auth simulation
user_agent = sp_utils.generate_user_agent_from_policy(policy)
# 2. Get a batch of URLs
url_batch = []
start_idx = 0
if not queue_policy:
url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list)
else:
task_batch = state_manager.get_auth_tasks_batch(batch_size)
if task_batch:
# Mark tasks as in-progress
for task in task_batch:
task_id = task.get('id') or task.get('task_id')
if task_id:
state_manager.mark_auth_in_progress(task_id, owner_id)
url_batch = [task.get('url') for task in task_batch if task.get('url')]
if not url_batch:
if not queue_policy:
logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.")
break # Exit the while loop
else:
# Queue mode: no tasks, unlock and poll
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
logger.debug(f"[Worker {worker_id}] No tasks in queue for profile '{profile_name}'. Unlocking and sleeping for {polling_interval}s.")
profile_manager_instance.unlock_profile(profile_name, owner=owner_id)
locked_profile = None
time.sleep(polling_interval)
continue
url_batch_len = len(url_batch)
batch_started = True
# --- Calculate how many download tasks will be generated ---
downloads_per_url = 0 # Default to 0, meaning no increment unless configured
downloads_per_url_config = gen_policy.get('downloads_per_url')
if downloads_per_url_config:
if isinstance(downloads_per_url_config, int):
downloads_per_url = downloads_per_url_config
elif downloads_per_url_config == 'from_download_policy':
download_policy = policy.get('download_policy', {})
formats_str = download_policy.get('formats', '')
if formats_str:
# Use smarter parsing to handle complex yt-dlp format selectors
if any(c in formats_str for c in '/+[]()'):
num_formats = 1
else:
num_formats = len([f for f in formats_str.split(',') if f.strip()])
if num_formats > 0:
downloads_per_url = num_formats
if downloads_per_url > 0:
downloads_to_increment = url_batch_len * downloads_per_url
profile_manager_instance.increment_pending_downloads(profile_name, downloads_to_increment)
logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {downloads_to_increment} for the upcoming batch ({url_batch_len} URLs * {downloads_per_url} formats).")
else:
logger.warning(f"[Worker {worker_id}] [{profile_name}] 'downloads_per_url' is not configured or resolves to 0. Pending downloads counter will not be incremented for this batch.")
end_idx = start_idx + len(url_batch)
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).")
# 3. Prepare files on the host
temp_task_dir_host = tempfile.mkdtemp(prefix=f"docker-task-{worker_id}-", dir=host_mount_path)
task_dir_name = os.path.basename(temp_task_dir_host)
task_dir_container = os.path.join(container_mount_path, task_dir_name)
# The config file path is passed explicitly to yt-dlp via --config-locations,
# so setting XDG_CONFIG_HOME in the environment is redundant.
# Write batch file
temp_batch_file_host = os.path.join(temp_task_dir_host, 'batch.txt')
with open(temp_batch_file_host, 'w', encoding='utf-8') as f:
f.write('\n'.join(url_batch))
# Write yt-dlp config file
base_config_content = ""
base_config_file = direct_policy.get('ytdlp_config_file')
if base_config_file:
# Try path as-is first, then relative to project root.
config_path_to_read = Path(base_config_file)
if not config_path_to_read.exists():
config_path_to_read = Path(sp_utils._PROJECT_ROOT) / base_config_file
if config_path_to_read.exists():
try:
with open(config_path_to_read, 'r', encoding='utf-8') as f:
base_config_content = f.read()
logger.info(f"[Worker {worker_id}] [{profile_name}] Loaded base config from '{config_path_to_read}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] Could not read ytdlp_config_file '{config_path_to_read}': {e}")
else:
logger.error(f"[Worker {worker_id}] Could not find ytdlp_config_file: '{base_config_file}'")
config_overrides = direct_policy.get('ytdlp_config_overrides', {}).copy()
if direct_policy.get('use_cookies') and host_cache_path and container_cache_path:
try:
cookie_file_host = os.path.join(profile_cache_dir_host, 'cookies.txt')
# Ensure the file exists and has the Netscape header if it's empty.
if not os.path.exists(cookie_file_host) or os.path.getsize(cookie_file_host) == 0:
with open(cookie_file_host, 'w', encoding='utf-8') as f:
f.write("# Netscape HTTP Cookie File\n")
logger.info(f"[{profile_name}] Created/initialized cookie file with header: {cookie_file_host}")
cookie_file_container = os.path.join(container_cache_path, 'cookies.txt')
config_overrides['cookies'] = cookie_file_container
logger.info(f"[{profile_name}] Using persistent cookie jar: {cookie_file_host}")
except (IOError, OSError) as e:
logger.error(f"[Worker {worker_id}] Could not create cookie file in '{profile_cache_dir_host}': {e}")
# Inject per-task values into overrides
config_overrides['proxy'] = proxy_url
config_overrides['batch-file'] = os.path.join(task_dir_container, 'batch.txt')
# The output template should not include the .info.json extension, as
# yt-dlp adds it automatically when --write-info-json is used.
config_overrides['output'] = os.path.join(task_dir_container, '%(id)s')
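            # e.g. with output '%(id)s' and --write-info-json, yt-dlp writes
            # '<task_dir>/<video_id>.info.json' (video id illustrative).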
if user_agent:
config_overrides['user-agent'] = user_agent
overrides_content = sp_utils._config_dict_to_flags_file_content(config_overrides)
raw_args_from_policy = direct_policy.get('ytdlp_raw_args', [])
# --- Inject visitor_id into raw args if available ---
if visitor_id:
# Start with a copy of the raw args from policy
new_raw_args = list(raw_args_from_policy)
# --- Handle youtube extractor args ---
youtube_arg_index = -1
original_youtube_value = None
for i, arg in enumerate(new_raw_args):
if arg.startswith('--extractor-args') and 'youtube:' in arg:
youtube_arg_index = i
try:
parts = shlex.split(arg)
if len(parts) == 2 and parts[1].startswith('youtube:'):
original_youtube_value = parts[1]
except ValueError:
logger.warning(f"Could not parse extractor-arg, will not modify: {arg}")
break # Found it, stop searching
if youtube_arg_index != -1 and original_youtube_value:
# Modify existing youtube arg
new_value = f'{original_youtube_value.rstrip()};visitor_data={visitor_id}'
if 'skip=' in new_value:
new_value = re.sub(r'skip=([^;\'"]*)', r'skip=\1,webpage,configs', new_value)
else:
new_value += ';skip=webpage,configs'
new_raw_args[youtube_arg_index] = f'--extractor-args "{new_value}"'
else:
# Add new youtube arg
logger.warning(f"[{profile_name}] No existing '--extractor-args youtube:...' found. Adding a new one for visitor_id.")
new_raw_args.append(f'--extractor-args "youtube:visitor_data={visitor_id};skip=webpage,configs"')
# --- Handle youtubetab extractor args ---
youtubetab_arg_index = -1
for i, arg in enumerate(new_raw_args):
if arg.startswith('--extractor-args') and 'youtubetab:' in arg:
youtubetab_arg_index = i
break
# The request is to set/replace this argument
new_youtubetab_arg = '--extractor-args "youtubetab:skip=webpage"'
if youtubetab_arg_index != -1:
# Replace existing
new_raw_args[youtubetab_arg_index] = new_youtubetab_arg
else:
# Add new
new_raw_args.append(new_youtubetab_arg)
raw_args_from_policy = new_raw_args
# --- End visitor_id injection ---
raw_args_content = '\n'.join(raw_args_from_policy)
config_content = f"{base_config_content.strip()}\n\n# --- Overrides from policy ---\n{overrides_content}"
if raw_args_content:
config_content += f"\n\n# --- Raw args from policy ---\n{raw_args_content}"
logger.info(f"[Worker {worker_id}] [{profile_name}] Generated yt-dlp config file content:\n---config---\n{config_content}\n------------")
# Create the directory structure yt-dlp expects inside the temp task dir
ytdlp_config_dir_host = os.path.join(temp_task_dir_host, 'yt-dlp')
os.makedirs(ytdlp_config_dir_host, exist_ok=True)
temp_config_file_host = os.path.join(ytdlp_config_dir_host, 'config')
with open(temp_config_file_host, 'w', encoding='utf-8') as f:
f.write(config_content)
# 4. Construct and run the 'docker run' command
volumes = {
host_mount_path: {
'bind': container_mount_path,
'mode': 'rw'
}
}
if host_cache_path and container_cache_path:
profile_cache_dir_host = os.path.join(host_cache_path, profile_name)
os.makedirs(profile_cache_dir_host, exist_ok=True)
volumes[profile_cache_dir_host] = {
'bind': container_cache_path,
'mode': 'rw'
}
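            # Roughly equivalent (illustrative; exact flags depend on run_docker_container) to:
            #   docker run --rm --network <network> \
            #     -v <host_mount_path>:<container_mount_path>:rw \
            #     -v <host_cache_path>/<profile>:<container_cache_path>:rw \
            #     -e XDG_CACHE_HOME=<container_cache_path> \
            #     <image_name> yt-dlp --config-locations <task_dir>/yt-dlp/config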
# The command tells yt-dlp exactly where to find the config file we created.
command = ['yt-dlp', '--config-locations', os.path.join(task_dir_container, 'yt-dlp/config')]
logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}")
# For logging purposes, construct the full equivalent command line with host paths
log_config_overrides_for_host = config_overrides.copy()
log_config_overrides_for_host['batch-file'] = temp_batch_file_host
log_config_overrides_for_host['output'] = os.path.join(temp_task_dir_host, '%(id)s')
if 'cookies' in log_config_overrides_for_host and host_cache_path:
log_config_overrides_for_host['cookies'] = os.path.join(profile_cache_dir_host, 'cookies.txt')
log_command_override = ['yt-dlp']
if base_config_content:
log_command_override.extend(sp_utils._parse_config_file_to_cli_args(base_config_content))
log_command_override.extend(sp_utils._config_dict_to_cli_flags(log_config_overrides_for_host))
for raw_arg in raw_args_from_policy:
log_command_override.extend(shlex.split(raw_arg))
# --- Live log parsing and activity recording ---
live_failure_count = 0
live_tolerated_count = 0
activity_lock = threading.Lock()
# Get error patterns from policy for live parsing
tolerated_error_patterns = direct_policy.get('tolerated_error_patterns', [])
fatal_error_patterns = direct_policy.get('fatal_error_patterns', [])
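            # Patterns come from the policy file; for example, a fatal pattern might be
            # "Sign in to confirm you're not a bot" (also checked verbatim below), while
            # tolerated patterns might cover benign per-video errors such as
            # "Video unavailable" (illustrative).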
def log_parser_callback(line):
nonlocal live_success_count, live_failure_count, live_tolerated_count
# --- Visitor ID Extraction ---
if direct_policy.get('track_visitor_id') and profile_cache_dir_host:
# e.g., [debug] [youtube] [pot:cache] TRACE: Retrieved cache spec PoTokenCacheSpec(key_bindings={'t': 'webpo', 'cb': '...', 'cbt': 'visitor_id', ...
match = re.search(r"'cb': '([^']*)', 'cbt': 'visitor_id'", line)
if match:
new_visitor_id = match.group(1)
logger.info(f"[Worker {worker_id}] [{profile_name}] Detected new Visitor ID: {new_visitor_id}")
try:
visitor_id_file = os.path.join(profile_cache_dir_host, 'visitor_id.txt')
with open(visitor_id_file, 'w', encoding='utf-8') as f:
f.write(new_visitor_id)
logger.info(f"[{profile_name}] Saved new Visitor ID to cache.")
except IOError as e:
logger.error(f"[{profile_name}] Failed to save new Visitor ID to cache: {e}")
# Success is the highest priority check
if '[info] Writing video metadata as JSON to:' in line:
post_processed_successfully = False
# --- Immediate post-processing ---
try:
path_match = re.search(r"Writing video metadata as JSON to: '?([^']+)'?$", line)
if not path_match:
path_match = re.search(r"Writing video metadata as JSON to: (.*)$", line)
if path_match:
container_file_path = path_match.group(1).strip()
if container_file_path.startswith(container_mount_path):
relative_path = os.path.relpath(container_file_path, container_mount_path)
host_file_path = os.path.join(host_mount_path, relative_path)
# The file might not exist immediately.
for _ in range(5): # Retry for up to 0.5s
if os.path.exists(host_file_path):
break
time.sleep(0.1)
if os.path.exists(host_file_path):
post_processed_successfully = _post_process_and_move_info_json(
Path(host_file_path), profile_name, proxy_url, policy, worker_id,
state_manager, profile_manager_instance=profile_manager_instance
)
else:
logger.warning(f"File from log not found on host for immediate processing: {host_file_path}")
except Exception as e:
logger.error(f"Error during immediate post-processing from log line: {e}")
with activity_lock:
if post_processed_successfully:
live_success_count += 1
logger.info(f"[Worker {worker_id}] [{profile_name}] Live success #{live_success_count} detected and post-processed.")
profile_manager_instance.record_activity(profile_name, 'success')
else:
live_failure_count += 1
logger.error(f"[Worker {worker_id}] [{profile_name}] Post-processing failed for a successful fetch. Recording as failure.")
profile_manager_instance.record_activity(profile_name, 'failure')
# --- End immediate post-processing ---
return False
# Check for fatal patterns (e.g., bot detection) which might not start with ERROR:
for pattern in fatal_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_failure_count += 1
logger.error(f"[Worker {worker_id}] [{profile_name}] Live FATAL error #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'failure')
if direct_policy.get('ban_on_fatal_error_in_batch'):
logger.warning(f"Banning profile '{profile_name}' immediately due to fatal error to stop container.")
profile_manager_instance.update_profile_state(profile_name, 'BANNED', 'Fatal error during batch')
return True # Signal to stop container
return False # Do not stop if ban_on_fatal_error_in_batch is false
# Only process lines that contain ERROR: for tolerated/generic failures
if 'ERROR:' not in line:
return False
# Check if it's a tolerated error
for pattern in tolerated_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_tolerated_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live TOLERATED error #{live_tolerated_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
return False
# If it's an ERROR: line and not tolerated, it's a failure
with activity_lock:
live_failure_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live failure #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'failure')
return False
if args.dummy_batch:
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Simulating Docker batch of {len(url_batch)} URLs.")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: Would run docker command: {' '.join(shlex.quote(s) for s in command)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY BATCH MODE: With environment: {environment}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
auth_failure_rate = dummy_settings.get('auth_failure_rate', 0.0)
auth_skipped_rate = dummy_settings.get('auth_skipped_failure_rate', 0.0)
min_seconds = dummy_settings.get('auth_min_seconds', 0.1)
max_seconds = dummy_settings.get('auth_max_seconds', 0.5)
for url in url_batch:
time.sleep(random.uniform(min_seconds, max_seconds))
video_id = sp_utils.get_video_id(url) or f"dummy_{random.randint(1000, 9999)}"
rand_val = random.random()
if rand_val < auth_skipped_rate:
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating tolerated failure for {video_id}.")
live_tolerated_count += 1
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
elif rand_val < (auth_skipped_rate + auth_failure_rate):
logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY: Simulating fatal failure for {video_id}.")
live_failure_count += 1
profile_manager_instance.record_activity(profile_name, 'failure')
else:
# Success - create dummy info.json
live_success_count += 1
profile_manager_instance.record_activity(profile_name, 'success')
info_data = {'id': video_id, 'title': f'Dummy Video {video_id}', '_dummy': True}
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
info_data['_ytops_metadata'] = {
'profile_name': profile_name, 'proxy_url': proxy_url,
'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
'auth_env': env_name
}
final_path = Path(save_dir) / f"{video_id}.info.json"
rename_template = direct_policy.get('rename_file_template')
if rename_template:
sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
new_name = rename_template.format(video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy)
final_path = Path(save_dir) / new_name
try:
with open(final_path, 'w', encoding='utf-8') as f:
json.dump(info_data, f, indent=2)
logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY: Created dummy info.json: '{final_path}'")
except IOError as e:
logger.error(f"[Worker {worker_id}] [{profile_name}] DUMMY: Failed to write dummy info.json: {e}")
retcode = 0
stdout, stderr, stop_reason = "", "", None
else:
retcode, stdout, stderr, stop_reason = run_docker_container(
image_name=image_name,
command=command,
volumes=volumes,
stream_prefix=f"[Worker {worker_id} | docker-ytdlp] ",
network_name=network_name,
log_callback=log_parser_callback,
profile_manager=profile_manager_instance,
profile_name=profile_name,
environment=environment,
log_command_override=log_command_override
)
# 5. Post-process results
logger.info(f"[Worker {worker_id}] [{profile_name}] Docker container finished. Post-processing results...")
full_output = f"{stdout}\n{stderr}"
is_bot_error = "Sign in to confirm you're not a bot" in full_output
if is_bot_error:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during batch. Marking as failure.")
            # Fallback post-processing for any files missed by the live parser. The live
            # parser already moves files, so this loop should only find leftovers. Dummy
            # mode writes directly to the final destination, so it is skipped here.
if not args.dummy_batch:
processed_files = list(Path(temp_task_dir_host).glob('*.json'))
if processed_files:
logger.info(f"[Worker {worker_id}] Found {len(processed_files)} leftover file(s) to process after live parsing.")
for temp_path in processed_files:
_post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=profile_manager_instance)
# A batch is considered an overall success for logging if it had no fatal errors.
# The per-URL activity has already been recorded live.
# We use live_success_count for a more accurate success metric.
success = (live_success_count > 0 and not is_bot_error)
if not success:
reason = "bot detection occurred" if is_bot_error else f"0 successful files created out of {len(url_batch)}"
logger.warning(f"[Worker {worker_id}] [{profile_name}] Marking batch as FAILED. Reason: {reason}.")
# Record batch stats for overall orchestrator health
state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)
# If live parsing didn't catch all activity (e.g., yt-dlp exits before printing logs),
# we reconcile the counts here based on files created.
with activity_lock:
processed_count = live_success_count + live_failure_count + live_tolerated_count
                # Any URL with no live-parsed outcome (success, failure, or tolerated
                # error) is treated as a failure below, so the profile's recorded
                # activity adds up to the batch size. Live parsing remains the most
                # reliable source for individual failures.
unaccounted_failures = len(url_batch) - processed_count
if unaccounted_failures > 0:
logger.info(f"[Worker {worker_id}] [{profile_name}] Reconciling activity: {unaccounted_failures} unaccounted failure(s).")
for _ in range(unaccounted_failures):
profile_manager_instance.record_activity(profile_name, 'failure')
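                # e.g. a batch of 10 URLs with 6 successes, 1 failure and 1 tolerated
                # error seen live leaves 2 unaccounted URLs, recorded here as failures.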
if stop_reason:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Batch aborted due to: {stop_reason}. Adjusting URL index.")
# The batch was from start_idx to end_idx.
# We processed `processed_count` URLs.
# The next batch should start from `start_idx + processed_count`.
# `get_next_url_batch` updated `last_url_index` to `end_idx`. We need to rewind it.
with activity_lock:
processed_count = live_success_count + live_failure_count + live_tolerated_count
next_start_index = start_idx + processed_count
state_manager.update_last_url_index(next_start_index, force=True)
logger.info(f"[Worker {worker_id}] Rewound URL index to {next_start_index} for next worker.")
event_details = f"Docker batch completed. Exit: {retcode}. Files created: {live_success_count}/{len(url_batch)}. (Live successes: {live_success_count}, Live failures: {live_failure_count}, Live tolerated: {live_tolerated_count})"
if not success and stderr:
event_details += f" Stderr: {stderr.strip().splitlines()[-1] if stderr.strip() else 'N/A'}"
if stop_reason:
event_details += f" Aborted: {stop_reason}."
event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details, 'video_count': len(url_batch) }
state_manager.log_event(event)
logger.info(f"[Worker {worker_id}] [{profile_name}] Batch processing complete. Worker will now unlock profile and attempt next batch.")
except Exception as e:
logger.error(f"[Worker {worker_id}] Unexpected error in worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure')
finally:
if locked_profile and batch_started:
# --- Reconcile pending downloads counter ---
if downloads_per_url > 0:
# The initial increment was (url_batch_len * downloads_per_url).
# The actual number of downloads that will happen is (live_success_count * downloads_per_url).
# The adjustment is the difference.
initial_increment = url_batch_len * downloads_per_url
actual_downloads = live_success_count * downloads_per_url
adjustment = actual_downloads - initial_increment
if adjustment != 0:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {live_success_count}/{url_batch_len} successful info.json(s). Adjusting counter by {adjustment}.")
profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment)
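                    # e.g. 10 URLs * 2 formats = +20 at batch start; 7 successful
                    # info.jsons -> 14 actual downloads -> adjustment of -6 here.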
if locked_profile:
last_used_profile_name = locked_profile['name']
cooldown = None
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if isinstance(cooldown_config, str) and cooldown_config.isdigit():
cooldown = int(cooldown_config)
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
if temp_task_dir_host and os.path.exists(temp_task_dir_host):
# If shutdown is requested, a batch might have been interrupted after files were
# created but before they were post-processed. We preserve the temp directory
# to allow for manual recovery of the info.json files.
if state_manager.shutdown_event.is_set() and any(Path(temp_task_dir_host).iterdir()):
logger.warning(f"Shutdown requested. Preserving temporary task directory for manual recovery: {temp_task_dir_host}")
else:
shutil.rmtree(temp_task_dir_host)
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []
def run_direct_docker_download_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock, profile_prefix=None):
"""A worker for the 'direct_docker_cli' orchestration mode with `mode: download_only`."""
logger.info(f"[Worker {worker_id}] Download worker thread started for pool '{profile_prefix or '*'}'.")
try:
owner_id = f"direct-docker-dl-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
d_policy = policy.get('download_policy', {})
direct_policy = policy.get('direct_docker_cli_policy', {})
queue_policy = policy.get('queue_policy')
# Prioritize the passed-in profile_prefix for worker pool compatibility.
if not profile_prefix:
profile_prefix = d_policy.get('profile_prefix')
        # Unlike other modes, this worker can function without a prefix (it will try to
        # lock any active profile), so a missing profile_prefix is not treated as an error.
# --- Docker specific config ---
image_name = direct_policy.get('docker_image_name')
host_mount_path = direct_policy.get('docker_host_mount_path')
container_mount_path = direct_policy.get('docker_container_mount_path')
host_download_path = direct_policy.get('docker_host_download_path')
container_download_path = direct_policy.get('docker_container_download_path')
network_name = direct_policy.get('docker_network_name')
if not all([image_name, host_mount_path, container_mount_path, host_download_path, container_download_path]):
logger.error(f"[Worker {worker_id}] Direct docker download mode requires all docker_* keys in 'direct_docker_cli_policy'. Worker exiting.")
return []
try:
os.makedirs(host_mount_path, exist_ok=True)
os.makedirs(host_download_path, exist_ok=True)
except OSError as e:
logger.error(f"[Worker {worker_id}] Could not create required host directories: {e}. Worker exiting.")
return []
no_task_streak = 0
last_used_profile_name = None
task_counter = 0
while not state_manager.shutdown_event.is_set():
locked_profile = None
claimed_task_path_host = None
temp_config_dir_host = None
was_banned_by_parser = False
task = None
task_id = None
try:
if no_task_streak > 0 and not queue_policy: # Polling only makes sense for file mode
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix or '')]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No tasks found or profiles available. Pool status ({profile_prefix or '*'}*): {states_summary}. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
else:
logger.info(f"[Worker {worker_id}] No tasks found or profiles available. No profiles found with prefix '{profile_prefix or '*'}'. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
# --- End diagnostic logging ---
time.sleep(polling_interval)
if state_manager.shutdown_event.is_set(): continue
# 1. Get a task
if not queue_policy:
# File-based mode: Find a task and lock its associated profile
locked_profile, claimed_task_path_host = find_task_and_lock_profile(
profile_manager_instance, owner_id, profile_prefix, policy, worker_id
)
else:
# Queue-based mode
task = state_manager.get_download_task()
if task:
task_id = task.get('id') or task.get('task_id')
if not task_id:
task_id = f"dl_task_{worker_id}_{task_counter}"
task_counter += 1
task['task_id'] = task_id
info_json_path_str = task.get('info_json_path')
if not info_json_path_str or not os.path.exists(info_json_path_str):
logger.error(f"[Worker {worker_id}] Task {task_id} has invalid info_json_path: {info_json_path_str}. Skipping.")
state_manager.report_download_skipped(task_id, {"error": "Invalid info_json_path", "task": task})
auth_profile_name = task.get('auth_profile_name')
auth_env = task.get('auth_env')
if auth_profile_name and auth_env:
auth_manager = get_auth_manager(profile_manager_instance, auth_env)
if auth_manager:
auth_manager.decrement_pending_downloads(auth_profile_name)
continue
claimed_task_path_host = Path(info_json_path_str)
# Now lock a profile
specific_profile = task.get('auth_profile_name') or task.get('profile_name')
if specific_profile:
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, specific_profile_name=specific_profile)
if not locked_profile:
logger.warning(f"[Worker {worker_id}] Could not lock specific profile '{specific_profile}'. Trying any profile with prefix.")
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
else:
locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
if not locked_profile:
logger.warning(f"[Worker {worker_id}] No profiles available for task {task_id}. Re-queueing.")
state_manager.add_download_tasks_batch([task])
claimed_task_path_host = None
task = None
if task:
state_manager.mark_download_in_progress(task_id, owner_id)
if not locked_profile:
if not queue_policy:
no_task_streak += 1
else:
# In queue mode, if we didn't get a task or a profile, we just poll.
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
logger.debug(f"[Worker {worker_id}] No download tasks or profiles available. Sleeping for {polling_interval}s.")
time.sleep(polling_interval)
continue
profile_name = locked_profile['name']
# We have a task and a lock.
# User-Agent is not used for download simulation.
user_agent = None
if claimed_task_path_host:
no_task_streak = 0
auth_profile_name, auth_env = None, None
info_data = None
# --- Read info.json content and metadata first ---
try:
with open(claimed_task_path_host, 'r', encoding='utf-8') as f:
info_data = json.load(f)
# This is critical for decrementing the counter in the finally block
metadata = info_data.get('_ytops_metadata', {})
auth_profile_name = metadata.get('profile_name')
auth_env = metadata.get('auth_env')
except (IOError, json.JSONDecodeError) as e:
logger.error(f"CRITICAL: Could not read or parse task file '{claimed_task_path_host.name}': {e}. This task will be skipped, but the pending downloads counter CANNOT be decremented.")
continue # Skip to finally block to unlock profile
if args.dummy or args.dummy_batch:
logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY DOCKER DOWNLOAD PER-FORMAT SIMULATION ==========")
logger.info(f"[Worker {worker_id}] Profile: {profile_name} | Task: {claimed_task_path_host.name}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
min_seconds = dummy_settings.get('download_min_seconds', 1.0)
max_seconds = dummy_settings.get('download_max_seconds', 3.0)
failure_rate = dummy_settings.get('download_failure_rate', 0.0)
skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
# In dummy mode, prioritize the format from the task file, then from the policy.
format_selection = info_data.get('_ytops_download_format')
source_of_format = "task file"
if not format_selection:
format_selection = d_policy.get('formats', '')
source_of_format = "policy"
if not format_selection:
logger.warning(f"[Worker {worker_id}] DUMMY: No format specified in task file or policy. Simulating a single download.")
formats_to_test = ['dummy_format']
else:
formats_to_test = [f.strip() for f in format_selection.split(',') if f.strip()]
logger.info(f"[Worker {worker_id}] DUMMY: Simulating downloads for formats (from {source_of_format}): {', '.join(formats_to_test)}")
for format_id in formats_to_test:
if state_manager.shutdown_event.is_set():
logger.info(f"[Worker {worker_id}] DUMMY: Shutdown requested, stopping format simulation.")
break
logger.info(f"[Worker {worker_id}] DUMMY: Simulating download for format '{format_id}'...")
time.sleep(random.uniform(min_seconds, max_seconds))
rand_val = random.random()
should_fail_skipped = rand_val < skipped_rate
should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
success = False
details = ""
error_type = None
is_tolerated_error = False
if should_fail_skipped:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure for format '{format_id}'.")
details = f"Dummy skipped failure for format {format_id}"
error_type = "DummySkippedFailure"
is_tolerated_error = True
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
elif should_fail_fatal:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure for format '{format_id}'.")
details = f"Dummy fatal failure for format {format_id}"
error_type = "DummyFailure"
profile_manager_instance.record_activity(profile_name, 'download_error')
else:
logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success for format '{format_id}'.")
success = True
details = f"Dummy success for format {format_id}"
profile_manager_instance.record_activity(profile_name, 'download')
event = {
'type': 'direct_docker_download',
'profile': profile_name,
'proxy_url': locked_profile['proxy'],
'success': success,
'details': details,
'error_type': error_type,
'is_tolerated_error': is_tolerated_error,
'format': format_id
}
state_manager.log_event(event)
logger.info(f"========== [Worker {worker_id}] END DUMMY DOCKER DOWNLOAD SIMULATION ==========")
# In dummy mode, we just rename the file to processed and continue to the finally block.
try:
base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
processed_path = Path(f"{base_path_str}.processed")
claimed_task_path_host.rename(processed_path)
logger.debug(f"DUMMY MODE: Renamed processed task file to '{processed_path.name}'.")
except (OSError, IndexError) as e:
logger.error(f"DUMMY MODE: Failed to rename processed task file '{claimed_task_path_host}': {e}")
continue # Skip to finally block
# --- Check for URL expiration before running Docker ---
if d_policy.get('check_url_expiration', True):
# Heuristic: check the first available format URL
first_format = next((f for f in info_data.get('formats', []) if 'url' in f), None)
if first_format:
url_to_check = first_format['url']
time_shift_minutes = d_policy.get('expire_time_shift_minutes', 0)
status, time_left_seconds = sp_utils.check_url_expiry(url_to_check, time_shift_minutes)
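                            # check_url_expiry presumably reads the signed URL's expiry
                            # (e.g. an 'expire=<unix ts>' query parameter) and returns a
                            # status plus seconds remaining; with a time shift of N minutes,
                            # URLs expiring within the next N minutes are treated as expired.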
logger.debug(f"[Worker {worker_id}] [{profile_name}] URL expiration check for task '{claimed_task_path_host.name}': status={status}, time_left={time_left_seconds:.0f}s")
if status == 'expired':
details = "Download URL is expired"
if time_shift_minutes > 0 and time_left_seconds > 0:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Skipping task '{claimed_task_path_host.name}' because its URL will expire in {time_left_seconds/60:.1f}m (within {time_shift_minutes}m time-shift).")
details = f"URL will expire within {time_shift_minutes}m time-shift"
else:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Skipping task '{claimed_task_path_host.name}' because its URL is expired.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
event = {
'type': 'direct_docker_download', 'profile': profile_name,
'proxy_url': locked_profile['proxy'], 'success': False,
'error_type': 'Skipped (Expired URL)', 'details': details,
'is_tolerated_error': True
}
state_manager.log_event(event)
try:
base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
processed_path = Path(f"{base_path_str}.processed")
claimed_task_path_host.rename(processed_path)
logger.debug(f"Renamed expired task file to '{processed_path.name}'.")
except (OSError, IndexError) as e:
logger.error(f"Failed to rename expired task file '{claimed_task_path_host}': {e}")
continue # Skip to the finally block
# The path to the task file inside the container needs to be relative to the host mount root.
# We must make the task path absolute first to correctly calculate the relative path from the absolute mount path.
relative_task_path = os.path.relpath(os.path.abspath(claimed_task_path_host), host_mount_path)
task_path_container = os.path.join(container_mount_path, relative_task_path)
# 3. Prepare config file on host in a temporary directory
temp_config_dir_host = tempfile.mkdtemp(prefix=f"docker-dl-config-{worker_id}-", dir=host_mount_path)
config_dir_name = os.path.basename(temp_config_dir_host)
config_dir_container = os.path.join(container_mount_path, config_dir_name)
environment = {}
base_config_content = ""
base_config_file = direct_policy.get('ytdlp_config_file')
if base_config_file:
config_path_to_read = Path(base_config_file)
if not config_path_to_read.exists():
config_path_to_read = Path(sp_utils._PROJECT_ROOT) / base_config_file
if config_path_to_read.exists():
try:
with open(config_path_to_read, 'r', encoding='utf-8') as base_f:
base_config_content = base_f.read()
except IOError as e:
logger.error(f"[Worker {worker_id}] Could not read ytdlp_config_file '{config_path_to_read}': {e}")
config_overrides = direct_policy.get('ytdlp_config_overrides', {}).copy()
config_overrides['proxy'] = locked_profile['proxy']
config_overrides['load-info-json'] = task_path_container
config_overrides['output'] = os.path.join(container_download_path, '%(id)s.f%(format_id)s.%(ext)s')
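                    # e.g. format 137 of video dQw4w9WgXcQ is written as
                    # '<container_download_path>/dQw4w9WgXcQ.f137.mp4' (illustrative id/format).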
# Prevent yt-dlp from using a cache directory.
config_overrides['no-cache-dir'] = True
overrides_content = sp_utils._config_dict_to_flags_file_content(config_overrides)
raw_args_from_policy = direct_policy.get('ytdlp_raw_args', [])
raw_args_content = '\n'.join(raw_args_from_policy)
config_content = f"{base_config_content.strip()}\n\n# --- Overrides from policy ---\n{overrides_content}"
if raw_args_content:
config_content += f"\n\n# --- Raw args from policy ---\n{raw_args_content}"
logger.info(f"[Worker {worker_id}] [{profile_name}] Generated yt-dlp config:\n---config---\n{config_content}\n------------")
ytdlp_config_dir_host = os.path.join(temp_config_dir_host, 'yt-dlp')
os.makedirs(ytdlp_config_dir_host, exist_ok=True)
temp_config_file_host = os.path.join(ytdlp_config_dir_host, 'config')
with open(temp_config_file_host, 'w', encoding='utf-8') as f:
f.write(config_content)
# 4. Construct and run docker run command
volumes = {
os.path.abspath(host_mount_path): {'bind': container_mount_path, 'mode': 'ro'},
os.path.abspath(host_download_path): {'bind': container_download_path, 'mode': 'rw'}
}
# The command tells yt-dlp exactly where to find the config file we created.
command = ['yt-dlp', '--config-locations', os.path.join(config_dir_container, 'yt-dlp/config')]
logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}")
# For logging purposes, construct the full equivalent command line with host paths
log_config_overrides_for_host = config_overrides.copy()
log_config_overrides_for_host['load-info-json'] = str(claimed_task_path_host)
log_config_overrides_for_host['output'] = os.path.join(host_download_path, '%(id)s.f%(format_id)s.%(ext)s')
log_command_override = ['yt-dlp']
if base_config_content:
log_command_override.extend(sp_utils._parse_config_file_to_cli_args(base_config_content))
log_command_override.extend(sp_utils._config_dict_to_cli_flags(log_config_overrides_for_host))
raw_args_from_policy = direct_policy.get('ytdlp_raw_args', [])
for raw_arg in raw_args_from_policy:
log_command_override.extend(shlex.split(raw_arg))
# --- Live log parsing and activity recording ---
live_success_count = 0
live_failure_count = 0
live_tolerated_count = 0
activity_lock = threading.Lock()
tolerated_error_patterns = direct_policy.get('tolerated_error_patterns', [])
fatal_error_patterns = direct_policy.get('fatal_error_patterns', [])
def log_parser_callback(line):
nonlocal live_success_count, live_failure_count, live_tolerated_count, was_banned_by_parser
# Success is a high-priority check. Only record one success per task.
if '[download] 100% of' in line or 'has already been downloaded' in line:
with activity_lock:
# Only count one success per task
if live_success_count == 0:
live_success_count += 1
logger.info(f"[Worker {worker_id}] [{profile_name}] Live download success detected from log.")
profile_manager_instance.record_activity(profile_name, 'download')
return False
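                        # Matching lines typically look like (format varies by yt-dlp version):
                        #   [download] 100% of   12.34MiB in 00:05
                        #   [download] <file> has already been downloaded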
# Check for fatal patterns
for pattern in fatal_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_failure_count += 1
logger.error(f"[Worker {worker_id}] [{profile_name}] Live FATAL download error #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'download_error')
if direct_policy.get('ban_on_fatal_error_in_batch'):
logger.warning(f"Banning profile '{profile_name}' immediately due to fatal download error to stop container.")
profile_manager_instance.update_profile_state(profile_name, 'BANNED', 'Fatal error during download')
was_banned_by_parser = True
return True # Signal to stop container
return False # Do not stop if ban_on_fatal_error_in_batch is false
# Only process lines that contain ERROR: for tolerated/generic failures
if 'ERROR:' not in line:
return False
# Check if it's a tolerated error
for pattern in tolerated_error_patterns:
if re.search(pattern, line, re.IGNORECASE):
with activity_lock:
live_tolerated_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live TOLERATED download error #{live_tolerated_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
return False
# If it's an ERROR: line and not tolerated, it's a failure
with activity_lock:
live_failure_count += 1
logger.warning(f"[Worker {worker_id}] [{profile_name}] Live download failure #{live_failure_count} detected from log: {line}")
profile_manager_instance.record_activity(profile_name, 'download_error')
return False
retcode, stdout, stderr, stop_reason = run_docker_container(
image_name=image_name,
command=command,
volumes=volumes,
stream_prefix=f"[Worker {worker_id} | docker-ytdlp] ",
network_name=network_name,
log_callback=log_parser_callback,
profile_manager=profile_manager_instance,
profile_name=profile_name,
environment=environment,
log_command_override=log_command_override
)
# 5. Post-process and record activity
full_output = f"{stdout}\n{stderr}"
is_bot_error = "Sign in to confirm you're not a bot" in full_output
if is_bot_error:
logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during download. Marking as failure.")
# --- Final Outcome Determination ---
                    # Per-download activity is recorded live by the log parser. This block
                    # only determines the overall success/failure for logging and event
                    # reporting.
success = False
final_outcome = "unknown"
with activity_lock:
if live_success_count > 0:
success = True
final_outcome = "download"
elif live_failure_count > 0 or is_bot_error:
final_outcome = "download_error"
elif live_tolerated_count > 0:
final_outcome = "tolerated_error"
elif retcode == 0:
# Fallback if no logs were matched but exit was clean.
success = True
final_outcome = "download"
logger.warning(f"[Worker {worker_id}] [{profile_name}] No specific success/error log line matched, but exit code is 0. Assuming success, but this may indicate a parsing issue.")
# We record a success here as a fallback, in case the log parser missed it.
profile_manager_instance.record_activity(profile_name, 'download')
else:
final_outcome = "download_error"
logger.warning(f"[Worker {worker_id}] [{profile_name}] No specific error log line matched, but exit code was {retcode}. Recording a generic download_error.")
profile_manager_instance.record_activity(profile_name, 'download_error')
# --- Airflow Directory Logic ---
if success and d_policy.get('output_to_airflow_ready_dir'):
try:
# Get video_id from the info.json
with open(claimed_task_path_host, 'r', encoding='utf-8') as f:
info_data = json.load(f)
video_id = info_data.get('id')
if not video_id:
logger.error(f"[{profile_name}] Could not find video ID in '{claimed_task_path_host.name}' for moving files.")
else:
# Scan the download directory for all resulting media files
downloaded_files = [f for f in os.listdir(host_download_path) if not f.endswith(('.part', '.ytdl'))]
logger.info(f"[{profile_name}] Found {len(downloaded_files)} downloaded file(s) to process in '{host_download_path}'.")
if not downloaded_files:
logger.warning(f"[{profile_name}] Download reported success, but no media files were found in '{host_download_path}'.")
# --- Prepare the single destination directory for all artifacts ---
now = datetime.now()
rounded_minute = (now.minute // 10) * 10
timestamp_str = now.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}"
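                                # e.g. 2024-05-14 14:37 -> rounded_minute 30 -> '20240514T1430'
                                # (illustrative date), grouping output into 10-minute buckets.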
base_path = d_policy.get('airflow_ready_dir_base_path', 'downloadfiles/videos/ready')
if not os.path.isabs(base_path):
base_path = os.path.join(sp_utils._PROJECT_ROOT, base_path)
final_dir_base = os.path.join(base_path, timestamp_str)
final_dir_path = os.path.join(final_dir_base, video_id)
os.makedirs(final_dir_path, exist_ok=True)
# --- Copy info.json once ---
new_info_json_name = f"info_{video_id}.json"
dest_info_json_path = os.path.join(final_dir_path, new_info_json_name)
if not os.path.exists(dest_info_json_path):
shutil.copy(claimed_task_path_host, dest_info_json_path)
logger.info(f"[{profile_name}] Copied info.json to {dest_info_json_path}")
# --- Process each downloaded media file ---
for downloaded_filename in downloaded_files:
downloaded_file_host_path = os.path.join(host_download_path, downloaded_filename)
if not os.path.exists(downloaded_file_host_path):
logger.warning(f"[{profile_name}] File '{downloaded_filename}' disappeared before it could be processed.")
continue
media_path = Path(downloaded_file_host_path)
# 1. Run ffprobe
if d_policy.get('run_ffprobe'):
ffprobe_filename = f"ffprobe_{media_path.stem}.json"
# Create ffprobe json in the final destination to avoid moving it
ffprobe_output_path = Path(final_dir_path) / ffprobe_filename
_run_ffprobe(media_path, ffprobe_output_path)
# 2. Move media file
final_media_path = Path(final_dir_path) / media_path.name
shutil.move(str(media_path), str(final_media_path))
logger.info(f"[{profile_name}] Moved media file '{media_path.name}' to {final_media_path}")
media_path = final_media_path # Update media_path to its new location
# 3. Cleanup the media file at its final location
if d_policy.get('cleanup'):
if os.path.exists(media_path):
_cleanup_media_file(media_path)
except Exception as e:
logger.error(f"[{profile_name}] Failed during post-download processing for Airflow: {e}", exc_info=True)
event_details = f"Docker download finished. Exit: {retcode}. Final Outcome: {final_outcome}. (Live successes: {live_success_count}, Live failures: {live_failure_count}, Live tolerated: {live_tolerated_count})"
if not success and stderr:
event_details += f" Stderr: {stderr.strip().splitlines()[-1] if stderr.strip() else 'N/A'}"
if stop_reason:
event_details += f" Aborted: {stop_reason}."
event = { 'type': 'direct_docker_download', 'profile': profile_name, 'proxy_url': locked_profile['proxy'], 'success': success, 'details': event_details }
state_manager.log_event(event)
logger.info(f"[Worker {worker_id}] [{profile_name}] Task processing complete. Worker will now unlock profile and attempt next task.")
# 6. Clean up task file
if not queue_policy:
# File-based mode: rename to .processed
try:
# The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed
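# e.g. 'abc123.info.json.LOCKED.<owner>' -> 'abc123.info.json.processed' (illustrative name; the exact lock suffix is whatever the claiming step appended).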
base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
processed_path = Path(f"{base_path_str}.processed")
claimed_task_path_host.rename(processed_path)
logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.")
except (OSError, IndexError) as e:
logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}")
elif d_policy.get('rename_source_info_json_on_success'):
# Queue-based mode: respect rename policy
source_path_to_rename = task.get('info_json_path')
if success and source_path_to_rename and os.path.exists(source_path_to_rename):
try:
processed_path = source_path_to_rename + ".processed"
shutil.move(source_path_to_rename, processed_path)
logger.info(f"[Worker {worker_id}] Renamed source info.json to '{processed_path}'")
except Exception as e:
logger.warning(f"[Worker {worker_id}] Could not rename source info.json '{source_path_to_rename}': {e}")
# After this point, claimed_task_path_host is no longer valid.
# The metadata has already been read into auth_profile_name and auth_env.
else:
# This case should not be reached with the new task-first locking logic.
logger.warning(f"[Worker {worker_id}] Inconsistent state: locked profile '{profile_name}' but no task was claimed. Unlocking and continuing.")
except Exception as e:
logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure') # Generic failure
time.sleep(5)
finally:
if locked_profile:
if claimed_task_path_host:
# The auth_profile_name and auth_env variables were populated in the `try` block
# before the task file was renamed or deleted.
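# Decrement the pending-downloads counter the auth environment keeps for this profile, regardless of how the download itself turned out.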
if auth_profile_name and auth_env:
auth_manager = get_auth_manager(profile_manager_instance, auth_env)
if auth_manager:
auth_manager.decrement_pending_downloads(auth_profile_name)
else:
logger.error(f"Could not get auth profile manager for env '{auth_env}'. Pending downloads counter will not be decremented.")
else:
logger.warning(f"Could not find auth profile name and/or auth_env in info.json metadata. Pending downloads counter will not be decremented. (Profile: {auth_profile_name}, Env: {auth_env})")
if was_banned_by_parser:
logger.info(f"[Worker {worker_id}] Profile '{locked_profile['name']}' was already banned by the log parser. Skipping unlock/cooldown.")
else:
last_used_profile_name = locked_profile['name']
cooldown = None
# Only apply cooldown if a task was actually claimed and processed.
if claimed_task_path_host:
# The enforcer is the single place where this policy is configured. The enforcer can be
# restarted freely to change it, whereas the stress-policy workers (which handle auth and
# downloads simultaneously) should keep running; this applies one policy across many
# workers/machines without restarting each of them.
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
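# Accepted value formats (parsed below): a bare integer, e.g. 300, or a two-element range, e.g. [120, 600],
# from which a random cooldown is drawn. Redis values arrive as strings and are JSON-decoded first;
# local policy values are already ints or lists.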
cooldown_source_value = profile_manager_instance.get_config('unlock_cooldown_seconds')
source_description = "Redis config"
if cooldown_source_value is None:
cooldown_source_value = d_policy.get('default_unlock_cooldown_seconds')
source_description = "local policy"
if cooldown_source_value is not None:
try:
# If from Redis, it's a string that needs parsing.
# If from local policy, it's already an int or list.
val = cooldown_source_value
if isinstance(val, str):
val = json.loads(val)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
if cooldown is not None:
logger.debug(f"Determined cooldown from {source_description}: {cooldown_source_value}")
except (json.JSONDecodeError, TypeError):
if isinstance(cooldown_source_value, str) and cooldown_source_value.isdigit():
cooldown = int(cooldown_source_value)
logger.debug(f"Determined cooldown from {source_description}: {cooldown_source_value}")
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
if not queue_policy and claimed_task_path_host and os.path.exists(claimed_task_path_host):
# Clean up .LOCKED file in file-based mode
try: os.remove(claimed_task_path_host)
except OSError: pass
if task and task_id:
state_manager.remove_download_in_progress(task_id)
if temp_config_dir_host and os.path.exists(temp_config_dir_host):
try:
shutil.rmtree(temp_config_dir_host)
except OSError: pass
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []
except Exception as e:
logger.error(f"[Worker {worker_id}] A fatal, unhandled error occurred in the worker thread: {e}", exc_info=True)
return []
def run_direct_download_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock, profile_prefix=None):
"""A persistent worker for the 'direct_download_cli' orchestration mode."""
owner_id = f"direct-dl-worker-{worker_id}"
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
d_policy = policy.get('download_policy', {})
direct_policy = policy.get('direct_download_cli_policy', {})
# Prioritize the passed-in profile_prefix for worker pool compatibility.
if not profile_prefix:
profile_prefix = d_policy.get('profile_prefix')
# Unlike other modes, this worker can run without a prefix (it will try to lock any active
# profile), so a missing profile_prefix is deliberately not treated as an error here.
output_dir = direct_policy.get('output_dir')
if not output_dir:
logger.error(f"[Worker {worker_id}] Direct download mode requires 'direct_download_cli_policy.output_dir'. Worker exiting.")
return []
os.makedirs(output_dir, exist_ok=True)
no_task_streak = 0
while not state_manager.shutdown_event.is_set():
locked_profile = None
claimed_task_path = None
try:
# 0. If no tasks were found, pause briefly.
if no_task_streak > 0:
polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
# --- Add diagnostic logging ---
all_profiles_in_pool = profile_manager_instance.list_profiles()
profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix or '')]
if profiles_in_prefix:
state_counts = collections.Counter(p['state'] for p in profiles_in_prefix)
states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items()))
logger.info(f"[Worker {worker_id}] No tasks found for available profiles. Pool status ({profile_prefix or '*'}*): {states_summary}. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
else:
logger.info(f"[Worker {worker_id}] No tasks found for available profiles. No profiles found with prefix '{profile_prefix or '*'}'. Pausing for {polling_interval}s. (Streak: {no_task_streak})")
# --- End diagnostic logging ---
time.sleep(polling_interval)
if state_manager.shutdown_event.is_set(): continue
# 1. Find a task and lock its associated profile
locked_profile, claimed_task_path = find_task_and_lock_profile(
profile_manager_instance, owner_id, profile_prefix, policy, worker_id
)
if not locked_profile:
no_task_streak += 1
# The main loop will pause if the streak continues.
continue
profile_name = locked_profile['name']
# We have a task and a lock.
if claimed_task_path:
no_task_streak = 0 # Reset streak
auth_profile_name, auth_env = None, None
# --- Read metadata before processing/deleting file ---
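# Illustrative shape of the metadata block (keys as read below, values are examples):
#   {'_ytops_metadata': {'profile_name': 'auth_user_01', 'auth_env': 'auth_sim'}}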
try:
with open(claimed_task_path, 'r', encoding='utf-8') as f:
info_data = json.load(f)
metadata = info_data.get('_ytops_metadata', {})
auth_profile_name = metadata.get('profile_name')
auth_env = metadata.get('auth_env')
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Could not read info.json to get auth profile for decrementing counter: {e}")
# 3. Construct and run the command
ytdlp_cmd_str = direct_policy.get('ytdlp_command')
if not ytdlp_cmd_str:
logger.error(f"[Worker {worker_id}] Direct download mode requires 'direct_download_cli_policy.ytdlp_command'.")
break
proxy_url = locked_profile['proxy']
proxy_rename = direct_policy.get('proxy_rename')
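# Optional sed-style rewrite of the proxy URL before it is handed to yt-dlp,
# e.g. 's/:8080/:8081/' (illustrative); only the simple s/<pattern>/<replacement>/ form is supported.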
if proxy_rename:
rename_rule = proxy_rename.strip("'\"")
if rename_rule.startswith('s/') and rename_rule.count('/') >= 2:
try:
parts = rename_rule.split('/')
proxy_url = re.sub(parts[1], parts[2], proxy_url)
except (re.error, IndexError):
logger.error(f"[Worker {worker_id}] Invalid proxy_rename rule: {proxy_rename}")
output_template = os.path.join(output_dir, '%(title)s - %(id)s.%(ext)s')
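# yt-dlp expands this template to e.g. '<output_dir>/Some Title - dQw4w9WgXcQ.mp4' (illustrative).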
cmd = shlex.split(ytdlp_cmd_str)
cmd.extend(['--load-info-json', str(claimed_task_path)])
cmd.extend(['--proxy', proxy_url])
cmd.extend(['-o', output_template])
ytdlp_args = direct_policy.get('ytdlp_args')
if ytdlp_args:
cmd.extend(shlex.split(ytdlp_args))
if args.verbose and '--verbose' not in cmd:
cmd.append('--verbose')
custom_env = direct_policy.get('env_vars', {}).copy()
# --- PYTHONPATH for custom yt-dlp module ---
ytdlp_module_path = direct_policy.get('ytdlp_module_path')
if ytdlp_module_path:
existing_pythonpath = custom_env.get('PYTHONPATH', os.environ.get('PYTHONPATH', ''))
custom_env['PYTHONPATH'] = f"{ytdlp_module_path}{os.pathsep}{existing_pythonpath}".strip(os.pathsep)
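# Prepending keeps the custom yt-dlp module ahead of any system-installed copy on the import path.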
logger.debug(f"[Worker {worker_id}] Using custom PYTHONPATH: {custom_env['PYTHONPATH']}")
# Pass profile info to the custom yt-dlp process
custom_env['YTDLP_PROFILE_NAME'] = profile_name
custom_env['YTDLP_PROXY_URL'] = locked_profile['proxy'] # Original proxy
env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
custom_env['YTDLP_SIM_MODE'] = env_name
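# e.g. a key prefix of 'auth_sim_profile_mgmt_' (illustrative) yields YTDLP_SIM_MODE='auth_sim'.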
# Create a per-profile cache directory and set XDG_CACHE_HOME
cache_dir_base = direct_policy.get('cache_dir_base', '.cache')
profile_cache_dir = os.path.join(cache_dir_base, profile_name)
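# e.g. '.cache/<profile_name>', so each profile keeps its own isolated yt-dlp cache.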
try:
os.makedirs(profile_cache_dir, exist_ok=True)
custom_env['XDG_CACHE_HOME'] = profile_cache_dir
except OSError as e:
logger.error(f"[Worker {worker_id}] Failed to create cache directory '{profile_cache_dir}': {e}")
logger.info(f"[Worker {worker_id}] [{profile_name}] Processing task '{claimed_task_path.name}'...")
if args.dummy or args.dummy_batch:
logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY DIRECT DOWNLOAD ==========")
logger.info(f"[Worker {worker_id}] Profile: {profile_name} | Task: {claimed_task_path.name}")
logger.info(f"[Worker {worker_id}] Would run command: {' '.join(shlex.quote(s) for s in cmd)}")
logger.info(f"[Worker {worker_id}] With environment: {custom_env}")
dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
min_seconds = dummy_settings.get('download_min_seconds', 0.5)
max_seconds = dummy_settings.get('download_max_seconds', 1.5)
failure_rate = dummy_settings.get('download_failure_rate', 0.0)
skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
time.sleep(random.uniform(min_seconds, max_seconds))
rand_val = random.random()
should_fail_skipped = rand_val < skipped_rate
should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
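# One draw, bucketed: with skipped_rate=0.1 and failure_rate=0.2 (example values), ~10% of runs
# are tolerated skips, the next ~20% fatal failures, and the remainder successes.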
if should_fail_skipped:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure.")
profile_manager_instance.record_activity(profile_name, 'tolerated_error')
retcode = 0
stderr = "Dummy skipped failure"
elif should_fail_fatal:
logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure.")
profile_manager_instance.record_activity(profile_name, 'download_error')
retcode = 1
stderr = "Dummy fatal failure"
else:
logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success.")
profile_manager_instance.record_activity(profile_name, 'download')
retcode = 0
stderr = ""
logger.info(f"========== [Worker {worker_id}] END DUMMY DIRECT DOWNLOAD ==========")
else:
logger.info(f"[Worker {worker_id}] [{profile_name}] Running command: {' '.join(shlex.quote(s) for s in cmd)}")
logger.info(f"[Worker {worker_id}] [{profile_name}] With environment: {custom_env}")
retcode, stdout, stderr = run_command(
cmd, running_processes, process_lock, env=custom_env, stream_output=args.verbose,
stream_prefix=f"[Worker {worker_id} | yt-dlp] "
)
# 4. Record activity
if not (args.dummy or args.dummy_batch):
success = (retcode == 0)
activity_type = 'download' if success else 'download_error'
logger.info(f"[Worker {worker_id}] Recording '{activity_type}' for profile '{profile_name}'.")
profile_manager_instance.record_activity(profile_name, activity_type)
event_details = f"Download finished. Exit code: {retcode}."
if retcode != 0 and stderr:
event_details += f" Stderr: {stderr.strip().splitlines()[-1]}"
event = {'type': 'direct_download', 'profile': profile_name, 'proxy_url': proxy_url, 'success': (retcode == 0), 'details': event_details}
state_manager.log_event(event)
# 5. Clean up the processed task file
try:
os.remove(claimed_task_path)
logger.debug(f"[{sp_utils.get_display_name(claimed_task_path)}] Removed processed task file.")
except OSError as e:
logger.error(f"Failed to remove processed task file '{claimed_task_path}': {e}")
else:
no_task_streak += 1
logger.info(f"[Worker {worker_id}] No tasks found for profile '{profile_name}'.")
except Exception as e:
logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
if locked_profile:
profile_manager_instance.record_activity(locked_profile['name'], 'failure') # Generic failure
time.sleep(5)
finally:
if locked_profile:
if claimed_task_path:
# The auth_profile_name and auth_env variables were populated in the `try` block
# before the task file was deleted.
if auth_profile_name and auth_env:
auth_manager = get_auth_manager(profile_manager_instance, auth_env)
if auth_manager:
auth_manager.decrement_pending_downloads(auth_profile_name)
else:
logger.error(f"Could not get auth profile manager for env '{auth_env}'. Pending downloads counter will not be decremented.")
else:
logger.warning(f"Could not find auth profile name and/or auth_env in info.json metadata. Pending downloads counter will not be decremented. (Profile: {auth_profile_name}, Env: {auth_env})")
cooldown = None
if claimed_task_path:
# The enforcer is the single place where this policy is configured: it can be restarted
# freely to change the value while the long-running workers keep going, so one policy
# change reaches every worker/machine without restarting them.
# DESIGN: The cooldown duration is not configured in the worker's policy.
# Instead, it is read from a central Redis key. This key is set by the
# policy-enforcer, making the enforcer the single source of truth for
# this policy. This allows changing the cooldown behavior without
# restarting the workers.
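# As in the docker-based worker above, the Redis value may be a bare integer string (e.g. '300') or a JSON range (e.g. '[120, 600]').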
cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
if cooldown_config:
try:
val = json.loads(cooldown_config)
if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
cooldown = random.randint(val[0], val[1])
elif isinstance(val, int):
cooldown = val
except (json.JSONDecodeError, TypeError):
if isinstance(cooldown_config, str) and cooldown_config.isdigit():
cooldown = int(cooldown_config)
if cooldown:
logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")
profile_manager_instance.unlock_profile(
locked_profile['name'],
owner=owner_id,
rest_for_seconds=cooldown
)
locked_profile = None
logger.info(f"[Worker {worker_id}] Worker loop finished.")
return []