import collections
import json
import logging
import os
import random
import re
import shlex
import sys
import tempfile
import shutil
import threading
import time
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path

from . import utils as sp_utils
from .process_runners import run_command, run_docker_container, get_worker_id
from ..profile_manager_tool import ProfileManager

logger = logging.getLogger(__name__)

# --- Auth Profile Manager Cache ---
# This is a cache to hold a ProfileManager instance for the auth simulation.
# It's needed so that download workers can decrement the correct pending download counter.
_auth_manager_cache = {}
_auth_manager_lock = threading.Lock()


def _get_auth_manager(current_manager, auth_env: str):
    """
    Gets a ProfileManager instance for a specific auth simulation environment.
    It uses the auth_env provided from the info.json metadata.
    """
    with _auth_manager_lock:
        if not auth_env:
            return None
        if auth_env in _auth_manager_cache:
            return _auth_manager_cache[auth_env]

        logger.info(f"Creating new ProfileManager for auth simulation env: '{auth_env}'")
        try:
            # Re-use connection settings from the current manager
            redis_conn_kwargs = current_manager.redis.connection_pool.connection_kwargs
            auth_key_prefix = f"{auth_env}_profile_mgmt_"
            auth_manager = ProfileManager(
                redis_host=redis_conn_kwargs.get('host'),
                redis_port=redis_conn_kwargs.get('port'),
                redis_password=redis_conn_kwargs.get('password'),
                key_prefix=auth_key_prefix
            )
            _auth_manager_cache[auth_env] = auth_manager
            return auth_manager
        except Exception as e:
            logger.error(f"Failed to create ProfileManager for auth env '{auth_env}': {e}")
            return None


def _run_download_logic(source, info_json_content, policy, state_manager, args, running_processes, process_lock,
                        profile_name=None, profile_manager_instance=None):
    """Shared download logic for a single info.json."""
    proxy_url = None
    if info_json_content:
        try:
            info_data = json.loads(info_json_content)
            proxy_url = info_data.get('_proxy_url')
        except (json.JSONDecodeError, AttributeError):
            logger.warning(f"[{sp_utils.get_display_name(source)}] Could not parse info.json to get proxy for download controls.")

    d_policy = policy.get('download_policy', {})
    temp_download_dir = None
    local_policy = policy
    if d_policy.get('output_to_airflow_ready_dir'):
        local_policy = deepcopy(policy)
        temp_download_dir = tempfile.mkdtemp(prefix='stress-dl-video-')
        # This modification is safe because it's on a deep copy
        local_policy.setdefault('download_policy', {})['output_dir'] = temp_download_dir
        logger.info(f"[{sp_utils.get_display_name(source)}] Using temporary download directory: {temp_download_dir}")

    try:
        if not state_manager.check_and_update_download_rate_limit(proxy_url, local_policy):
            return []

        state_manager.wait_for_proxy_cooldown(proxy_url, local_policy)

        results = process_info_json_cycle(source, info_json_content, local_policy, state_manager, args,
                                          running_processes, process_lock, proxy_url=proxy_url,
                                          profile_name=profile_name,
                                          profile_manager_instance=profile_manager_instance)

        state_manager.update_proxy_finish_time(proxy_url)

        # --- Post-download logic for Airflow dir ---
        if d_policy.get('output_to_airflow_ready_dir'):
            for result in results:
                if result.get('success') and result.get('downloaded_filepath'):
                    try:
                        video_id = result.get('video_id')
                        if not video_id:
                            # Fallback: extract from info.json content
                            try:
                                info_data = json.loads(info_json_content)
                                video_id = info_data.get('id')
                            except (json.JSONDecodeError, AttributeError):
                                video_id = None

                        if not video_id:
logger.error(f"[{sp_utils.get_display_name(source)}] Could not find video ID in result for moving file.") continue now = datetime.now() rounded_minute = (now.minute // 10) * 10 timestamp_str = now.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}" base_path = d_policy.get('airflow_ready_dir_base_path', 'downloadfiles/videos/ready') if not os.path.isabs(base_path): base_path = os.path.join(sp_utils._PROJECT_ROOT, base_path) final_dir_base = os.path.join(base_path, timestamp_str) final_dir_path = os.path.join(final_dir_base, video_id) os.makedirs(final_dir_path, exist_ok=True) downloaded_file = result['downloaded_filepath'] if os.path.exists(downloaded_file): shutil.move(downloaded_file, final_dir_path) logger.info(f"[{sp_utils.get_display_name(source)}] Moved media file to {final_dir_path}") # The source is the path to the task/info.json file. if isinstance(source, Path) and source.exists(): new_info_json_name = f"info_{video_id}.json" dest_info_json_path = os.path.join(final_dir_path, new_info_json_name) shutil.copy(source, dest_info_json_path) logger.info(f"[{sp_utils.get_display_name(source)}] Copied info.json to {dest_info_json_path}") except Exception as e: logger.error(f"[{sp_utils.get_display_name(source)}] Failed to move downloaded file to Airflow ready directory: {e}") return results finally: if temp_download_dir and os.path.exists(temp_download_dir): shutil.rmtree(temp_download_dir) logger.info(f"[{sp_utils.get_display_name(source)}] Cleaned up temporary directory: {temp_download_dir}") def process_profile_task(profile_name, file_list, policy, state_manager, cycle_num, args, running_processes, process_lock, profile_manager_instance=None): """Worker task for a profile, processing its files sequentially.""" logger.info(f"Worker {get_worker_id()} starting task for profile '{profile_name}' with {len(file_list)} files.") all_results = [] for i, file_path in enumerate(file_list): if state_manager.shutdown_event.is_set(): logger.info(f"Shutdown requested, stopping task for profile '{profile_name}'.") break try: with open(file_path, 'r', encoding='utf-8') as f: info_json_content = f.read() except (IOError, FileNotFoundError) as e: logger.error(f"[{sp_utils.get_display_name(file_path)}] Could not read info.json file: {e}") continue # Skip this file results_for_file = _run_download_logic(file_path, info_json_content, policy, state_manager, args, running_processes, process_lock, profile_name=profile_name, profile_manager_instance=profile_manager_instance) all_results.extend(results_for_file) # Mark file as processed if configured. This works for both 'once' and 'continuous' modes. 
        settings = policy.get('settings', {})
        if settings.get('directory_scan_mode') == 'continuous':
            state_manager.mark_file_as_processed(file_path)
        if settings.get('mark_processed_files'):
            try:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                new_path = file_path.parent / f"{file_path.name}.{timestamp}.processed"
                file_path.rename(new_path)
                logger.info(f"Marked '{file_path.name}' as processed by renaming to '{new_path.name}'")
            except (IOError, OSError) as e:
                logger.error(f"Failed to rename processed file '{file_path.name}': {e}")

        # Check for stop conditions after processing each file
        should_stop_profile = False
        for result in results_for_file:
            if not result['success']:
                s_conditions = policy.get('stop_conditions', {})
                if s_conditions.get('on_failure') or \
                   (s_conditions.get('on_http_403') and result['error_type'] == 'HTTP 403') or \
                   (s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
                    logger.info(f"Stopping further processing for profile '{profile_name}' due to failure.")
                    should_stop_profile = True
                    break
        if should_stop_profile:
            break

        # Apply sleep between tasks for this profile
        if i < len(file_list) - 1:
            exec_control = policy.get('execution_control', {})
            sleep_cfg = exec_control.get('sleep_between_tasks', {})
            sleep_min = sleep_cfg.get('min_seconds', 0)
            sleep_max_val = sleep_cfg.get('max_seconds')

            if sleep_min > 0 or sleep_max_val is not None:
                sleep_max = sleep_min if sleep_max_val is None else sleep_max_val
                sleep_duration = 0
                if sleep_max < sleep_min:
                    logger.warning(f"sleep_between_tasks: max_seconds ({sleep_max}s) is less than min_seconds ({sleep_min}s). Using max_seconds as fixed sleep duration.")
                    sleep_duration = sleep_max
                elif sleep_max > sleep_min:
                    sleep_duration = random.uniform(sleep_min, sleep_max)
                else:  # equal
                    sleep_duration = sleep_min

                if sleep_duration > 0:
                    logger.debug(f"Profile '{profile_name}' sleeping for {sleep_duration:.2f}s before next file.")
                    # Interruptible sleep
                    sleep_end_time = time.time() + sleep_duration
                    while time.time() < sleep_end_time:
                        if state_manager.shutdown_event.is_set():
                            break
                        time.sleep(0.2)

    return all_results


def run_download_worker(info_json_path, info_json_content, format_to_download, policy, args, running_processes,
                        process_lock, state_manager, profile_name=None):
    """
    Performs a single download attempt. Designed to be run in a worker thread.
    """
    worker_id = get_worker_id()
    display_name = sp_utils.get_display_name(info_json_path)
    profile_log_part = f" [Profile: {profile_name}]" if profile_name else ""
    log_prefix = f"[Worker {worker_id}]{profile_log_part} [{display_name} @ {format_to_download}]"

    download_policy = policy.get('download_policy', {})
    settings = policy.get('settings', {})
    downloader = download_policy.get('downloader')

    # Get script command from settings, with fallback to download_policy for old format.
    script_cmd_str = settings.get('download_script')
    if not script_cmd_str:
        script_cmd_str = download_policy.get('script')

    if script_cmd_str:
        download_cmd = shlex.split(script_cmd_str)
    elif downloader == 'aria2c_rpc':
        download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'aria-rpc']
    elif downloader == 'native-cli':
        download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'cli']
    else:
        # Default to the new native-py downloader if downloader is 'native-py' or not specified.
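        # The resulting base command is roughly `python -m ytops_client.cli download py`;
        # the format flag and the policy-driven options are appended below (illustrative
        # shape only, not an exact invocation).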
        download_cmd = [sys.executable, '-m', 'ytops_client.cli', 'download', 'py']

    download_cmd.extend(['-f', format_to_download])

    if downloader == 'aria2c_rpc':
        if download_policy.get('aria_host'):
            download_cmd.extend(['--aria-host', str(download_policy['aria_host'])])
        if download_policy.get('aria_port'):
            download_cmd.extend(['--aria-port', str(download_policy['aria_port'])])
        if download_policy.get('aria_secret'):
            download_cmd.extend(['--aria-secret', str(download_policy['aria_secret'])])
        if download_policy.get('output_dir'):
            download_cmd.extend(['--output-dir', str(download_policy['output_dir'])])
        if download_policy.get('aria_remote_dir'):
            download_cmd.extend(['--remote-dir', str(download_policy['aria_remote_dir'])])
        if download_policy.get('aria_fragments_dir'):
            download_cmd.extend(['--fragments-dir', str(download_policy['aria_fragments_dir'])])
        # For stress testing, waiting is the desired default to get a success/fail result.
        # Allow disabling it by explicitly setting aria_wait: false in the policy.
        if download_policy.get('aria_wait', True):
            download_cmd.append('--wait')
        if download_policy.get('auto_merge_fragments'):
            download_cmd.append('--auto-merge-fragments')
        if download_policy.get('remove_fragments_after_merge'):
            download_cmd.append('--remove-fragments-after-merge')
        if download_policy.get('cleanup'):
            download_cmd.append('--cleanup')
        if download_policy.get('purge_on_complete'):
            download_cmd.append('--purge-on-complete')

        downloader_args = download_policy.get('downloader_args')
        proxy = download_policy.get('proxy')
        if proxy:
            # Note: proxy_rename is not supported for aria2c_rpc mode.
            proxy_arg = f"--all-proxy {shlex.quote(str(proxy))}"
            if downloader_args:
                downloader_args = f"{downloader_args} {proxy_arg}"
            else:
                downloader_args = proxy_arg
        if downloader_args:
            # For aria2c_rpc, the downloader_args value is passed directly to the script's --downloader-args option.
            download_cmd.extend(['--downloader-args', downloader_args])
    elif downloader == 'native-cli':
        # This is the logic for the legacy download_tool.py (yt-dlp CLI wrapper).
        pause_seconds = download_policy.get('pause_before_download_seconds')
        if pause_seconds and isinstance(pause_seconds, (int, float)) and pause_seconds > 0:
            download_cmd.extend(['--pause', str(pause_seconds)])
        if download_policy.get('continue_downloads'):
            download_cmd.append('--download-continue')

        # Add proxy if specified directly in the policy
        proxy = download_policy.get('proxy')
        if proxy:
            download_cmd.extend(['--proxy', str(proxy)])
        proxy_rename = download_policy.get('proxy_rename')
        if proxy_rename:
            download_cmd.extend(['--proxy-rename', str(proxy_rename)])

        extra_args = download_policy.get('extra_args')
        if extra_args:
            download_cmd.extend(shlex.split(extra_args))

        # Note: 'downloader' here refers to yt-dlp's internal downloader, not our script.
        # The policy key 'external_downloader' is clearer, but we support 'downloader' for backward compatibility.
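        # e.g. a policy may set either 'external_downloader: aria2c' or, in the legacy form,
        # 'downloader: aria2c'; both end up as '--downloader aria2c' below (the value shown
        # is illustrative).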
        ext_downloader = download_policy.get('external_downloader') or download_policy.get('downloader')
        if ext_downloader and ext_downloader not in ['native-cli', 'native-py', 'aria2c_rpc']:
            download_cmd.extend(['--downloader', str(ext_downloader)])
        downloader_args = download_policy.get('downloader_args')
        if downloader_args:
            download_cmd.extend(['--downloader-args', str(downloader_args)])

        if download_policy.get('merge_output_format'):
            download_cmd.extend(['--merge-output-format', str(download_policy['merge_output_format'])])
        if download_policy.get('cleanup'):
            download_cmd.append('--cleanup')
    else:
        # This is the default logic for the new native-py downloader.
        if download_policy.get('output_to_buffer'):
            download_cmd.append('--output-buffer')
        else:
            # --output-dir is only relevant if not outputting to buffer.
            if download_policy.get('output_dir'):
                download_cmd.extend(['--output-dir', str(download_policy['output_dir'])])
        if download_policy.get('config'):
            download_cmd.extend(['--config', str(download_policy['config'])])
        if download_policy.get('temp_path'):
            download_cmd.extend(['--temp-path', str(download_policy['temp_path'])])
        if download_policy.get('continue_downloads'):
            download_cmd.append('--download-continue')
        pause_seconds = download_policy.get('pause_before_download_seconds')
        if pause_seconds and isinstance(pause_seconds, (int, float)) and pause_seconds > 0:
            download_cmd.extend(['--pause', str(pause_seconds)])

        proxy = download_policy.get('proxy')
        if proxy:
            download_cmd.extend(['--proxy', str(proxy)])
        proxy_rename = download_policy.get('proxy_rename')
        if proxy_rename:
            download_cmd.extend(['--proxy-rename', str(proxy_rename)])

        # The 'extra_args' from the policy are for the download script itself, not for yt-dlp.
        # We need to split them and add them to the command.
        extra_args = download_policy.get('extra_args')
        if extra_args:
            download_cmd.extend(shlex.split(extra_args))

        # Pass through downloader settings for yt-dlp to use,
        # e.g. to tell yt-dlp to use aria2c as its backend.
        ext_downloader = download_policy.get('external_downloader')
        if ext_downloader:
            download_cmd.extend(['--downloader', str(ext_downloader)])
        downloader_args = download_policy.get('downloader_args')
        if downloader_args:
            download_cmd.extend(['--downloader-args', str(downloader_args)])

    if args.dummy:
        # Create a copy to add the info.json path for logging, without modifying the original
        log_cmd = list(download_cmd)
        if isinstance(info_json_path, Path) and info_json_path.exists():
            log_cmd.extend(['--load-info-json', str(info_json_path)])
        else:
            log_cmd.extend(['--load-info-json', ''])

        logger.info(f"{log_prefix} Dummy mode: simulating download...")
        logger.info(f"{log_prefix} Dummy mode: Would run command: {' '.join(shlex.quote(s) for s in log_cmd)}")

        dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
        min_seconds = dummy_settings.get('download_min_seconds', 1.0)
        max_seconds = dummy_settings.get('download_max_seconds', 3.0)
        failure_rate = dummy_settings.get('download_failure_rate', 0.0)
        skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)

        sleep_duration = random.uniform(min_seconds, max_seconds)
        logger.info(f"{log_prefix} Dummy mode: simulating download for {sleep_duration:.2f}s (from policy range {min_seconds}-{max_seconds}s).")
        time.sleep(sleep_duration)  # Simulate work

        rand_val = random.random()
        should_fail_skipped = rand_val < skipped_rate
        should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)

        if should_fail_skipped:
            logger.warning(f"{log_prefix} Dummy mode: Injecting simulated skipped download failure.")
            return {
                'type': 'download', 'path': str(info_json_path), 'format': format_to_download,
                'success': False, 'error_type': 'DummySkippedFailure',
                'details': 'FAIL (Dummy mode, skipped)', 'downloaded_bytes': 0,
                'profile': profile_name, 'downloaded_filepath': None,
                'is_tolerated_error': True
            }
        if should_fail_fatal:
            logger.warning(f"{log_prefix} Dummy mode: Injecting simulated fatal download failure.")
            return {
                'type': 'download', 'path': str(info_json_path), 'format': format_to_download,
                'success': False, 'error_type': 'DummyFailure',
                'details': 'FAIL (Dummy mode, fatal)', 'downloaded_bytes': 0,
                'profile': profile_name, 'downloaded_filepath': None
            }

        downloaded_filepath = f'/dev/null/{display_name}.mp4'
        if download_policy.get('output_to_airflow_ready_dir'):
            output_dir = download_policy.get('output_dir')
            if output_dir and os.path.isdir(output_dir):
                try:
                    dummy_path_obj = Path(output_dir) / f"{display_name}.mp4"
                    dummy_path_obj.touch()
                    downloaded_filepath = str(dummy_path_obj)
                    logger.info(f"{log_prefix} Dummy mode: created dummy file for Airflow move: {downloaded_filepath}")
                except OSError as e:
                    logger.error(f"{log_prefix} Dummy mode: failed to create dummy file in '{output_dir}': {e}")

        return {
            'type': 'download', 'path': str(info_json_path), 'format': format_to_download,
            'success': True, 'error_type': None, 'details': 'OK (Dummy mode)',
            'downloaded_bytes': random.randint(100000, 5000000), 'profile': profile_name,
            'downloaded_filepath': downloaded_filepath
        }

    logger.info(f"{log_prefix} Kicking off download process...")

    temp_info_file_path = None
    try:
        if isinstance(info_json_path, Path) and info_json_path.exists():
            # The info.json is already in a file, pass its path directly.
            download_cmd.extend(['--load-info-json', str(info_json_path)])
        else:
            # The info.json content is in memory, so write it to a temporary file.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json', encoding='utf-8') as temp_f:
                temp_f.write(info_json_content)
                temp_info_file_path = temp_f.name
            download_cmd.extend(['--load-info-json', temp_info_file_path])

        cmd_str_for_log = ' '.join(shlex.quote(s) for s in download_cmd)
        logger.info(f"{log_prefix} Running download command: {cmd_str_for_log}")

        output_to_buffer = download_policy.get('output_to_buffer', False)
        retcode, stdout, stderr = run_command(
            download_cmd, running_processes, process_lock,
            binary_stdout=output_to_buffer,
            stream_output=getattr(args, 'print_downloader_log', False),
            stream_prefix=f"{log_prefix} | "
        )
    finally:
        if temp_info_file_path and os.path.exists(temp_info_file_path):
            os.unlink(temp_info_file_path)

    is_403_error = "HTTP Error 403" in stderr
    is_timeout_error = "Read timed out" in stderr
    output_to_buffer = download_policy.get('output_to_buffer', False)

    # Parse stdout to find the downloaded file path.
    # The download scripts print the final path, sometimes with a prefix.
    downloaded_filepath = None
    if stdout and not output_to_buffer:
        lines = stdout.strip().split('\n')
        if lines:
            last_line = lines[-1].strip()
            # Handle aria-rpc output format: "Download... successful: "
            aria_match = re.search(r'successful: (.+)', last_line)
            if aria_match:
                path_from_aria = aria_match.group(1).strip()
                if os.path.exists(path_from_aria):
                    downloaded_filepath = path_from_aria
                else:
                    logger.warning(f"[{display_name}] Path from aria-rpc output does not exist: '{path_from_aria}'")
            # Handle native-py/cli output format (just the path)
            elif os.path.exists(last_line):
                downloaded_filepath = last_line

    result = {
        'type': 'download', 'path': str(info_json_path), 'format': format_to_download,
        'success': retcode == 0, 'error_type': None, 'details': '',
        'downloaded_bytes': 0, 'profile': profile_name,
        'downloaded_filepath': downloaded_filepath
    }

    if retcode == 0:
        details_str = "OK"
        size_in_bytes = 0
        if output_to_buffer:
            # The most accurate size is the length of the stdout buffer.
            size_in_bytes = len(stdout)  # stdout is bytes
            details_str += f" (Buffered {sp_utils.format_size(size_in_bytes)})"
        else:
            size_match = re.search(r'\[download\]\s+100%\s+of\s+~?([0-9.]+)(B|KiB|MiB|GiB)', stderr)
            if size_match:
                value = float(size_match.group(1))
                unit = size_match.group(2)
                multipliers = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3}
                size_in_bytes = int(value * multipliers.get(unit, 1))
                details_str += f" ({size_match.group(1)}{unit})"
        result['downloaded_bytes'] = size_in_bytes
        result['details'] = details_str
    else:
        # Check for shutdown first. This is the most likely cause for an abrupt non-zero exit.
        if state_manager.shutdown_event.is_set():
            result['error_type'] = 'Cancelled'
            result['details'] = 'Task cancelled during shutdown.'
        else:
            # Check both stdout and stderr for error messages, as logging might be directed to stdout.
            full_output = f"{stdout}\n{stderr}"
            # Look for common error indicators from both yt-dlp and our scripts.
            error_lines = [
                line for line in full_output.strip().split('\n')
                if 'ERROR:' in line or ' - ERROR - ' in line or 'DownloadError:' in line or 'Traceback' in line
            ]
            if error_lines:
                # Try to get the most specific part of the error.
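                # For illustration (hypothetical log line): a line such as
                # "2024-01-01 12:00:00 - downloader - ERROR - HTTP Error 403: Forbidden"
                # is reduced to "HTTP Error 403: Forbidden" by the split below.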
                details = error_lines[-1].strip()
                # Our log format is "timestamp - logger - LEVEL - message"
                if ' - ERROR - ' in details:
                    details = details.split(' - ERROR - ', 1)[-1]
                result['details'] = details
            else:
                # Fallback to last non-empty line of stderr if no explicit "ERROR" line found
                stderr_lines = [line for line in stderr.strip().split('\n') if line.strip()]
                result['details'] = stderr_lines[-1].strip() if stderr_lines else "Unknown error"

            if is_403_error:
                result['error_type'] = 'HTTP 403'
            elif is_timeout_error:
                result['error_type'] = 'Timeout'
            else:
                result['error_type'] = f'Exit Code {retcode}'

    return result


def process_info_json_cycle(path, content, policy, state_manager, args, running_processes, process_lock,
                            proxy_url=None, profile_name=None, profile_manager_instance=None):
    """
    Processes one info.json file for one cycle, downloading selected formats.
    """
    results = []
    display_name = sp_utils.get_display_name(path)
    d_policy = policy.get('download_policy', {})
    s_conditions = policy.get('stop_conditions', {})

    try:
        info_data = json.loads(content)

        # If the task file specifies a format, use it instead of the policy's format list.
        # This is for granular tasks generated by the task-generator tool.
        if '_ytops_download_format' in info_data:
            format_selection = info_data['_ytops_download_format']
            logger.info(f"[{display_name}] Using format '{format_selection}' from task file.")
        else:
            format_selection = d_policy.get('formats', '')

        available_formats = [f['format_id'] for f in info_data.get('formats', [])]
        if not available_formats:
            logger.warning(f"[{display_name}] No formats found in info.json. Skipping.")
            return []

        formats_to_test = []
        if format_selection == 'all':
            formats_to_test = available_formats
        elif format_selection.startswith('random:'):
            percent = float(format_selection.split(':')[1].rstrip('%'))
            count = max(1, int(len(available_formats) * (percent / 100.0)))
            formats_to_test = random.sample(available_formats, k=count)
        elif format_selection.startswith('random_from:'):
            choices = [f.strip() for f in format_selection.split(':', 1)[1].split(',')]
            valid_choices = [f for f in choices if f in available_formats]
            if valid_choices:
                formats_to_test = [random.choice(valid_choices)]
        else:
            # If the format selection contains complex selector characters (other than comma),
            # treat the entire string as a single format selector for yt-dlp to interpret.
            # Otherwise, split by comma to test each specified format ID individually.
            if any(c in format_selection for c in '/+[]()'):
                requested_formats = [format_selection]
            else:
                requested_formats = [f.strip() for f in format_selection.split(',') if f.strip()]

            formats_to_test = []
            selector_keywords = ('best', 'worst', 'bestvideo', 'bestaudio')
            for req_fmt in requested_formats:
                # Treat as a selector and pass through if it contains special characters
                # or starts with a known selector keyword.
                if any(c in req_fmt for c in '/+[]()') or req_fmt.startswith(selector_keywords):
                    formats_to_test.append(req_fmt)
                    continue

                # Otherwise, treat as a specific format ID that must exist.
                # Check for exact match first.
                if req_fmt in available_formats:
                    formats_to_test.append(req_fmt)
                    continue

                # If no exact match, check for formats that start with this ID + '-' and then digits
                # e.g., req_fmt '140' should match '140-0' but not '140-something'.
                prefix_match_re = re.compile(rf'^{re.escape(req_fmt)}-\d+$')
                first_match = next((af for af in available_formats if prefix_match_re.match(af)), None)
                if first_match:
                    logger.info(f"[{display_name}] Requested format '{req_fmt}' not found. Using first available match: '{first_match}'.")
                    formats_to_test.append(first_match)
                else:
                    logger.warning(f"[{display_name}] Requested format '{req_fmt}' not found in available formats and is not a recognized selector. Skipping this format.")
    except json.JSONDecodeError:
        logger.error(f"[{display_name}] Failed to parse info.json. Skipping.")
        return []

    for i, format_id in enumerate(formats_to_test):
        if state_manager.shutdown_event.is_set():
            logger.info(f"Shutdown requested, stopping further format tests for {display_name}.")
            break

        # Check if the format URL is expired before attempting to download
        format_details = next((f for f in info_data.get('formats', []) if f.get('format_id') == format_id), None)

        # If format_id is a complex selector, it won't be found directly. As a heuristic,
        # check the expiration of the first format URL, as they typically share the same expiration.
        if not format_details and any(c in format_id for c in '/+[]()'):
            available_formats_list = info_data.get('formats', [])
            if available_formats_list:
                format_details = available_formats_list[0]

        # The check is enabled by default and can be disabled via policy.
        if d_policy.get('check_url_expiration', True) and format_details and 'url' in format_details:
            url_to_check = format_details['url']
            time_shift_minutes = d_policy.get('expire_time_shift_minutes', 0)
            status, time_left_seconds = sp_utils.check_url_expiry(url_to_check, time_shift_minutes)
            logger.debug(f"[{display_name}] URL expiration check for format '{format_id}': status={status}, time_left={time_left_seconds:.0f}s")

            if status == 'expired':
                details = "Download URL is expired"
                if time_shift_minutes > 0 and time_left_seconds > 0:
                    logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL will expire in {time_left_seconds/60:.1f}m (within {time_shift_minutes}m time-shift).")
                    details = f"URL will expire within {time_shift_minutes}m time-shift"
                else:
                    logger.warning(f"[{display_name}] Skipping format '{format_id}' because its URL is expired.")

                result = {
                    'type': 'download', 'path': str(path), 'format': format_id,
                    'success': False, 'error_type': 'Skipped (Expired URL)',
                    'details': details, 'downloaded_bytes': 0,
                    'is_tolerated_error': True
                }
                if proxy_url:
                    result['proxy_url'] = proxy_url
                if profile_manager_instance and profile_name:
                    profile_manager_instance.record_activity(profile_name, 'tolerated_error')
                state_manager.log_event(result)
                results.append(result)
                continue  # Move to the next format
            elif status == 'no_expiry_info':
                logger.debug(f"[{display_name}] No valid 'expire' parameter found in format URL for '{format_id}'. Skipping expiration check.")

        result = run_download_worker(path, content, format_id, policy, args, running_processes, process_lock,
                                     state_manager, profile_name=profile_name)

        if 'id' in info_data:
            result['video_id'] = info_data['id']
        if proxy_url:
            result['proxy_url'] = proxy_url

        # Record download attempt/error if a profile is being used.
        if profile_manager_instance and profile_name:
            if result.get('success'):
                profile_manager_instance.record_activity(profile_name, 'download')
            elif result.get('error_type') == 'Cancelled':
                pass  # Do not record cancellations
            elif result.get('is_tolerated_error'):
                profile_manager_instance.record_activity(profile_name, 'tolerated_error')
            else:
                profile_manager_instance.record_activity(profile_name, 'download_error')

        state_manager.log_event(result)
        results.append(result)

        worker_id = get_worker_id()
        status = "SUCCESS" if result['success'] else f"FAILURE ({result['error_type']})"
        profile_log_part = f" [Profile: {profile_name}]" if profile_name else ""
        logger.info(f"[Worker {worker_id}]{profile_log_part} Result for {display_name} (format {format_id}): {status} - {result.get('details', 'OK')}")

        if not result['success']:
            if s_conditions.get('on_failure') or \
               (s_conditions.get('on_http_403') and result['error_type'] == 'HTTP 403') or \
               (s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
                logger.info(f"Stopping further format tests for {display_name} in this cycle due to failure.")
                break

        sleep_cfg = d_policy.get('sleep_between_formats', {})
        sleep_min = sleep_cfg.get('min_seconds', 0)
        if sleep_min > 0 and i < len(formats_to_test) - 1:
            sleep_max = sleep_cfg.get('max_seconds') or sleep_min
            if sleep_max > sleep_min:
                sleep_duration = random.uniform(sleep_min, sleep_max)
            else:
                sleep_duration = sleep_min
            logger.debug(f"Sleeping for {sleep_duration:.2f}s between formats for {display_name}.")
            # Interruptible sleep
            sleep_end_time = time.time() + sleep_duration
            while time.time() < sleep_end_time:
                if state_manager.shutdown_event.is_set():
                    break
                time.sleep(0.2)

    return results


def run_throughput_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock):
    """A persistent worker for the 'throughput' orchestration mode."""
    owner_id = f"throughput-worker-{worker_id}"
    settings = policy.get('settings', {})
    exec_control = policy.get('execution_control', {})
    d_policy = policy.get('download_policy', {})

    profile_prefix = d_policy.get('profile_prefix')
    if not profile_prefix:
        logger.error(f"[Worker {worker_id}] Throughput mode requires 'download_policy.profile_prefix'. Worker exiting.")
        return []

    no_task_streak = 0
    while not state_manager.shutdown_event.is_set():
        locked_profile = None
        claimed_task_path = None
        try:
            # 0. If no tasks were found previously, pause briefly.
            if no_task_streak > 0:
                polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
                logger.info(f"[Worker {worker_id}] No tasks found in previous attempt(s). Pausing for {polling_interval}s. (Streak: {no_task_streak})")
                time.sleep(polling_interval)
                if state_manager.shutdown_event.is_set():
                    continue

            # 1. Find a task and lock its associated profile
            locked_profile, claimed_task_path = find_task_and_lock_profile(
                profile_manager_instance, owner_id, profile_prefix, policy, worker_id
            )

            if not locked_profile:
                # No task/profile combo was available.
                no_task_streak += 1
                polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
                logger.info(f"[Worker {worker_id}] No available tasks found for any active profiles. Pausing for {polling_interval}s.")
                time.sleep(polling_interval)
                continue

            profile_name = locked_profile['name']

            # We have a task and a lock.
            if claimed_task_path:
                no_task_streak = 0  # Reset streak
                # 3. Process the task
                try:
                    with open(claimed_task_path, 'r', encoding='utf-8') as f:
                        info_json_content = f.read()
                except (IOError, FileNotFoundError) as e:
                    logger.error(f"[{sp_utils.get_display_name(claimed_task_path)}] Could not read claimed task file: {e}")
                    # Unlock profile and continue, file might be corrupted
                    profile_manager_instance.unlock_profile(profile_name, owner=owner_id)
                    locked_profile = None
                    # Clean up the bad file
                    try:
                        claimed_task_path.unlink()
                    except OSError:
                        pass
                    continue

                # The locked profile's proxy MUST be used for the download.
                local_policy = deepcopy(policy)
                local_policy.setdefault('download_policy', {})['proxy'] = locked_profile['proxy']

                _run_download_logic(
                    source=claimed_task_path,
                    info_json_content=info_json_content,
                    policy=local_policy,
                    state_manager=state_manager,
                    args=args,
                    running_processes=running_processes,
                    process_lock=process_lock,
                    profile_name=profile_name,
                    profile_manager_instance=profile_manager_instance
                )

                # 4. Clean up the processed task file
                try:
                    os.remove(claimed_task_path)
                    logger.debug(f"[{sp_utils.get_display_name(claimed_task_path)}] Removed processed task file.")
                except OSError as e:
                    logger.error(f"Failed to remove processed task file '{claimed_task_path}': {e}")
            else:
                # This case should not be reached with the new task-first locking logic.
                # If it is, it means find_task_and_lock_profile returned a profile but no task.
                logger.warning(f"[Worker {worker_id}] Inconsistent state: locked profile '{profile_name}' but no task was claimed. Unlocking and continuing.")

        except Exception as e:
            logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True)
            time.sleep(5)  # Pause before retrying to avoid spamming errors
        finally:
            if locked_profile:
                # 5. Unlock the profile. Only apply cooldown if a task was processed.
                cooldown = None
                if claimed_task_path:
                    # DESIGN: The cooldown duration is not configured in the worker's policy.
                    # Instead, it is read from a central Redis key. This key is set by the
                    # policy-enforcer, making the enforcer the single source of truth for
                    # this policy and the only place where different cooldown policies are
                    # configured. The enforcer can be restarted at will, whereas the
                    # stress-policy workers handling auth and downloads should keep running,
                    # so this effectively applies a policy change across many workers and
                    # machines without restarting each of them.
                    cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds')
                    if cooldown_config:
                        try:
                            val = json.loads(cooldown_config)
                            if isinstance(val, list) and len(val) == 2 and val[0] < val[1]:
                                cooldown = random.randint(val[0], val[1])
                            elif isinstance(val, int):
                                cooldown = val
                        except (json.JSONDecodeError, TypeError):
                            if cooldown_config.isdigit():
                                cooldown = int(cooldown_config)

                if cooldown:
                    logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.")

                profile_manager_instance.unlock_profile(
                    locked_profile['name'],
                    owner=owner_id,
                    rest_for_seconds=cooldown
                )
                locked_profile = None

            # 6. Throughput is now controlled by the enforcer via the profile's
            # 'unlock_cooldown_seconds' policy, which puts the profile into a
            # RESTING state. The worker does not need to sleep here and can
            # immediately try to lock a new profile to maximize throughput.
logger.info(f"[Worker {worker_id}] Worker loop finished.") return [] # This function doesn't return results directly def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=None): """Helper to post-process a single info.json file and move it to the final directory.""" direct_policy = policy.get('direct_docker_cli_policy', {}) settings = policy.get('settings', {}) save_dir = settings.get('save_info_json_dir') if not save_dir: return False video_id = "unknown" try: # Use a short delay and retry mechanism to handle cases where the file is not yet fully written. for attempt in range(3): try: with open(file_path, 'r+', encoding='utf-8') as f: info_data = json.load(f) video_id = info_data.get('id', 'unknown') env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '') if profile_manager_instance else 'unknown' info_data['_ytops_metadata'] = { 'profile_name': profile_name, 'proxy_url': proxy_url, 'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(), 'auth_env': env_name } f.seek(0) json.dump(info_data, f, indent=2) f.truncate() break # Success except (json.JSONDecodeError, IOError) as e: if attempt < 2: time.sleep(0.2) else: raise e final_path = Path(save_dir) / file_path.name rename_template = direct_policy.get('rename_file_template') if rename_template: sanitized_proxy = re.sub(r'[:/]', '_', proxy_url) new_name = rename_template.format( video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy ) final_path = Path(save_dir) / new_name # Use rename for atomic move os.rename(str(file_path), str(final_path)) logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'") return True except (IOError, json.JSONDecodeError, OSError) as e: logger.error(f"[Worker {worker_id}] Error post-processing '{file_path.name}' (video: {video_id}): {e}") return False def find_task_and_lock_profile(profile_manager, owner_id, profile_prefix, policy, worker_id): """ Scans for an available task and locks the specific ACTIVE profile that generated it. This preserves a 1-to-1 relationship between a profile and its tasks. Returns a tuple of (locked_profile_dict, claimed_task_path_obj) or (None, None). """ settings = policy.get('settings', {}) info_json_dir = settings.get('info_json_dir') if not info_json_dir: return None, None # 1. Get ACTIVE profiles from Redis. active_profiles = profile_manager.list_profiles(state_filter='ACTIVE') active_profile_names = {p['name'] for p in active_profiles if p['name'].startswith(profile_prefix)} if not active_profile_names: return None, None # 2. Get all available task files. try: task_files = list(Path(info_json_dir).glob('*.json')) except FileNotFoundError: logger.warning(f"Info JSON directory not found during scan: {info_json_dir}") return None, None if not task_files: return None, None profile_regex_str = settings.get('profile_extraction_regex') if not profile_regex_str: logger.error(f"[Worker {worker_id}] The task-locking strategy requires 'settings.profile_extraction_regex' to be defined in the policy.") return None, None try: profile_regex = re.compile(profile_regex_str) except re.error as e: logger.error(f"Invalid profile_extraction_regex in policy: '{profile_regex_str}'. Error: {e}") return None, None # 3. Shuffle tasks to distribute load if multiple workers are looking. random.shuffle(task_files) # 4. Iterate through tasks and try to lock their corresponding ACTIVE profile. 
    for task_path in task_files:
        match = profile_regex.search(task_path.name)
        if not (match and match.groups()):
            continue

        profile_name = match.group(1)
        if profile_name in active_profile_names:
            # Found a task for an active profile. Try to lock it.
            locked_profile = profile_manager.lock_profile(owner=owner_id, specific_profile_name=profile_name)
            if locked_profile:
                # Success! Claim the file.
                locked_path = task_path.with_suffix(f"{task_path.suffix}.LOCKED.{worker_id}")
                try:
                    task_path.rename(locked_path)
                    logger.info(f"[Worker {worker_id}] Locked profile '{profile_name}' and claimed its task '{task_path.name}'.")
                    return locked_profile, locked_path
                except FileNotFoundError:
                    logger.warning(f"[Worker {worker_id}] Task '{task_path.name}' was claimed by another worker. Unlocking '{profile_name}'.")
                    profile_manager.unlock_profile(profile_name, owner=owner_id)
                    continue  # Try next task
                except OSError as e:
                    logger.error(f"[Worker {worker_id}] Error claiming task file '{task_path.name}': {e}")
                    profile_manager.unlock_profile(profile_name, owner=owner_id)
                    continue

    # No suitable task/profile combo found.
    logger.debug("Found task files, but none correspond to any currently ACTIVE profiles.")
    return None, None


def run_direct_batch_worker(worker_id, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock):
    """A worker for the 'direct_batch_cli' orchestration mode."""
    owner_id = f"direct-batch-worker-{worker_id}"
    settings = policy.get('settings', {})
    exec_control = policy.get('execution_control', {})
    gen_policy = policy.get('info_json_generation_policy', {})
    direct_policy = policy.get('direct_batch_cli_policy', {})

    profile_prefix = gen_policy.get('profile_prefix')
    if not profile_prefix:
        logger.error(f"[Worker {worker_id}] Direct batch mode requires 'info_json_generation_policy.profile_prefix'. Worker exiting.")
        return []

    batch_size = direct_policy.get('batch_size')
    if not batch_size:
        logger.error(f"[Worker {worker_id}] Direct batch mode requires 'direct_batch_cli_policy.batch_size'. Worker exiting.")
        return []

    save_dir = settings.get('save_info_json_dir')
    if not save_dir:
        logger.error(f"[Worker {worker_id}] Direct batch mode requires 'settings.save_info_json_dir'. Worker exiting.")
        return []
    os.makedirs(save_dir, exist_ok=True)

    last_used_profile_name = None
    while not state_manager.shutdown_event.is_set():
        locked_profile = None
        temp_batch_file = None
        # --- Variables for robust finalization ---
        files_created = 0
        url_batch_len = 0
        batch_started = False
        # ---
        try:
            # 1. Lock a profile
            locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)

            # --- New logic to avoid immediate reuse ---
            avoid_reuse = direct_policy.get('avoid_immediate_profile_reuse', False)
            if avoid_reuse and locked_profile and last_used_profile_name and locked_profile['name'] == last_used_profile_name:
                logger.info(f"[Worker {worker_id}] Re-locked same profile '{locked_profile['name']}'. Unlocking and pausing to allow for rotation.")
                profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id)
                wait_seconds = direct_policy.get('avoid_reuse_max_wait_seconds', 5)
                time.sleep(wait_seconds)

                # After waiting, try to lock again.
logger.info(f"[Worker {worker_id}] Attempting to lock a new profile after waiting.") locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix) if locked_profile and locked_profile['name'] == last_used_profile_name: logger.warning(f"[Worker {worker_id}] Still locking the same profile '{locked_profile['name']}' after waiting. Proceeding to use it to avoid getting stuck.") elif locked_profile: logger.info(f"[Worker {worker_id}] Switched to a different profile after waiting: '{locked_profile['name']}'.") # --- End new logic --- if not locked_profile: polling_interval = exec_control.get('worker_polling_interval_seconds', 1) # --- Add diagnostic logging --- all_profiles_in_pool = profile_manager_instance.list_profiles() profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)] if profiles_in_prefix: state_counts = collections.Counter(p['state'] for p in profiles_in_prefix) states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items())) logger.info(f"[Worker {worker_id}] No auth profiles available to lock. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s.") else: logger.info(f"[Worker {worker_id}] No auth profiles available to lock. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s.") # --- End diagnostic logging --- time.sleep(polling_interval) continue profile_name = locked_profile['name'] proxy_url = locked_profile['proxy'] # 2. Get a batch of URLs from the shared list url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list) if not url_batch: logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.") break # Exit the while loop url_batch_len = len(url_batch) batch_started = True # Preemptively increment the counter to avoid race conditions with download workers. profile_manager_instance.increment_pending_downloads(profile_name, url_batch_len) logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {url_batch_len} for the upcoming batch.") end_idx = start_idx + len(url_batch) logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).") video_ids_in_batch = {sp_utils.get_video_id(u) for u in url_batch} # 3. Write URLs to a temporary batch file with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as f: temp_batch_file = f.name f.write('\n'.join(url_batch)) # 4. Construct and run the command ytdlp_cmd_str = direct_policy.get('ytdlp_command') if not ytdlp_cmd_str: logger.error(f"[Worker {worker_id}] Direct batch mode requires 'direct_batch_cli_policy.ytdlp_command'.") break cmd = shlex.split(ytdlp_cmd_str) cmd.extend(['--batch-file', temp_batch_file]) cmd.extend(['--proxy', proxy_url]) # The output template should not include the .info.json extension, as # yt-dlp adds it automatically when --write-info-json is used. 
            output_template_str = direct_policy.get('ytdlp_output_template', '%(id)s')
            ytdlp_args = direct_policy.get('ytdlp_args')
            custom_env = direct_policy.get('env_vars', {}).copy()

            # --- PYTHONPATH for custom yt-dlp module ---
            ytdlp_module_path = direct_policy.get('ytdlp_module_path')
            if ytdlp_module_path:
                existing_pythonpath = custom_env.get('PYTHONPATH', os.environ.get('PYTHONPATH', ''))
                # Prepend the custom path to PYTHONPATH to give it precedence
                custom_env['PYTHONPATH'] = f"{ytdlp_module_path}{os.pathsep}{existing_pythonpath}".strip(os.pathsep)
                logger.debug(f"[Worker {worker_id}] Using custom PYTHONPATH: {custom_env['PYTHONPATH']}")

            custom_env['YTDLP_PROFILE_NAME'] = profile_name
            custom_env['YTDLP_PROXY_URL'] = proxy_url
            env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
            custom_env['YTDLP_SIM_MODE'] = env_name

            # Create a per-profile cache directory and set XDG_CACHE_HOME
            cache_dir_base = direct_policy.get('cache_dir_base', '.cache')
            profile_cache_dir = os.path.join(cache_dir_base, profile_name)
            try:
                os.makedirs(profile_cache_dir, exist_ok=True)
                custom_env['XDG_CACHE_HOME'] = profile_cache_dir
            except OSError as e:
                logger.error(f"[Worker {worker_id}] Failed to create cache directory '{profile_cache_dir}': {e}")

            # --- Manage User-Agent ---
            # Use a consistent User-Agent per profile, storing it in the profile's cache directory.
            user_agent = None
            user_agent_file = os.path.join(profile_cache_dir, 'user_agent.txt')
            try:
                if os.path.exists(user_agent_file):
                    with open(user_agent_file, 'r', encoding='utf-8') as f:
                        user_agent = f.read().strip()
                if not user_agent:
                    # File doesn't exist or is empty
                    user_agent = sp_utils.generate_user_agent_from_policy(policy)
                    with open(user_agent_file, 'w', encoding='utf-8') as f:
                        f.write(user_agent)
                    logger.info(f"[{profile_name}] Generated and saved new User-Agent: '{user_agent}'")
                else:
                    logger.info(f"[{profile_name}] Using existing User-Agent from cache: '{user_agent}'")
            except IOError as e:
                logger.error(f"[Worker {worker_id}] Error accessing User-Agent file '{user_agent_file}': {e}. Using generated UA for this run.")
                user_agent = sp_utils.generate_user_agent_from_policy(policy)  # fallback

            # Add proxy rename from policy if specified, for custom yt-dlp forks
            proxy_rename = direct_policy.get('ytdlp_proxy_rename')
            if proxy_rename:
                custom_env['YTDLP_PROXY_RENAME'] = proxy_rename

            if user_agent:
                cmd.extend(['--user-agent', user_agent])
            if ytdlp_args:
                cmd.extend(shlex.split(ytdlp_args))
            if args.verbose and '--verbose' not in cmd:
                cmd.append('--verbose')

            if args.dummy:
                # In dummy mode, we replace the real yt-dlp command with our dummy script.
                # The dummy script will handle Redis interactions (checking for bans, recording activity).
                # For logging, construct what the real command would have been
                log_cmd = list(cmd)  # cmd has most args now
                log_cmd.extend(['-o', os.path.join('temp_dir', output_template_str)])
                logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY MODE: Would run real command: {' '.join(shlex.quote(s) for s in log_cmd)}")
                logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY MODE: With environment for real command: {custom_env}")

                cmd = [
                    sys.executable, '-m', 'ytops_client.cli', 'yt-dlp-dummy'
                ]

                # The orchestrator is still responsible for managing temp directories and post-processing.
with tempfile.TemporaryDirectory(prefix=f"ytdlp-dummy-batch-{worker_id}-") as temp_output_dir: output_template = os.path.join(temp_output_dir, output_template_str) cmd.extend(['--batch-file', temp_batch_file]) cmd.extend(['-o', output_template]) if args.verbose: cmd.append('--verbose') # Pass failure rates and Redis connection info to the dummy script via environment dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {}) auth_failure_rate = dummy_settings.get('auth_failure_rate', 0.0) auth_skipped_rate = dummy_settings.get('auth_skipped_failure_rate', 0.0) custom_env['YTDLP_DUMMY_FAILURE_RATE'] = auth_failure_rate custom_env['YTDLP_DUMMY_SKIPPED_FAILURE_RATE'] = auth_skipped_rate custom_env['REDIS_HOST'] = profile_manager_instance.redis.connection_pool.connection_kwargs.get('host') custom_env['REDIS_PORT'] = profile_manager_instance.redis.connection_pool.connection_kwargs.get('port') redis_password = profile_manager_instance.redis.connection_pool.connection_kwargs.get('password') if redis_password: custom_env['REDIS_PASSWORD'] = redis_password logger.info(f"[Worker {worker_id}] [{profile_name}] DUMMY MODE: Running dummy yt-dlp script with updated environment: {custom_env}") retcode, stdout, stderr = run_command( cmd, running_processes, process_lock, env=custom_env, stream_output=args.verbose, stream_prefix=f"[Worker {worker_id} | yt-dlp-dummy] " ) # --- Post-processing is the same as in non-dummy mode --- processed_files = list(Path(temp_output_dir).glob('*.json')) for temp_path in processed_files: files_created += 1 video_id = "unknown" try: # The orchestrator injects its own metadata after the fact. with open(temp_path, 'r+', encoding='utf-8') as f: info_data = json.load(f) video_id = info_data.get('id', 'unknown') env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '') info_data['_ytops_metadata'] = { 'profile_name': profile_name, 'proxy_url': proxy_url, 'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(), 'auth_env': env_name } f.seek(0) json.dump(info_data, f, indent=2) f.truncate() final_path = Path(save_dir) / temp_path.name rename_template = direct_policy.get('rename_file_template') if rename_template: sanitized_proxy = re.sub(r'[:/]', '_', proxy_url) new_name = rename_template.format( video_id=video_id, profile_name=profile_name, proxy=sanitized_proxy ) final_path = Path(save_dir) / new_name shutil.move(str(temp_path), str(final_path)) logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'") except (IOError, json.JSONDecodeError, OSError) as e: logger.error(f"[Worker {worker_id}] DUMMY MODE: Error post-processing '{temp_path.name}' (video: {video_id}): {e}") # The orchestrator still determines overall batch success and logs its own event. # It does NOT call record_activity, as the dummy script did that per-URL. success = (retcode == 0 and files_created > 0) if not success: reason = f"exit code was {retcode}" if retcode != 0 else f"0 files created" logger.warning(f"[Worker {worker_id}] [{profile_name}] DUMMY MODE: Marking batch as FAILED. Reason: {reason}.") # Record batch stats state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name) event_details = f"Dummy batch completed. Files created: {files_created}/{len(url_batch)}." 
                    if not success and stderr:
                        event_details += f" Stderr: {stderr.strip().splitlines()[-1]}"
                    event = {
                        'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url,
                        'success': success, 'details': event_details,
                        'video_count': len(url_batch)
                    }
                    state_manager.log_event(event)
            else:
                with tempfile.TemporaryDirectory(prefix=f"ytdlp-batch-{worker_id}-") as temp_output_dir:
                    output_template = os.path.join(temp_output_dir, output_template_str)
                    cmd.extend(['-o', output_template])

                    logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs...")
                    logger.info(f"[Worker {worker_id}] [{profile_name}] Running command: {' '.join(shlex.quote(s) for s in cmd)}")
                    logger.info(f"[Worker {worker_id}] [{profile_name}] With environment: {custom_env}")

                    retcode, stdout, stderr = run_command(
                        cmd, running_processes, process_lock, env=custom_env,
                        stream_output=args.verbose,
                        stream_prefix=f"[Worker {worker_id} | yt-dlp] "
                    )

                    is_bot_error = "Sign in to confirm you're not a bot" in stderr
                    if is_bot_error:
                        logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during batch. Marking as failure.")

                    processed_files = list(Path(temp_output_dir).glob('*.json'))
                    for temp_path in processed_files:
                        files_created += 1
                        video_id = "unknown"
                        try:
                            with open(temp_path, 'r+', encoding='utf-8') as f:
                                info_data = json.load(f)
                                video_id = info_data.get('id', 'unknown')
                                env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '')
                                info_data['_ytops_metadata'] = {
                                    'profile_name': profile_name,
                                    'proxy_url': proxy_url,
                                    'generation_timestamp_utc': datetime.now(timezone.utc).isoformat(),
                                    'auth_env': env_name
                                }
                                f.seek(0)
                                json.dump(info_data, f, indent=2)
                                f.truncate()

                            final_path = Path(save_dir) / temp_path.name
                            rename_template = direct_policy.get('rename_file_template')
                            if rename_template:
                                sanitized_proxy = re.sub(r'[:/]', '_', proxy_url)
                                new_name = rename_template.format(
                                    video_id=video_id,
                                    profile_name=profile_name,
                                    proxy=sanitized_proxy
                                )
                                final_path = Path(save_dir) / new_name

                            shutil.move(str(temp_path), str(final_path))
                            logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'")
                        except (IOError, json.JSONDecodeError, OSError) as e:
                            logger.error(f"[Worker {worker_id}] Error post-processing '{temp_path.name}' (video: {video_id}): {e}")

                    # A batch is considered an overall success for logging if it had no fatal errors
                    # and produced at least one file.
                    success = (files_created > 0 and not is_bot_error)
                    if not success:
                        reason = "bot detection occurred" if is_bot_error else f"0 files created out of {len(url_batch)}"
                        logger.warning(f"[Worker {worker_id}] [{profile_name}] Marking batch as FAILED. Reason: {reason}.")

                    # Record batch stats for overall orchestrator health
                    state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name)

                    # In this mode, the custom yt-dlp script is responsible for recording
                    # per-URL activity ('success', 'failure', 'tolerated_error') directly into Redis.
                    # The orchestrator does not record activity here to avoid double-counting.
                    logger.info(f"[Worker {worker_id}] [{profile_name}] Batch finished. Per-URL activity was recorded by the yt-dlp script.")

                    event_details = f"Batch completed. Exit: {retcode}. Files created: {files_created}/{len(url_batch)}."
                    if not success and stderr:
                        if is_bot_error:
                            event_details += " Stderr: Bot detection occurred."
else: event_details += f" Stderr: {stderr.strip().splitlines()[-1]}" event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details, 'video_count': len(url_batch) } state_manager.log_event(event) except Exception as e: logger.error(f"[Worker {worker_id}] Unexpected error in worker loop: {e}", exc_info=True) if locked_profile: profile_manager_instance.record_activity(locked_profile['name'], 'failure') finally: if locked_profile and batch_started: # --- Reconcile pending downloads counter --- # This is in the finally block to guarantee it runs even if post-processing fails. adjustment = files_created - url_batch_len if adjustment != 0: logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {files_created}/{url_batch_len} files. Adjusting by {adjustment}.") profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment) if locked_profile: last_used_profile_name = locked_profile['name'] cooldown = None # DESIGN: The cooldown duration is not configured in the worker's policy. # Instead, it is read from a central Redis key. This key is set by the # policy-enforcer, making the enforcer the single source of truth for # this policy. This allows changing the cooldown behavior without # restarting the workers. cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds') if cooldown_config: try: val = json.loads(cooldown_config) if isinstance(val, list) and len(val) == 2 and val[0] < val[1]: cooldown = random.randint(val[0], val[1]) elif isinstance(val, int): cooldown = val except (json.JSONDecodeError, TypeError): if cooldown_config.isdigit(): cooldown = int(cooldown_config) if cooldown: logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.") profile_manager_instance.unlock_profile( locked_profile['name'], owner=owner_id, rest_for_seconds=cooldown ) if temp_batch_file and os.path.exists(temp_batch_file): os.unlink(temp_batch_file) logger.info(f"[Worker {worker_id}] Worker loop finished.") return [] def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock): """A worker for the 'direct_docker_cli' orchestration mode (fetch_only).""" owner_id = f"direct-docker-worker-{worker_id}" settings = policy.get('settings', {}) exec_control = policy.get('execution_control', {}) gen_policy = policy.get('info_json_generation_policy', {}) direct_policy = policy.get('direct_docker_cli_policy', {}) profile_prefix = gen_policy.get('profile_prefix') if not profile_prefix: logger.error(f"[Worker {worker_id}] Direct docker mode requires 'info_json_generation_policy.profile_prefix'. Worker exiting.") return [] batch_size = direct_policy.get('batch_size') if not batch_size: logger.error(f"[Worker {worker_id}] Direct docker mode requires 'direct_docker_cli_policy.batch_size'. Worker exiting.") return [] save_dir = settings.get('save_info_json_dir') if not save_dir: logger.error(f"[Worker {worker_id}] Direct docker mode requires 'settings.save_info_json_dir'. 
Worker exiting.") return [] os.makedirs(save_dir, exist_ok=True) # --- Docker specific config --- image_name = direct_policy.get('docker_image_name') host_mount_path = os.path.abspath(direct_policy.get('docker_host_mount_path')) container_mount_path = direct_policy.get('docker_container_mount_path') host_cache_path = direct_policy.get('docker_host_cache_path') if host_cache_path: host_cache_path = os.path.abspath(host_cache_path) container_cache_path = direct_policy.get('docker_container_cache_path') network_name = direct_policy.get('docker_network_name') if not all([image_name, host_mount_path, container_mount_path]): logger.error(f"[Worker {worker_id}] Direct docker mode requires 'docker_image_name', 'docker_host_mount_path', and 'docker_container_mount_path'. Worker exiting.") return [] try: os.makedirs(host_mount_path, exist_ok=True) except OSError as e: logger.error(f"[Worker {worker_id}] Could not create docker_host_mount_path '{host_mount_path}': {e}. Worker exiting.") return [] last_used_profile_name = None while not state_manager.shutdown_event.is_set(): locked_profile = None temp_task_dir_host = None # --- Variables for robust finalization --- live_success_count = 0 url_batch_len = 0 batch_started = False # --- try: # 1. Lock a profile locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix) # --- New logic to avoid immediate reuse --- avoid_reuse = direct_policy.get('avoid_immediate_profile_reuse', False) if avoid_reuse and locked_profile and last_used_profile_name and locked_profile['name'] == last_used_profile_name: logger.info(f"[Worker {worker_id}] Re-locked same profile '{locked_profile['name']}'. Unlocking and pausing to allow for rotation.") profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id) wait_seconds = direct_policy.get('avoid_reuse_max_wait_seconds', 5) time.sleep(wait_seconds) # After waiting, try to lock again. logger.info(f"[Worker {worker_id}] Attempting to lock a new profile after waiting.") locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix) if locked_profile and locked_profile['name'] == last_used_profile_name: logger.warning(f"[Worker {worker_id}] Still locking the same profile '{locked_profile['name']}' after waiting. Proceeding to use it to avoid getting stuck.") elif locked_profile: logger.info(f"[Worker {worker_id}] Switched to a different profile after waiting: '{locked_profile['name']}'.") # --- End new logic --- if not locked_profile: polling_interval = exec_control.get('worker_polling_interval_seconds', 1) # --- Add diagnostic logging --- all_profiles_in_pool = profile_manager_instance.list_profiles() profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)] if profiles_in_prefix: state_counts = collections.Counter(p['state'] for p in profiles_in_prefix) states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items())) logger.info(f"[Worker {worker_id}] No auth profiles available to lock. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s.") else: logger.info(f"[Worker {worker_id}] No auth profiles available to lock. No profiles found with prefix '{profile_prefix}'. 
Pausing for {polling_interval}s.") # --- End diagnostic logging --- time.sleep(polling_interval) continue profile_name = locked_profile['name'] proxy_url = locked_profile['proxy'] # --- Manage User-Agent, Visitor ID, and Cache Directory --- user_agent = None visitor_id = None environment = {} profile_cache_dir_host = None if host_cache_path: profile_cache_dir_host = os.path.join(host_cache_path, profile_name) try: os.makedirs(profile_cache_dir_host, exist_ok=True) if container_cache_path: environment['XDG_CACHE_HOME'] = container_cache_path # --- User-Agent --- user_agent_file = os.path.join(profile_cache_dir_host, 'user_agent.txt') if os.path.exists(user_agent_file): with open(user_agent_file, 'r', encoding='utf-8') as f: user_agent = f.read().strip() if not user_agent: user_agent = sp_utils.generate_user_agent_from_policy(policy) with open(user_agent_file, 'w', encoding='utf-8') as f: f.write(user_agent) logger.info(f"[{profile_name}] Generated and saved new User-Agent: '{user_agent}'") else: logger.info(f"[{profile_name}] Using existing User-Agent from cache: '{user_agent}'") # --- Visitor ID --- if direct_policy.get('track_visitor_id'): visitor_id_file = os.path.join(profile_cache_dir_host, 'visitor_id.txt') if os.path.exists(visitor_id_file): with open(visitor_id_file, 'r', encoding='utf-8') as f: visitor_id = f.read().strip() if visitor_id: logger.info(f"[{profile_name}] Using existing Visitor ID from cache: '{visitor_id}'") except IOError as e: logger.error(f"[Worker {worker_id}] Error accessing cache file in '{profile_cache_dir_host}': {e}. Using generated UA for this run.") user_agent = sp_utils.generate_user_agent_from_policy(policy) # Fallback for UA else: # Fallback if no cache is configured for auth simulation user_agent = sp_utils.generate_user_agent_from_policy(policy) # 2. Get a batch of URLs url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list) if not url_batch: logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.") break url_batch_len = len(url_batch) batch_started = True # Preemptively increment the counter to avoid race conditions with download workers. profile_manager_instance.increment_pending_downloads(profile_name, url_batch_len) logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {url_batch_len} for the upcoming batch.") end_idx = start_idx + len(url_batch) logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).") # 3. Prepare files on the host temp_task_dir_host = tempfile.mkdtemp(prefix=f"docker-task-{worker_id}-", dir=host_mount_path) task_dir_name = os.path.basename(temp_task_dir_host) task_dir_container = os.path.join(container_mount_path, task_dir_name) # Set XDG_CONFIG_HOME for yt-dlp to find the config automatically environment['XDG_CONFIG_HOME'] = task_dir_container # Write batch file temp_batch_file_host = os.path.join(temp_task_dir_host, 'batch.txt') with open(temp_batch_file_host, 'w', encoding='utf-8') as f: f.write('\n'.join(url_batch)) # Write yt-dlp config file base_config_content = "" base_config_file = direct_policy.get('ytdlp_config_file') if base_config_file: # Try path as-is first, then relative to project root. 
config_path_to_read = Path(base_config_file) if not config_path_to_read.exists(): config_path_to_read = Path(sp_utils._PROJECT_ROOT) / base_config_file if config_path_to_read.exists(): try: with open(config_path_to_read, 'r', encoding='utf-8') as f: base_config_content = f.read() logger.info(f"[Worker {worker_id}] [{profile_name}] Loaded base config from '{config_path_to_read}'") except IOError as e: logger.error(f"[Worker {worker_id}] Could not read ytdlp_config_file '{config_path_to_read}': {e}") else: logger.error(f"[Worker {worker_id}] Could not find ytdlp_config_file: '{base_config_file}'") config_overrides = direct_policy.get('ytdlp_config_overrides', {}).copy() if direct_policy.get('use_cookies') and host_cache_path and container_cache_path: try: cookie_file_host = os.path.join(profile_cache_dir_host, 'cookies.txt') # Ensure the file exists and has the Netscape header if it's empty. if not os.path.exists(cookie_file_host) or os.path.getsize(cookie_file_host) == 0: with open(cookie_file_host, 'w', encoding='utf-8') as f: f.write("# Netscape HTTP Cookie File\n") logger.info(f"[{profile_name}] Created/initialized cookie file with header: {cookie_file_host}") cookie_file_container = os.path.join(container_cache_path, 'cookies.txt') config_overrides['cookies'] = cookie_file_container logger.info(f"[{profile_name}] Using persistent cookie jar: {cookie_file_host}") except (IOError, OSError) as e: logger.error(f"[Worker {worker_id}] Could not create cookie file in '{profile_cache_dir_host}': {e}") # Inject per-task values into overrides config_overrides['proxy'] = proxy_url config_overrides['batch-file'] = os.path.join(task_dir_container, 'batch.txt') # The output template should not include the .info.json extension, as # yt-dlp adds it automatically when --write-info-json is used. config_overrides['output'] = os.path.join(task_dir_container, '%(id)s') if user_agent: config_overrides['user-agent'] = user_agent overrides_content = sp_utils._config_dict_to_flags_file_content(config_overrides) raw_args_from_policy = direct_policy.get('ytdlp_raw_args', []) # --- Inject visitor_id into raw args if available --- if visitor_id: # Start with a copy of the raw args from policy new_raw_args = list(raw_args_from_policy) # --- Handle youtube extractor args --- youtube_arg_index = -1 original_youtube_value = None for i, arg in enumerate(new_raw_args): if arg.startswith('--extractor-args') and 'youtube:' in arg: youtube_arg_index = i try: parts = shlex.split(arg) if len(parts) == 2 and parts[1].startswith('youtube:'): original_youtube_value = parts[1] except ValueError: logger.warning(f"Could not parse extractor-arg, will not modify: {arg}") break # Found it, stop searching if youtube_arg_index != -1 and original_youtube_value: # Modify existing youtube arg new_value = f'{original_youtube_value.rstrip()};visitor_data={visitor_id}' if 'skip=' in new_value: new_value = re.sub(r'skip=([^;\'"]*)', r'skip=\1,webpage,configs', new_value) else: new_value += ';skip=webpage,configs' new_raw_args[youtube_arg_index] = f'--extractor-args "{new_value}"' else: # Add new youtube arg logger.warning(f"[{profile_name}] No existing '--extractor-args youtube:...' found. 
Adding a new one for visitor_id.") new_raw_args.append(f'--extractor-args "youtube:visitor_data={visitor_id};skip=webpage,configs"') # --- Handle youtubetab extractor args --- youtubetab_arg_index = -1 for i, arg in enumerate(new_raw_args): if arg.startswith('--extractor-args') and 'youtubetab:' in arg: youtubetab_arg_index = i break # The request is to set/replace this argument new_youtubetab_arg = '--extractor-args "youtubetab:skip=webpage"' if youtubetab_arg_index != -1: # Replace existing new_raw_args[youtubetab_arg_index] = new_youtubetab_arg else: # Add new new_raw_args.append(new_youtubetab_arg) raw_args_from_policy = new_raw_args # --- End visitor_id injection --- raw_args_content = '\n'.join(raw_args_from_policy) config_content = f"{base_config_content.strip()}\n\n# --- Overrides from policy ---\n{overrides_content}" if raw_args_content: config_content += f"\n\n# --- Raw args from policy ---\n{raw_args_content}" logger.info(f"[Worker {worker_id}] [{profile_name}] Generated yt-dlp config file content:\n---config---\n{config_content}\n------------") # Create the directory structure yt-dlp expects inside the temp task dir ytdlp_config_dir_host = os.path.join(temp_task_dir_host, 'yt-dlp') os.makedirs(ytdlp_config_dir_host, exist_ok=True) temp_config_file_host = os.path.join(ytdlp_config_dir_host, 'config') with open(temp_config_file_host, 'w', encoding='utf-8') as f: f.write(config_content) # 4. Construct and run the 'docker run' command volumes = { host_mount_path: { 'bind': container_mount_path, 'mode': 'rw' } } if host_cache_path and container_cache_path: profile_cache_dir_host = os.path.join(host_cache_path, profile_name) os.makedirs(profile_cache_dir_host, exist_ok=True) volumes[profile_cache_dir_host] = { 'bind': container_cache_path, 'mode': 'rw' } # The command tells yt-dlp where to find the config file we created. # We still set XDG_CONFIG_HOME for any other config it might look for. 
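# Illustration (example names only, not generated by this code): the temp task
# directory prepared above ends up looking like
#   <host_mount_path>/docker-task-<worker_id>-XXXXXXXX/
#       batch.txt       one URL per line, referenced via --batch-file
#       yt-dlp/config   base config + policy overrides + raw args
# The container sees the same tree under <container_mount_path>, which is why the
# --config-locations argument below and XDG_CONFIG_HOME both use container paths.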
command = ['yt-dlp', '--config-locations', os.path.join(task_dir_container, 'yt-dlp/config')] logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}") # For logging purposes, construct the full equivalent command line with host paths log_config_overrides_for_host = config_overrides.copy() log_config_overrides_for_host['batch-file'] = temp_batch_file_host log_config_overrides_for_host['output'] = os.path.join(temp_task_dir_host, '%(id)s') if 'cookies' in log_config_overrides_for_host and host_cache_path: log_config_overrides_for_host['cookies'] = os.path.join(profile_cache_dir_host, 'cookies.txt') log_command_override = ['yt-dlp'] if base_config_content: log_command_override.extend(sp_utils._parse_config_file_to_cli_args(base_config_content)) log_command_override.extend(sp_utils._config_dict_to_cli_flags(log_config_overrides_for_host)) for raw_arg in raw_args_from_policy: log_command_override.extend(shlex.split(raw_arg)) # --- Live log parsing and activity recording --- live_failure_count = 0 live_tolerated_count = 0 activity_lock = threading.Lock() # Get error patterns from policy for live parsing tolerated_error_patterns = direct_policy.get('tolerated_error_patterns', []) fatal_error_patterns = direct_policy.get('fatal_error_patterns', []) def log_parser_callback(line): nonlocal live_success_count, live_failure_count, live_tolerated_count # --- Visitor ID Extraction --- if direct_policy.get('track_visitor_id') and profile_cache_dir_host: # e.g., [debug] [youtube] [pot:cache] TRACE: Retrieved cache spec PoTokenCacheSpec(key_bindings={'t': 'webpo', 'cb': '...', 'cbt': 'visitor_id', ... match = re.search(r"'cb': '([^']*)', 'cbt': 'visitor_id'", line) if match: new_visitor_id = match.group(1) logger.info(f"[Worker {worker_id}] [{profile_name}] Detected new Visitor ID: {new_visitor_id}") try: visitor_id_file = os.path.join(profile_cache_dir_host, 'visitor_id.txt') with open(visitor_id_file, 'w', encoding='utf-8') as f: f.write(new_visitor_id) logger.info(f"[{profile_name}] Saved new Visitor ID to cache.") except IOError as e: logger.error(f"[{profile_name}] Failed to save new Visitor ID to cache: {e}") # Success is the highest priority check if '[info] Writing video metadata as JSON to:' in line: with activity_lock: live_success_count += 1 logger.info(f"[Worker {worker_id}] [{profile_name}] Live success #{live_success_count} detected from log.") profile_manager_instance.record_activity(profile_name, 'success') # --- Immediate post-processing --- try: path_match = re.search(r"Writing video metadata as JSON to: '?([^']+)'?$", line) if not path_match: path_match = re.search(r"Writing video metadata as JSON to: (.*)$", line) if path_match: container_file_path = path_match.group(1).strip() if container_file_path.startswith(container_mount_path): relative_path = os.path.relpath(container_file_path, container_mount_path) host_file_path = os.path.join(host_mount_path, relative_path) # The file might not exist immediately. 
for _ in range(5): # Retry for up to 0.5s if os.path.exists(host_file_path): break time.sleep(0.1) if os.path.exists(host_file_path): _post_process_and_move_info_json( Path(host_file_path), profile_name, proxy_url, policy, worker_id, profile_manager_instance=profile_manager_instance ) else: logger.warning(f"File from log not found on host for immediate processing: {host_file_path}") except Exception as e: logger.error(f"Error during immediate post-processing from log line: {e}") # --- End immediate post-processing --- return False # Check for fatal patterns (e.g., bot detection) which might not start with ERROR: for pattern in fatal_error_patterns: if re.search(pattern, line, re.IGNORECASE): with activity_lock: live_failure_count += 1 logger.error(f"[Worker {worker_id}] [{profile_name}] Live FATAL error #{live_failure_count} detected from log: {line}") profile_manager_instance.record_activity(profile_name, 'failure') if direct_policy.get('ban_on_fatal_error_in_batch'): logger.warning(f"Banning profile '{profile_name}' immediately due to fatal error to stop container.") profile_manager_instance.update_profile_state(profile_name, 'BANNED', 'Fatal error during batch') return True # Signal to stop container return False # Do not stop if ban_on_fatal_error_in_batch is false # Only process lines that contain ERROR: for tolerated/generic failures if 'ERROR:' not in line: return False # Check if it's a tolerated error for pattern in tolerated_error_patterns: if re.search(pattern, line, re.IGNORECASE): with activity_lock: live_tolerated_count += 1 logger.warning(f"[Worker {worker_id}] [{profile_name}] Live TOLERATED error #{live_tolerated_count} detected from log: {line}") profile_manager_instance.record_activity(profile_name, 'tolerated_error') return False # If it's an ERROR: line and not tolerated, it's a failure with activity_lock: live_failure_count += 1 logger.warning(f"[Worker {worker_id}] [{profile_name}] Live failure #{live_failure_count} detected from log: {line}") profile_manager_instance.record_activity(profile_name, 'failure') return False retcode, stdout, stderr, stop_reason = run_docker_container( image_name=image_name, command=command, volumes=volumes, stream_prefix=f"[Worker {worker_id} | docker-ytdlp] ", network_name=network_name, log_callback=log_parser_callback, profile_manager=profile_manager_instance, profile_name=profile_name, environment=environment, log_command_override=log_command_override ) # 5. Post-process results logger.info(f"[Worker {worker_id}] [{profile_name}] Docker container finished. Post-processing results...") full_output = f"{stdout}\n{stderr}" is_bot_error = "Sign in to confirm you're not a bot" in full_output if is_bot_error: logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during batch. Marking as failure.") # Fallback post-processing for any files missed by the live parser. # The live parser moves files, so this loop should only find leftovers. processed_files = list(Path(temp_task_dir_host).glob('*.json')) if processed_files: logger.info(f"[Worker {worker_id}] Found {len(processed_files)} leftover file(s) to process after live parsing.") for temp_path in processed_files: _post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=profile_manager_instance) # A batch is considered an overall success for logging if it had no fatal errors. # The per-URL activity has already been recorded live. # We use live_success_count for a more accurate success metric. 
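# Worked example with hypothetical numbers: for a batch of 10 URLs with 7 live
# successes, 1 live failure and 1 tolerated error, processed_count is 9, so one
# unaccounted failure is recorded below and the batch is logged as a success
# (at least one success and no bot detection).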
success = (live_success_count > 0 and not is_bot_error) if not success: reason = "bot detection occurred" if is_bot_error else f"0 successful files created out of {len(url_batch)}" logger.warning(f"[Worker {worker_id}] [{profile_name}] Marking batch as FAILED. Reason: {reason}.") # Record batch stats for overall orchestrator health state_manager.record_batch_result(success, len(url_batch), profile_name=profile_name) # Live parsing can miss per-URL activity (e.g. if yt-dlp exits before printing a final log line), so reconcile the per-URL counts here. with activity_lock: processed_count = live_success_count + live_failure_count + live_tolerated_count # Any URL in the batch with no recorded success, failure, or tolerated error is treated as an unaccounted failure and recorded once below, so the per-URL totals add up to the batch size. unaccounted_failures = len(url_batch) - processed_count if unaccounted_failures > 0: logger.info(f"[Worker {worker_id}] [{profile_name}] Reconciling activity: {unaccounted_failures} unaccounted failure(s).") for _ in range(unaccounted_failures): profile_manager_instance.record_activity(profile_name, 'failure') if stop_reason: logger.warning(f"[Worker {worker_id}] [{profile_name}] Batch aborted due to: {stop_reason}. Adjusting URL index.") # The batch was from start_idx to end_idx. # We processed `processed_count` URLs. # The next batch should start from `start_idx + processed_count`. # `get_next_url_batch` updated `last_url_index` to `end_idx`. We need to rewind it. with activity_lock: processed_count = live_success_count + live_failure_count + live_tolerated_count next_start_index = start_idx + processed_count state_manager.update_last_url_index(next_start_index, force=True) logger.info(f"[Worker {worker_id}] Rewound URL index to {next_start_index} for next worker.") event_details = f"Docker batch completed. Exit: {retcode}. Files created: {live_success_count}/{len(url_batch)}. (Live successes: {live_success_count}, Live failures: {live_failure_count}, Live tolerated: {live_tolerated_count})" if not success and stderr: event_details += f" Stderr: {stderr.strip().splitlines()[-1] if stderr.strip() else 'N/A'}" if stop_reason: event_details += f" Aborted: {stop_reason}." event = { 'type': 'fetch_batch', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details, 'video_count': len(url_batch) } state_manager.log_event(event) logger.info(f"[Worker {worker_id}] [{profile_name}] Batch processing complete. Worker will now unlock profile and attempt next batch.") except Exception as e: logger.error(f"[Worker {worker_id}] Unexpected error in worker loop: {e}", exc_info=True) if locked_profile: profile_manager_instance.record_activity(locked_profile['name'], 'failure') finally: if locked_profile and batch_started: # --- Reconcile pending downloads counter --- # This is in the finally block to guarantee it runs even if post-processing fails. adjustment = live_success_count - url_batch_len if adjustment != 0: logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {live_success_count}/{url_batch_len} files.
Adjusting by {adjustment}.") profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment) if locked_profile: last_used_profile_name = locked_profile['name'] profile_manager_instance.unlock_profile(locked_profile['name'], owner=owner_id) if temp_task_dir_host and os.path.exists(temp_task_dir_host): # If shutdown is requested, a batch might have been interrupted after files were # created but before they were post-processed. We preserve the temp directory # to allow for manual recovery of the info.json files. if state_manager.shutdown_event.is_set() and any(Path(temp_task_dir_host).iterdir()): logger.warning(f"Shutdown requested. Preserving temporary task directory for manual recovery: {temp_task_dir_host}") else: shutil.rmtree(temp_task_dir_host) logger.info(f"[Worker {worker_id}] Worker loop finished.") return [] def run_direct_docker_download_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock): """A worker for the 'direct_docker_cli' orchestration mode with `mode: download_only`.""" owner_id = f"direct-docker-dl-worker-{worker_id}" settings = policy.get('settings', {}) exec_control = policy.get('execution_control', {}) d_policy = policy.get('download_policy', {}) direct_policy = policy.get('direct_docker_cli_policy', {}) profile_prefix = d_policy.get('profile_prefix') if not profile_prefix: logger.error(f"[Worker {worker_id}] Direct docker download mode requires 'download_policy.profile_prefix'. Worker exiting.") return [] # --- Docker specific config --- image_name = direct_policy.get('docker_image_name') host_mount_path = direct_policy.get('docker_host_mount_path') container_mount_path = direct_policy.get('docker_container_mount_path') host_download_path = direct_policy.get('docker_host_download_path') container_download_path = direct_policy.get('docker_container_download_path') network_name = direct_policy.get('docker_network_name') if not all([image_name, host_mount_path, container_mount_path, host_download_path, container_download_path]): logger.error(f"[Worker {worker_id}] Direct docker download mode requires all docker_* keys in 'direct_docker_cli_policy'. Worker exiting.") return [] try: os.makedirs(host_mount_path, exist_ok=True) os.makedirs(host_download_path, exist_ok=True) except OSError as e: logger.error(f"[Worker {worker_id}] Could not create required host directories: {e}. Worker exiting.") return [] no_task_streak = 0 last_used_profile_name = None while not state_manager.shutdown_event.is_set(): locked_profile = None claimed_task_path_host = None temp_config_dir_host = None was_banned_by_parser = False try: if no_task_streak > 0: polling_interval = exec_control.get('worker_polling_interval_seconds', 1) # --- Add diagnostic logging --- all_profiles_in_pool = profile_manager_instance.list_profiles() profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)] if profiles_in_prefix: state_counts = collections.Counter(p['state'] for p in profiles_in_prefix) states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items())) logger.info(f"[Worker {worker_id}] No tasks found or profiles available. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s. (Streak: {no_task_streak})") else: logger.info(f"[Worker {worker_id}] No tasks found or profiles available. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s. 
(Streak: {no_task_streak})") # --- End diagnostic logging --- time.sleep(polling_interval) if state_manager.shutdown_event.is_set(): continue # 1. Find a task and lock its associated profile locked_profile, claimed_task_path_host = find_task_and_lock_profile( profile_manager_instance, owner_id, profile_prefix, policy, worker_id ) if not locked_profile: no_task_streak += 1 # The main loop will pause if the streak continues. continue profile_name = locked_profile['name'] # We have a task and a lock. # User-Agent is not used for download simulation. user_agent = None if claimed_task_path_host: no_task_streak = 0 auth_profile_name, auth_env = None, None info_data = None # --- Read info.json content and metadata first --- try: with open(claimed_task_path_host, 'r', encoding='utf-8') as f: info_data = json.load(f) # This is critical for decrementing the counter in the finally block metadata = info_data.get('_ytops_metadata', {}) auth_profile_name = metadata.get('profile_name') auth_env = metadata.get('auth_env') except (IOError, json.JSONDecodeError) as e: logger.error(f"CRITICAL: Could not read or parse task file '{claimed_task_path_host.name}': {e}. This task will be skipped, but the pending downloads counter CANNOT be decremented.") continue # Skip to finally block to unlock profile # --- Check for URL expiration before running Docker --- if d_policy.get('check_url_expiration', True): # Heuristic: check the first available format URL first_format = next((f for f in info_data.get('formats', []) if 'url' in f), None) if first_format: url_to_check = first_format['url'] time_shift_minutes = d_policy.get('expire_time_shift_minutes', 0) status, time_left_seconds = sp_utils.check_url_expiry(url_to_check, time_shift_minutes) logger.debug(f"[Worker {worker_id}] [{profile_name}] URL expiration check for task '{claimed_task_path_host.name}': status={status}, time_left={time_left_seconds:.0f}s") if status == 'expired': details = "Download URL is expired" if time_shift_minutes > 0 and time_left_seconds > 0: logger.warning(f"[Worker {worker_id}] [{profile_name}] Skipping task '{claimed_task_path_host.name}' because its URL will expire in {time_left_seconds/60:.1f}m (within {time_shift_minutes}m time-shift).") details = f"URL will expire within {time_shift_minutes}m time-shift" else: logger.warning(f"[Worker {worker_id}] [{profile_name}] Skipping task '{claimed_task_path_host.name}' because its URL is expired.") profile_manager_instance.record_activity(profile_name, 'tolerated_error') event = { 'type': 'direct_docker_download', 'profile': profile_name, 'proxy_url': locked_profile['proxy'], 'success': False, 'error_type': 'Skipped (Expired URL)', 'details': details, 'is_tolerated_error': True } state_manager.log_event(event) try: base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0] processed_path = Path(f"{base_path_str}.processed") claimed_task_path_host.rename(processed_path) logger.debug(f"Renamed expired task file to '{processed_path.name}'.") except (OSError, IndexError) as e: logger.error(f"Failed to rename expired task file '{claimed_task_path_host}': {e}") continue # Skip to the finally block # The path to the task file inside the container needs to be relative to the host mount root. # We must make the task path absolute first to correctly calculate the relative path from the absolute mount path. relative_task_path = os.path.relpath(os.path.abspath(claimed_task_path_host), host_mount_path) task_path_container = os.path.join(container_mount_path, relative_task_path) # 3. 
Prepare config file on host in a temporary directory temp_config_dir_host = tempfile.mkdtemp(prefix=f"docker-dl-config-{worker_id}-", dir=host_mount_path) config_dir_name = os.path.basename(temp_config_dir_host) config_dir_container = os.path.join(container_mount_path, config_dir_name) environment = {'XDG_CONFIG_HOME': config_dir_container} base_config_content = "" base_config_file = direct_policy.get('ytdlp_config_file') if base_config_file: config_path_to_read = Path(base_config_file) if not config_path_to_read.exists(): config_path_to_read = Path(sp_utils._PROJECT_ROOT) / base_config_file if config_path_to_read.exists(): try: with open(config_path_to_read, 'r', encoding='utf-8') as base_f: base_config_content = base_f.read() except IOError as e: logger.error(f"[Worker {worker_id}] Could not read ytdlp_config_file '{config_path_to_read}': {e}") config_overrides = direct_policy.get('ytdlp_config_overrides', {}).copy() config_overrides['proxy'] = locked_profile['proxy'] config_overrides['load-info-json'] = task_path_container config_overrides['output'] = os.path.join(container_download_path, '%(id)s.f%(format_id)s.%(ext)s') # Prevent yt-dlp from using a cache directory. config_overrides['no-cache-dir'] = True overrides_content = sp_utils._config_dict_to_flags_file_content(config_overrides) raw_args_from_policy = direct_policy.get('ytdlp_raw_args', []) raw_args_content = '\n'.join(raw_args_from_policy) config_content = f"{base_config_content.strip()}\n\n# --- Overrides from policy ---\n{overrides_content}" if raw_args_content: config_content += f"\n\n# --- Raw args from policy ---\n{raw_args_content}" logger.info(f"[Worker {worker_id}] [{profile_name}] Generated yt-dlp config:\n---config---\n{config_content}\n------------") ytdlp_config_dir_host = os.path.join(temp_config_dir_host, 'yt-dlp') os.makedirs(ytdlp_config_dir_host, exist_ok=True) temp_config_file_host = os.path.join(ytdlp_config_dir_host, 'config') with open(temp_config_file_host, 'w', encoding='utf-8') as f: f.write(config_content) # 4. Construct and run docker run command volumes = { os.path.abspath(host_mount_path): {'bind': container_mount_path, 'mode': 'ro'}, os.path.abspath(host_download_path): {'bind': container_download_path, 'mode': 'rw'} } # The command tells yt-dlp where to find the config file we created. # We still set XDG_CONFIG_HOME for any other config it might look for. 
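# Illustration (example paths only): the generated config for a download task lives at
#   <host_mount_path>/docker-dl-config-<worker_id>-XXXXXXXX/yt-dlp/config
# and points yt-dlp at the claimed task via --load-info-json (container path),
# routes traffic through the profile's proxy, and writes media under
# <container_download_path>. The task/config mount is read-only and only the
# download mount is writable, so the container can only produce media files.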
command = ['yt-dlp', '--config-locations', os.path.join(config_dir_container, 'yt-dlp/config')] logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}") # For logging purposes, construct the full equivalent command line with host paths log_config_overrides_for_host = config_overrides.copy() log_config_overrides_for_host['load-info-json'] = str(claimed_task_path_host) log_config_overrides_for_host['output'] = os.path.join(host_download_path, '%(id)s.f%(format_id)s.%(ext)s') log_command_override = ['yt-dlp'] if base_config_content: log_command_override.extend(sp_utils._parse_config_file_to_cli_args(base_config_content)) log_command_override.extend(sp_utils._config_dict_to_cli_flags(log_config_overrides_for_host)) raw_args_from_policy = direct_policy.get('ytdlp_raw_args', []) for raw_arg in raw_args_from_policy: log_command_override.extend(shlex.split(raw_arg)) # --- Live log parsing and activity recording --- live_success_count = 0 live_failure_count = 0 live_tolerated_count = 0 activity_lock = threading.Lock() tolerated_error_patterns = direct_policy.get('tolerated_error_patterns', []) fatal_error_patterns = direct_policy.get('fatal_error_patterns', []) def log_parser_callback(line): nonlocal live_success_count, live_failure_count, live_tolerated_count, was_banned_by_parser # Success is a high-priority check. Only record one success per task. if '[download] 100% of' in line or 'has already been downloaded' in line: with activity_lock: # Only count one success per task if live_success_count == 0: live_success_count += 1 logger.info(f"[Worker {worker_id}] [{profile_name}] Live download success detected from log.") profile_manager_instance.record_activity(profile_name, 'download') return False # Check for fatal patterns for pattern in fatal_error_patterns: if re.search(pattern, line, re.IGNORECASE): with activity_lock: live_failure_count += 1 logger.error(f"[Worker {worker_id}] [{profile_name}] Live FATAL download error #{live_failure_count} detected from log: {line}") profile_manager_instance.record_activity(profile_name, 'download_error') if direct_policy.get('ban_on_fatal_error_in_batch'): logger.warning(f"Banning profile '{profile_name}' immediately due to fatal download error to stop container.") profile_manager_instance.update_profile_state(profile_name, 'BANNED', 'Fatal error during download') was_banned_by_parser = True return True # Signal to stop container return False # Do not stop if ban_on_fatal_error_in_batch is false # Only process lines that contain ERROR: for tolerated/generic failures if 'ERROR:' not in line: return False # Check if it's a tolerated error for pattern in tolerated_error_patterns: if re.search(pattern, line, re.IGNORECASE): with activity_lock: live_tolerated_count += 1 logger.warning(f"[Worker {worker_id}] [{profile_name}] Live TOLERATED download error #{live_tolerated_count} detected from log: {line}") profile_manager_instance.record_activity(profile_name, 'tolerated_error') return False # If it's an ERROR: line and not tolerated, it's a failure with activity_lock: live_failure_count += 1 logger.warning(f"[Worker {worker_id}] [{profile_name}] Live download failure #{live_failure_count} detected from log: {line}") profile_manager_instance.record_activity(profile_name, 'download_error') return False retcode, stdout, stderr, stop_reason = run_docker_container( image_name=image_name, command=command, volumes=volumes, stream_prefix=f"[Worker {worker_id} | docker-ytdlp] ", network_name=network_name, 
log_callback=log_parser_callback, profile_manager=profile_manager_instance, profile_name=profile_name, environment=environment, log_command_override=log_command_override ) # 5. Post-process and record activity full_output = f"{stdout}\n{stderr}" is_bot_error = "Sign in to confirm you're not a bot" in full_output if is_bot_error: logger.warning(f"[Worker {worker_id}] [{profile_name}] Bot detection occurred during download. Marking as failure.") # --- Final Outcome Determination --- # Activity is now recorded live by the log parser. This block just determines # the overall success/failure for logging and event reporting. success = False final_outcome = "unknown" with activity_lock: if live_success_count > 0: success = True final_outcome = "download" elif live_failure_count > 0 or is_bot_error: final_outcome = "download_error" elif live_tolerated_count > 0: final_outcome = "tolerated_error" elif retcode == 0: # Fallback if no logs were matched but exit was clean. success = True final_outcome = "download" logger.warning(f"[Worker {worker_id}] [{profile_name}] No specific success/error log line matched, but exit code is 0. Assuming success, but this may indicate a parsing issue.") # We record a success here as a fallback, in case the log parser missed it. profile_manager_instance.record_activity(profile_name, 'download') else: final_outcome = "download_error" logger.warning(f"[Worker {worker_id}] [{profile_name}] No specific error log line matched, but exit code was {retcode}. Recording a generic download_error.") profile_manager_instance.record_activity(profile_name, 'download_error') # --- Airflow Directory Logic --- if success and d_policy.get('output_to_airflow_ready_dir'): # Find the downloaded file path from yt-dlp's output downloaded_filename = None # Order of checks is important: Merger -> VideoConvertor -> Destination merge_match = re.search(r'\[Merger\] Merging formats into "([^"]+)"', stdout) if merge_match: downloaded_filename = os.path.basename(merge_match.group(1)) else: convertor_match = re.search(r'\[VideoConvertor\].*?; Destination: (.*)', stdout) if convertor_match: downloaded_filename = os.path.basename(convertor_match.group(1).strip()) else: dest_match = re.search(r'\[download\] Destination: (.*)', stdout) if dest_match: downloaded_filename = os.path.basename(dest_match.group(1).strip()) if downloaded_filename: try: # Get video_id from the info.json with open(claimed_task_path_host, 'r', encoding='utf-8') as f: info_data = json.load(f) video_id = info_data.get('id') if not video_id: logger.error(f"[{profile_name}] Could not find video ID in '{claimed_task_path_host.name}' for moving file.") else: now = datetime.now() rounded_minute = (now.minute // 10) * 10 timestamp_str = now.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}" base_path = d_policy.get('airflow_ready_dir_base_path', 'downloadfiles/videos/ready') if not os.path.isabs(base_path): base_path = os.path.join(sp_utils._PROJECT_ROOT, base_path) final_dir_base = os.path.join(base_path, timestamp_str) final_dir_path = os.path.join(final_dir_base, video_id) os.makedirs(final_dir_path, exist_ok=True) downloaded_file_host_path = os.path.join(host_download_path, downloaded_filename) if os.path.exists(downloaded_file_host_path): shutil.move(downloaded_file_host_path, final_dir_path) logger.info(f"[{profile_name}] Moved media file to {final_dir_path}") new_info_json_name = f"info_{video_id}.json" dest_info_json_path = os.path.join(final_dir_path, new_info_json_name) shutil.copy(claimed_task_path_host, dest_info_json_path) 
logger.info(f"[{profile_name}] Copied info.json to {dest_info_json_path}") except Exception as e: logger.error(f"[{profile_name}] Failed to move downloaded file to Airflow ready directory: {e}") else: logger.warning(f"[{profile_name}] Download succeeded, but could not parse final filename from output to move to Airflow dir.") event_details = f"Docker download finished. Exit: {retcode}. Final Outcome: {final_outcome}. (Live successes: {live_success_count}, Live failures: {live_failure_count}, Live tolerated: {live_tolerated_count})" if not success and stderr: event_details += f" Stderr: {stderr.strip().splitlines()[-1] if stderr.strip() else 'N/A'}" if stop_reason: event_details += f" Aborted: {stop_reason}." event = { 'type': 'direct_docker_download', 'profile': profile_name, 'proxy_url': locked_profile['proxy'], 'success': success, 'details': event_details } state_manager.log_event(event) logger.info(f"[Worker {worker_id}] [{profile_name}] Task processing complete. Worker will now unlock profile and attempt next task.") # 6. Clean up task file by renaming to .processed try: # The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0] processed_path = Path(f"{base_path_str}.processed") claimed_task_path_host.rename(processed_path) logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.") except (OSError, IndexError) as e: logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}") # After this point, claimed_task_path_host is no longer valid. # The metadata has already been read into auth_profile_name and auth_env. else: # This case should not be reached with the new task-first locking logic. logger.warning(f"[Worker {worker_id}] Inconsistent state: locked profile '{profile_name}' but no task was claimed. Unlocking and continuing.") except Exception as e: logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True) if locked_profile: profile_manager_instance.record_activity(locked_profile['name'], 'failure') # Generic failure time.sleep(5) finally: if locked_profile: if claimed_task_path_host: # The auth_profile_name and auth_env variables were populated in the `try` block # before the task file was renamed or deleted. if auth_profile_name and auth_env: auth_manager = _get_auth_manager(profile_manager_instance, auth_env) if auth_manager: auth_manager.decrement_pending_downloads(auth_profile_name) else: logger.error(f"Could not get auth profile manager for env '{auth_env}'. Pending downloads counter will not be decremented.") else: logger.warning(f"Could not find auth profile name and/or auth_env in info.json metadata. Pending downloads counter will not be decremented. (Profile: {auth_profile_name}, Env: {auth_env})") if was_banned_by_parser: logger.info(f"[Worker {worker_id}] Profile '{locked_profile['name']}' was already banned by the log parser. Skipping unlock/cooldown.") else: last_used_profile_name = locked_profile['name'] cooldown = None # Only apply cooldown if a task was actually claimed and processed. if claimed_task_path_host: # Enforcer is the only point where we configure to apply different policies, # since we might restart enforcer, but won't restart stress-policy working on auth and downloads simultaneously. # This is like applying a policy across multiple workers/machines without needing to restart each of them. 
# DESIGN: The cooldown duration is not configured in the worker's policy. # Instead, it is read from a central Redis key. This key is set by the # policy-enforcer, making the enforcer the single source of truth for # this policy. This allows changing the cooldown behavior without # restarting the workers. cooldown_source_value = profile_manager_instance.get_config('unlock_cooldown_seconds') source_description = "Redis config" if cooldown_source_value is None: cooldown_source_value = d_policy.get('default_unlock_cooldown_seconds') source_description = "local policy" if cooldown_source_value is not None: try: # If from Redis, it's a string that needs parsing. # If from local policy, it's already an int or list. val = cooldown_source_value if isinstance(val, str): val = json.loads(val) if isinstance(val, list) and len(val) == 2 and val[0] < val[1]: cooldown = random.randint(val[0], val[1]) elif isinstance(val, int): cooldown = val if cooldown is not None: logger.debug(f"Determined cooldown from {source_description}: {cooldown_source_value}") except (json.JSONDecodeError, TypeError): if isinstance(cooldown_source_value, str) and cooldown_source_value.isdigit(): cooldown = int(cooldown_source_value) logger.debug(f"Determined cooldown from {source_description}: {cooldown_source_value}") if cooldown: logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.") profile_manager_instance.unlock_profile( locked_profile['name'], owner=owner_id, rest_for_seconds=cooldown ) if claimed_task_path_host and os.path.exists(claimed_task_path_host): try: os.remove(claimed_task_path_host) except OSError: pass if temp_config_dir_host and os.path.exists(temp_config_dir_host): try: shutil.rmtree(temp_config_dir_host) except OSError: pass logger.info(f"[Worker {worker_id}] Worker loop finished.") return [] def run_direct_download_worker(worker_id, policy, state_manager, args, profile_manager_instance, running_processes, process_lock): """A persistent worker for the 'direct_download_cli' orchestration mode.""" owner_id = f"direct-dl-worker-{worker_id}" settings = policy.get('settings', {}) exec_control = policy.get('execution_control', {}) d_policy = policy.get('download_policy', {}) direct_policy = policy.get('direct_download_cli_policy', {}) profile_prefix = d_policy.get('profile_prefix') if not profile_prefix: logger.error(f"[Worker {worker_id}] Direct download mode requires 'download_policy.profile_prefix'. Worker exiting.") return [] output_dir = direct_policy.get('output_dir') if not output_dir: logger.error(f"[Worker {worker_id}] Direct download mode requires 'direct_download_cli_policy.output_dir'. Worker exiting.") return [] os.makedirs(output_dir, exist_ok=True) no_task_streak = 0 while not state_manager.shutdown_event.is_set(): locked_profile = None claimed_task_path = None try: # 0. If no tasks were found, pause briefly. if no_task_streak > 0: polling_interval = exec_control.get('worker_polling_interval_seconds', 1) # --- Add diagnostic logging --- all_profiles_in_pool = profile_manager_instance.list_profiles() profiles_in_prefix = [p for p in all_profiles_in_pool if p['name'].startswith(profile_prefix)] if profiles_in_prefix: state_counts = collections.Counter(p['state'] for p in profiles_in_prefix) states_summary = ', '.join(f"{count} {state}" for state, count in sorted(state_counts.items())) logger.info(f"[Worker {worker_id}] No tasks found for available profiles. Pool status ({profile_prefix}*): {states_summary}. Pausing for {polling_interval}s. 
(Streak: {no_task_streak})") else: logger.info(f"[Worker {worker_id}] No tasks found for available profiles. No profiles found with prefix '{profile_prefix}'. Pausing for {polling_interval}s. (Streak: {no_task_streak})") # --- End diagnostic logging --- time.sleep(polling_interval) if state_manager.shutdown_event.is_set(): continue # 1. Find a task and lock its associated profile locked_profile, claimed_task_path = find_task_and_lock_profile( profile_manager_instance, owner_id, profile_prefix, policy, worker_id ) if not locked_profile: no_task_streak += 1 # The main loop will pause if the streak continues. continue profile_name = locked_profile['name'] # We have a task and a lock. if claimed_task_path: no_task_streak = 0 # Reset streak auth_profile_name, auth_env = None, None # --- Read metadata before processing/deleting file --- try: with open(claimed_task_path, 'r', encoding='utf-8') as f: info_data = json.load(f) metadata = info_data.get('_ytops_metadata', {}) auth_profile_name = metadata.get('profile_name') auth_env = metadata.get('auth_env') except (IOError, json.JSONDecodeError) as e: logger.error(f"Could not read info.json to get auth profile for decrementing counter: {e}") # 3. Construct and run the command ytdlp_cmd_str = direct_policy.get('ytdlp_command') if not ytdlp_cmd_str: logger.error(f"[Worker {worker_id}] Direct download mode requires 'direct_download_cli_policy.ytdlp_command'.") break proxy_url = locked_profile['proxy'] proxy_rename = direct_policy.get('proxy_rename') if proxy_rename: rename_rule = proxy_rename.strip("'\"") if rename_rule.startswith('s/') and rename_rule.count('/') >= 2: try: parts = rename_rule.split('/') proxy_url = re.sub(parts[1], parts[2], proxy_url) except (re.error, IndexError): logger.error(f"[Worker {worker_id}] Invalid proxy_rename rule: {proxy_rename}") output_template = os.path.join(output_dir, '%(title)s - %(id)s.%(ext)s') cmd = shlex.split(ytdlp_cmd_str) cmd.extend(['--load-info-json', str(claimed_task_path)]) cmd.extend(['--proxy', proxy_url]) cmd.extend(['-o', output_template]) ytdlp_args = direct_policy.get('ytdlp_args') if ytdlp_args: cmd.extend(shlex.split(ytdlp_args)) if args.verbose and '--verbose' not in cmd: cmd.append('--verbose') custom_env = direct_policy.get('env_vars', {}).copy() # --- PYTHONPATH for custom yt-dlp module --- ytdlp_module_path = direct_policy.get('ytdlp_module_path') if ytdlp_module_path: existing_pythonpath = custom_env.get('PYTHONPATH', os.environ.get('PYTHONPATH', '')) custom_env['PYTHONPATH'] = f"{ytdlp_module_path}{os.pathsep}{existing_pythonpath}".strip(os.pathsep) logger.debug(f"[Worker {worker_id}] Using custom PYTHONPATH: {custom_env['PYTHONPATH']}") # Pass profile info to the custom yt-dlp process custom_env['YTDLP_PROFILE_NAME'] = profile_name custom_env['YTDLP_PROXY_URL'] = locked_profile['proxy'] # Original proxy env_name = profile_manager_instance.key_prefix.replace('_profile_mgmt_', '') custom_env['YTDLP_SIM_MODE'] = env_name # Create a per-profile cache directory and set XDG_CACHE_HOME cache_dir_base = direct_policy.get('cache_dir_base', '.cache') profile_cache_dir = os.path.join(cache_dir_base, profile_name) try: os.makedirs(profile_cache_dir, exist_ok=True) custom_env['XDG_CACHE_HOME'] = profile_cache_dir except OSError as e: logger.error(f"[Worker {worker_id}] Failed to create cache directory '{profile_cache_dir}': {e}") logger.info(f"[Worker {worker_id}] [{profile_name}] Processing task '{claimed_task_path.name}'...") if args.dummy: logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY 
DIRECT DOWNLOAD ==========") logger.info(f"[Worker {worker_id}] Profile: {profile_name} | Task: {claimed_task_path.name}") logger.info(f"[Worker {worker_id}] Would run command: {' '.join(shlex.quote(s) for s in cmd)}") logger.info(f"[Worker {worker_id}] With environment: {custom_env}") dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {}) min_seconds = dummy_settings.get('download_min_seconds', 0.5) max_seconds = dummy_settings.get('download_max_seconds', 1.5) failure_rate = dummy_settings.get('download_failure_rate', 0.0) skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0) time.sleep(random.uniform(min_seconds, max_seconds)) rand_val = random.random() should_fail_skipped = rand_val < skipped_rate should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate) if should_fail_skipped: logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure.") # A skipped/tolerated failure in yt-dlp usually results in exit code 0. # The orchestrator will see this as a success but the stderr can be used for context. retcode = 0 stderr = "Dummy skipped failure" elif should_fail_fatal: logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure.") retcode = 1 stderr = "Dummy fatal failure" else: logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success.") retcode = 0 stderr = "" logger.info(f"========== [Worker {worker_id}] END DUMMY DIRECT DOWNLOAD ==========") else: logger.info(f"[Worker {worker_id}] [{profile_name}] Running command: {' '.join(shlex.quote(s) for s in cmd)}") logger.info(f"[Worker {worker_id}] [{profile_name}] With environment: {custom_env}") retcode, stdout, stderr = run_command( cmd, running_processes, process_lock, env=custom_env, stream_output=args.verbose, stream_prefix=f"[Worker {worker_id} | yt-dlp] " ) # 4. Record activity success = (retcode == 0) activity_type = 'download' if success else 'download_error' logger.info(f"[Worker {worker_id}] Recording '{activity_type}' for profile '{profile_name}'.") profile_manager_instance.record_activity(profile_name, activity_type) event_details = f"Download finished. Exit code: {retcode}." if not success and stderr: event_details += f" Stderr: {stderr.strip().splitlines()[-1]}" event = {'type': 'direct_download', 'profile': profile_name, 'proxy_url': proxy_url, 'success': success, 'details': event_details} state_manager.log_event(event) # 5. Clean up the processed task file try: os.remove(claimed_task_path) logger.debug(f"[{sp_utils.get_display_name(claimed_task_path)}] Removed processed task file.") except OSError as e: logger.error(f"Failed to remove processed task file '{claimed_task_path}': {e}") else: no_task_streak += 1 logger.info(f"[Worker {worker_id}] No tasks found for profile '{profile_name}'.") except Exception as e: logger.error(f"[Worker {worker_id}] An unexpected error occurred in the worker loop: {e}", exc_info=True) if locked_profile: profile_manager_instance.record_activity(locked_profile['name'], 'failure') # Generic failure time.sleep(5) finally: if locked_profile: if claimed_task_path: # The auth_profile_name and auth_env variables were populated in the `try` block # before the task file was deleted. if auth_profile_name and auth_env: auth_manager = _get_auth_manager(profile_manager_instance, auth_env) if auth_manager: auth_manager.decrement_pending_downloads(auth_profile_name) else: logger.error(f"Could not get auth profile manager for env '{auth_env}'. 
Pending downloads counter will not be decremented.") else: logger.warning(f"Could not find auth profile name and/or auth_env in info.json metadata. Pending downloads counter will not be decremented. (Profile: {auth_profile_name}, Env: {auth_env})") cooldown = None if claimed_task_path: # The enforcer can be restarted to change this policy at any time, while the long-running auth and download workers keep running; the change takes effect across all workers and machines without restarting them. # DESIGN: The cooldown duration is not configured in the worker's policy. # Instead, it is read from a central Redis key. This key is set by the # policy-enforcer, making the enforcer the single source of truth for # this policy. This allows changing the cooldown behavior without # restarting the workers. cooldown_config = profile_manager_instance.get_config('unlock_cooldown_seconds') if cooldown_config: try: val = json.loads(cooldown_config) if isinstance(val, list) and len(val) == 2 and val[0] < val[1]: cooldown = random.randint(val[0], val[1]) elif isinstance(val, int): cooldown = val except (json.JSONDecodeError, TypeError): if cooldown_config.isdigit(): cooldown = int(cooldown_config) if cooldown: logger.info(f"[Worker {worker_id}] Putting profile '{locked_profile['name']}' into COOLDOWN for {cooldown}s.") profile_manager_instance.unlock_profile( locked_profile['name'], owner=owner_id, rest_for_seconds=cooldown ) locked_profile = None logger.info(f"[Worker {worker_id}] Worker loop finished.") return []
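# Usage sketch (illustrative only; the orchestrator that actually spawns these
# workers lives outside this module). Each run_*_worker function is a blocking
# loop, so a caller could fan workers out with a thread pool, for example:
#
#   from concurrent.futures import ThreadPoolExecutor
#
#   with ThreadPoolExecutor(max_workers=num_workers) as pool:
#       for wid in range(num_workers):
#           pool.submit(run_direct_download_worker, wid, policy, state_manager,
#                       args, profile_manager_instance, running_processes,
#                       process_lock)
#
# num_workers, policy, state_manager, args, profile_manager_instance,
# running_processes and process_lock are assumed to be supplied by the caller.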