#!/usr/bin/env python3 """ Architectural Overview for the Stress Policy Tool: This file, stress_policy_tool.py, is the main entry point and orchestrator. It is responsible for: - Parsing command-line arguments. - Setting up logging and the main shutdown handler. - Initializing the StateManager and ProfileManager. - Running the main execution loop (ThreadPoolExecutor) based on the chosen orchestration mode. - Delegating the actual work to functions in the `workers.py` module. The core logic has been refactored into the following modules within `ytops_client/stress_policy/`: - arg_parser.py: Defines the command-line interface for the 'stress-policy' command using argparse. - workers.py: Contains all core worker functions that are executed by the ThreadPoolExecutor, such as `process_task`, `run_direct_batch_worker`, and their helpers. This is where the main logic for fetching info.json and running downloads resides. - state_manager.py: Manages run state, statistics, rate limits, and persistence between runs (e.g., `_state.json`, `_stats.jsonl`). - process_runners.py: A low-level module that handles the execution of external subprocesses (`run_command`) and Docker containers (`run_docker_container`). - utils.py: Provides stateless utility functions shared across the tool, such as loading YAML policies, applying overrides, and formatting. """ """ Policy-driven stress-testing orchestrator for video format downloads. This tool orchestrates complex, multi-stage stress tests based on a YAML policy file. It supports several modes of operation: - full_stack: A complete workflow that first fetches an info.json for a given URL using a profile, and then uses that info.json to perform one or more downloads. - fetch_only: Only performs the info.json generation step. This is useful for simulating user authentication and browsing behavior. - download_only: Only performs the download step, using a directory of pre-existing info.json files as its source. - direct_batch_cli (fetch_only): A high-throughput mode for generating info.json files by calling a custom, Redis-aware yt-dlp command-line tool directly in batch mode. This mode bypasses the get-info Thrift service. The workflow is as follows: 1. The orchestrator worker locks a profile from the auth pool. 2. It takes a 'batch' of URLs from the source file. 3. It invokes the configured yt-dlp command, passing the profile name and proxy via environment variables. 4. The custom yt-dlp process then does the following for each URL in the batch: a. Checks Redis to ensure the profile has not been externally BANNED. b. Fetches the info.json. c. Records 'success', 'failure', or 'tolerated_error' for the profile in Redis. 5. After the yt-dlp process finishes, the orchestrator worker post-processes the generated info.json files to inject metadata (profile name, proxy). 6. The worker unlocks the profile. 7. The worker repeats this cycle with a new profile and the next batch of URLs. The tool uses a profile management system (v2) based on Redis for coordinating state between multiple workers and enforcing policies (e.g., rate limits, cooldowns). """ import argparse import collections import concurrent.futures import fnmatch import json import logging import os import random import re import shlex import signal import sys import tempfile import shutil import threading import time from copy import deepcopy from datetime import datetime, timezone from pathlib import Path try: from dotenv import load_dotenv except ImportError: load_dotenv = None try: import docker except ImportError: docker = None from .profile_manager_tool import ProfileManager from .stress_policy.state_manager import StateManager from .stress_policy.process_runners import run_command, run_docker_container, get_worker_id from .stress_policy import utils as sp_utils # This block replaces the obsolete import from the deprecated `workers.py` file. # Worker functions are now imported from their own dedicated modules as per the refactoring. from .stress_policy.direct_batch_worker import run_direct_batch_worker from .stress_policy.direct_docker_worker import run_direct_docker_worker, run_direct_docker_download_worker from .stress_policy.direct_download_worker import run_direct_download_worker from .stress_policy.throughput_worker import run_throughput_worker # Helper functions for the legacy "task-first" mode are now in worker_utils.py from .stress_policy.worker_utils import _run_download_logic, process_profile_task from .stress_policy.queue_workers import ( run_queue_auth_worker, run_queue_download_worker ) from .stress_policy.queue_provider import RedisQueueProvider from .stress_policy.arg_parser import add_stress_policy_parser # Add a global event for graceful shutdown shutdown_event = threading.Event() # Globals for tracking and terminating subprocesses on shutdown running_processes = set() process_lock = threading.Lock() # Configure logging logger = logging.getLogger('stress_policy_tool') def main_stress_policy(args): """Main logic for the 'stress-policy' command.""" if args.list_policies: return sp_utils.list_policies() if not args.policy: print("Error: --policy is required unless using --list-policies.", file=sys.stderr) return 1 # Handle --show-overrides early, as it doesn't run the test. if args.show_overrides: policy = sp_utils.load_policy(args.policy, args.policy_name) if not policy: return 1 # load_policy prints its own error sp_utils.print_policy_overrides(policy) return 0 policy = sp_utils.load_policy(args.policy, args.policy_name) policy = sp_utils.apply_overrides(policy, args.set) # If orchestrator is verbose, make downloaders verbose too by passing it through. if args.verbose: d_policy = policy.setdefault('download_policy', {}) extra_args = d_policy.get('extra_args', '') if '--verbose' not in extra_args: d_policy['extra_args'] = f"{extra_args} --verbose".strip() # --- Set safe defaults --- settings = policy.get('settings', {}) mode = settings.get('mode', 'full_stack') # For continuous download mode, it is almost always desired to mark files as # processed to avoid an infinite loop on the same files. We make this the # default and issue a warning if it's not explicitly set. if mode == 'download_only' and settings.get('directory_scan_mode') == 'continuous': if 'mark_processed_files' not in settings: # Use print because logger is not yet configured. print("WARNING: In 'continuous' download mode, 'settings.mark_processed_files' was not set.", file=sys.stderr) print(" Defaulting to 'true' to prevent reprocessing files.", file=sys.stderr) print(" Set it to 'false' explicitly in your policy to disable this behavior.", file=sys.stderr) settings['mark_processed_files'] = True # Load .env file *after* loading policy to respect env_file from policy. if load_dotenv: sim_params = policy.get('simulation_parameters', {}) # Coalesce from CLI, then policy. An explicit CLI arg takes precedence. env_file = args.env_file or sim_params.get('env_file') if not env_file and args.env and '.env' in args.env and os.path.exists(args.env): # Use print because logger is not yet configured. print(f"Warning: --env should be an environment name (e.g., 'sim'), not a file path. Treating '{args.env}' as --env-file. The environment name will default to 'sim'.", file=sys.stderr) env_file = args.env args.env = 'sim' was_loaded = load_dotenv(env_file) if was_loaded: # Use print because logger is not yet configured. print(f"Loaded environment variables from {env_file or '.env file'}", file=sys.stderr) elif args.env_file: # Only error if user explicitly passed it print(f"Error: The specified --env-file was not found: {args.env_file}", file=sys.stderr) return 1 if args.profile_prefix: # This shortcut overrides the profile_prefix for all relevant stages. # Useful for simple fetch_only or download_only runs. policy.setdefault('info_json_generation_policy', {})['profile_prefix'] = args.profile_prefix policy.setdefault('download_policy', {})['profile_prefix'] = args.profile_prefix # Use print because logger is not yet configured. print(f"Overriding profile_prefix for all stages with CLI arg: {args.profile_prefix}", file=sys.stderr) # Apply direct CLI overrides after --set, so they have final precedence. if args.auto_merge_fragments is not None: policy.setdefault('download_policy', {})['auto_merge_fragments'] = args.auto_merge_fragments if args.remove_fragments_after_merge is not None: policy.setdefault('download_policy', {})['remove_fragments_after_merge'] = args.remove_fragments_after_merge if args.fragments_dir is not None: policy.setdefault('download_policy', {})['aria_fragments_dir'] = args.fragments_dir if args.remote_dir is not None: policy.setdefault('download_policy', {})['aria_remote_dir'] = args.remote_dir if args.cleanup is not None: policy.setdefault('download_policy', {})['cleanup'] = args.cleanup if args.expire_time_shift_minutes is not None: policy.setdefault('download_policy', {})['expire_time_shift_minutes'] = args.expire_time_shift_minutes policy_name = policy.get('name', args.policy_name or Path(args.policy).stem) # --- Logging Setup --- log_level = logging.DEBUG if args.verbose else logging.INFO log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' if args.verbose else '%(asctime)s - %(message)s' date_format = None if args.verbose else '%H:%M:%S' root_logger = logging.getLogger() root_logger.setLevel(log_level) # Silence noisy loggers from dependencies like docker-py logging.getLogger('urllib3.connectionpool').setLevel(logging.INFO if args.verbose else logging.WARNING) # Remove any existing handlers to avoid duplicate logs for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) # Add console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format)) root_logger.addHandler(console_handler) if not args.disable_log_writing: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') log_filename = f"stress-policy-{timestamp}-{policy_name}.log" try: # Open in append mode to be safe, though timestamp should be unique. file_handler = logging.FileHandler(log_filename, mode='a', encoding='utf-8') file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format)) root_logger.addHandler(file_handler) # Use print because logger is just being set up. print(f"Logging to file: {log_filename}", file=sys.stderr) except IOError as e: print(f"Error: Could not open log file {log_filename}: {e}", file=sys.stderr) state_manager = StateManager(policy_name, disable_log_writing=args.disable_log_writing, shutdown_event=shutdown_event) if args.reset_infojson: info_json_dir = settings.get('info_json_dir') if not info_json_dir: logger.error("--reset-infojson requires 'settings.info_json_dir' to be set in the policy.") return 1 logger.info(f"--- Resetting info.json files in '{info_json_dir}' ---") source_dir = Path(info_json_dir) if not source_dir.is_dir(): logger.warning(f"Source directory for reset does not exist: {source_dir}. Skipping reset.") else: processed_files = list(source_dir.rglob('*.json.processed')) locked_files = list(source_dir.rglob('*.json.LOCKED.*')) files_to_reset = processed_files + locked_files if not files_to_reset: logger.info("No processed or locked files found to reset.") else: reset_count = 0 for file_to_reset in files_to_reset: original_path = None if file_to_reset.name.endswith('.processed'): original_path_str = str(file_to_reset).removesuffix('.processed') original_path = Path(original_path_str) elif '.LOCKED.' in file_to_reset.name: original_path_str = str(file_to_reset).split('.LOCKED.')[0] original_path = Path(original_path_str) if original_path: try: if original_path.exists(): logger.warning(f"Original file '{original_path.name}' already exists. Deleting '{file_to_reset.name}' instead of renaming.") file_to_reset.unlink() else: file_to_reset.rename(original_path) logger.debug(f"Reset '{file_to_reset.name}' to '{original_path.name}'") reset_count += 1 except (IOError, OSError) as e: logger.error(f"Failed to reset '{file_to_reset.name}': {e}") logger.info(f"Reset {reset_count} info.json file(s).") if args.pre_cleanup_media is not None: cleanup_path_str = args.pre_cleanup_media d_policy = policy.get('download_policy', {}) direct_docker_policy = policy.get('direct_docker_cli_policy', {}) if cleanup_path_str == '.': # Special value from `const` # Determine path from policy if direct_docker_policy.get('docker_host_download_path'): cleanup_path_str = direct_docker_policy['docker_host_download_path'] elif d_policy.get('output_dir'): cleanup_path_str = d_policy['output_dir'] else: logger.error("--pre-cleanup-media was used without a path, but could not determine a download directory from the policy.") return 1 cleanup_path = Path(cleanup_path_str) if not cleanup_path.is_dir(): logger.warning(f"Directory for media cleanup does not exist, skipping: {cleanup_path}") else: logger.info(f"--- Cleaning up media files in '{cleanup_path}' ---") media_extensions = ['.mp4', '.m4a', '.webm', '.mkv', '.part', '.ytdl'] files_deleted = 0 for ext in media_extensions: for media_file in cleanup_path.rglob(f'*{ext}'): try: media_file.unlink() logger.debug(f"Deleted {media_file}") files_deleted += 1 except OSError as e: logger.error(f"Failed to delete media file '{media_file}': {e}") logger.info(f"Deleted {files_deleted} media file(s).") if args.reset_local_cache_folder is not None: cache_path_str = args.reset_local_cache_folder direct_docker_policy = policy.get('direct_docker_cli_policy', {}) if cache_path_str == '.': # Special value from `const` if direct_docker_policy.get('docker_host_cache_path'): cache_path_str = direct_docker_policy['docker_host_cache_path'] else: logger.error("--reset-local-cache-folder was used without a path, but 'direct_docker_cli_policy.docker_host_cache_path' is not set in the policy.") return 1 cache_path = Path(cache_path_str) if not cache_path.is_dir(): logger.warning(f"Local cache directory for reset does not exist, skipping: {cache_path}") else: logger.info(f"--- Resetting local cache folder '{cache_path}' ---") try: shutil.rmtree(cache_path) os.makedirs(cache_path) logger.info(f"Successfully deleted and recreated cache folder '{cache_path}'.") except OSError as e: logger.error(f"Failed to reset cache folder '{cache_path}': {e}") if policy.get('name') in ['continuous_auth_simulation', 'continuous_download_simulation']: logger.warning("This policy is part of a multi-stage simulation.") if 'auth' in policy.get('name', ''): logger.warning("It is recommended to run this auth policy using: ./bin/run-profile-simulation") if 'download' in policy.get('name', ''): logger.warning("It is recommended to run this download policy using: ./bin/run-download-simulation") time.sleep(2) # --- Graceful shutdown handler --- def shutdown_handler(signum, frame): if not shutdown_event.is_set(): logger.info(f"\nSignal {signum} received, shutting down gracefully...") shutdown_event.set() # Save state immediately to prevent loss on interrupt. logger.info("Attempting to save state before shutdown...") state_manager.close() logger.info("Shutdown requested. Allowing in-progress tasks to complete. No new tasks will be started. Press Ctrl+C again to force exit.") else: logger.info("Second signal received, forcing exit.") # On second signal, forcefully terminate subprocesses. with process_lock: if running_processes: logger.info(f"Forcefully terminating {len(running_processes)} running subprocess(es)...") for p in running_processes: try: # Kill the entire process group to ensure child processes (like yt-dlp) are terminated. os.killpg(os.getpgid(p.pid), signal.SIGKILL) except (ProcessLookupError, PermissionError): pass # Process already finished or we lack permissions # Use os._exit for a hard exit that doesn't run cleanup handlers, # which can deadlock if locks are held. os._exit(1) signal.signal(signal.SIGINT, shutdown_handler) signal.signal(signal.SIGTERM, shutdown_handler) settings = policy.get('settings', {}) exec_control = policy.get('execution_control', {}) mode = settings.get('mode', 'full_stack') orchestration_mode = settings.get('orchestration_mode') # --- Profile Manager Setup for Locking Mode --- profile_manager = None profile_managers = {} if settings.get('profile_mode') == 'from_pool_with_lock': logger.info("--- Profile Locking Mode Enabled ---") logger.info("This mode requires profiles to be set up and managed by the policy enforcer.") logger.info("1. Ensure you have run: bin/setup-profiles-from-policy") logger.info("2. Ensure the policy enforcer is running in the background: bin/ytops-client policy-enforcer --live") logger.info(" (e.g. using policies/8_unified_simulation_enforcer.yaml)") logger.info("3. To monitor profiles, use: bin/ytops-client profile list --live") logger.info("------------------------------------") # Coalesce Redis settings from CLI args, .env file, and defaults redis_host = args.redis_host or os.getenv('REDIS_HOST', os.getenv('MASTER_HOST_IP', 'localhost')) redis_port = args.redis_port if args.redis_port is not None else int(os.getenv('REDIS_PORT', 6379)) redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') redis_db = args.redis_db if args.redis_db is not None else int(os.getenv('REDIS_DB', 0)) sim_params = policy.get('simulation_parameters', {}) def setup_manager(sim_type, env_cli_arg, env_policy_key): # Determine the effective environment name with correct precedence: # 1. Specific CLI arg (e.g., --auth-env) # 2. General CLI arg (--env) # 3. Specific policy setting (e.g., simulation_parameters.auth_env) # 4. General policy setting (simulation_parameters.env) # 5. Hardcoded default ('sim') policy_env = sim_params.get(env_policy_key) default_policy_env = sim_params.get('env') effective_env = env_cli_arg or args.env or policy_env or default_policy_env or 'sim' logger.info(f"Setting up ProfileManager for {sim_type} simulation using env: '{effective_env}'") if args.key_prefix: key_prefix = args.key_prefix else: key_prefix = f"{effective_env}_profile_mgmt_" return ProfileManager( redis_host=redis_host, redis_port=redis_port, redis_password=redis_password, key_prefix=key_prefix, redis_db=redis_db ) # Determine which managers are needed based on mode and orchestration mode needs_auth = False needs_download = False if mode in ['full_stack', 'fetch_only']: needs_auth = True if mode in ['full_stack', 'download_only']: needs_download = True if orchestration_mode == 'direct_batch_cli': direct_policy = policy.get('direct_batch_cli_policy', {}) use_env = direct_policy.get('use_profile_env', 'auth') if use_env == 'download': needs_download = True else: # auth is default needs_auth = True if needs_auth: # For backward compatibility, policy might have 'env' instead of 'auth_env' auth_env_key = 'auth_env' if 'auth_env' in sim_params else 'env' profile_managers['auth'] = setup_manager('Auth', args.auth_env, auth_env_key) if needs_download: download_env_key = 'download_env' if 'download_env' in sim_params else 'env' profile_managers['download'] = setup_manager('Download', args.download_env, download_env_key) # For modes with only one manager, set the legacy `profile_manager` variable # for components that haven't been updated to use the `profile_managers` dict. if len(profile_managers) == 1: profile_manager = list(profile_managers.values())[0] # --- Throughput Orchestration Mode --- if orchestration_mode == 'throughput': logger.info("--- Throughput Orchestration Mode Enabled ---") if mode != 'download_only' or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'throughput' is only compatible with 'download_only' mode and 'from_pool_with_lock' profile mode.") return 1 download_manager = profile_managers.get('download') if not download_manager: logger.error("Throughput mode requires a download profile manager.") return 1 original_workers_setting = exec_control.get('workers') if original_workers_setting == 'auto': d_policy = policy.get('download_policy', {}) profile_prefix = d_policy.get('profile_prefix') if not profile_prefix: logger.error("Cannot calculate 'auto' workers for throughput mode without 'download_policy.profile_prefix'.") return 1 all_profiles = download_manager.list_profiles() matching_profiles = [p for p in all_profiles if p['name'].startswith(profile_prefix)] calculated_workers = len(matching_profiles) if calculated_workers == 0: logger.error(f"Cannot use 'auto' workers: No profiles found with prefix '{profile_prefix}'. Please run setup-profiles.") return 1 exec_control['workers'] = calculated_workers logger.info(f"Calculated 'auto' workers for throughput mode: {calculated_workers} (based on {len(matching_profiles)} profiles with prefix '{profile_prefix}').") sp_utils.display_effective_policy(policy, policy_name, args, sources=[], original_workers_setting=original_workers_setting) if args.dry_run: return 0 workers = exec_control.get('workers', 1) with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: futures = [ executor.submit(run_throughput_worker, i, policy, state_manager, args, download_manager, running_processes, process_lock) for i in range(workers) ] # Wait for shutdown signal shutdown_event.wait() logger.info("Shutdown signal received, waiting for throughput workers to finish current tasks...") # The workers will exit their loops upon seeing the shutdown_event. # We don't need complex shutdown logic here; the main `finally` block will handle summary. concurrent.futures.wait(futures) # In this mode, the main loop is handled by workers. So we return here. state_manager.print_summary(policy) state_manager.close() return 0 # --- Direct Batch CLI Orchestration Mode --- elif orchestration_mode == 'direct_batch_cli': logger.info("--- Direct Batch CLI Orchestration Mode Enabled ---") if mode != 'fetch_only' or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'direct_batch_cli' is only compatible with 'fetch_only' mode and 'from_pool_with_lock' profile mode.") return 1 direct_policy = policy.get('direct_batch_cli_policy', {}) use_env = direct_policy.get('use_profile_env', 'auth') # Default to auth for backward compatibility profile_manager_instance = profile_managers.get(use_env) if not profile_manager_instance: logger.error(f"Direct batch CLI mode requires a '{use_env}' profile manager, but it was not configured.") logger.error("Check 'simulation_parameters' in your policy and the 'mode' setting.") return 1 urls_file = settings.get('urls_file') if not urls_file: logger.error("Direct batch CLI mode requires 'settings.urls_file'.") return 1 try: with open(urls_file, 'r', encoding='utf-8') as f: urls_list = [line.strip() for line in f if line.strip()] except IOError as e: logger.error(f"Could not read urls_file '{urls_file}': {e}") return 1 if not urls_list: logger.error(f"URL file '{urls_file}' is empty. Nothing to do.") return 1 # Handle starting from a specific index start_index = 0 if args.start_from_url_index is not None: start_index = max(0, args.start_from_url_index - 1) state_manager.update_last_url_index(start_index, force=True) else: start_index = state_manager.get_last_url_index() if start_index >= len(urls_list) and len(urls_list) > 0: logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!") logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!") logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!") logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") if not args.dry_run and not args.disable_log_writing: state_manager.close() # ensure it's closed before deleting try: os.remove(state_manager.state_file_path) logger.info(f"Deleted state file: {state_manager.state_file_path}") except OSError as e: logger.error(f"Failed to delete state file: {e}") else: logger.info("[Dry Run] Would have deleted state file and stopped.") return 0 # Stop execution. if start_index > 0: logger.info(f"Starting/resuming from URL index {start_index + 1}.") # The worker's get_next_url_batch will respect this starting index. sp_utils.display_effective_policy(policy, policy_name, args, sources=urls_list) if args.dry_run: return 0 # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) discovery_config = exec_control.get('worker_pool_discovery') if discovery_config: if worker_pools: logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") discovery_pattern = discovery_config.get('profile_prefix_pattern') workers_per_group = discovery_config.get('workers_per_profile_group', 1) direct_policy = policy.get('direct_batch_cli_policy', {}) use_env = direct_policy.get('use_profile_env', 'auth') manager_for_discovery = profile_managers.get(use_env) if not manager_for_discovery: logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") return 1 if not discovery_pattern: logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") return 1 logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") try: all_profiles = manager_for_discovery.list_profiles() found_prefixes = set() for profile in all_profiles: profile_name = profile['name'] if fnmatch.fnmatch(profile_name, discovery_pattern): # Assuming standard name format like 'user31_001', extract 'user31' prefix = profile_name.rsplit('_', 1)[0] found_prefixes.add(prefix) if not found_prefixes: logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") worker_pools = [] else: worker_pools = [] for prefix in sorted(list(found_prefixes)): worker_pools.append({ 'profile_prefix': prefix, 'workers': workers_per_group }) logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") except Exception as e: logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) return 1 if not worker_pools and exec_control.get('workers'): # Fallback for legacy 'workers: N' config prefix = policy.get('info_json_generation_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: logger.info(f"CLI --profile-prefix '{args.profile_prefix}' is set, it will override any prefixes defined in worker_pools.") for pool in worker_pools: pool['profile_prefix'] = args.profile_prefix worker_specs = [] worker_id_counter = 0 for pool in worker_pools: pool_workers = pool.get('workers', 1) prefix_str = pool.get('profile_prefix', '') prefixes = [p.strip() for p in prefix_str.split(',') if p.strip()] if not prefixes: logger.warning(f"Worker pool has no profile_prefix. Skipping: {pool}") continue for i in range(pool_workers): assigned_prefix = prefixes[i % len(prefixes)] worker_policy = deepcopy(policy) worker_policy.setdefault('info_json_generation_policy', {})['profile_prefix'] = assigned_prefix worker_specs.append({ 'func': run_direct_batch_worker, 'args': (worker_id_counter, worker_policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock) }) worker_id_counter += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=len(worker_specs)) as executor: futures = [executor.submit(spec['func'], *spec['args']) for spec in worker_specs] # Wait for all workers to complete. They will exit their loops when no URLs are left. concurrent.futures.wait(futures) if shutdown_event.is_set(): logger.info("Shutdown signal received, workers have finished.") state_manager.print_summary(policy) state_manager.close() return 0 # --- Direct Docker CLI Orchestration Mode --- elif orchestration_mode == 'direct_docker_cli': logger.info("--- Direct Docker CLI Orchestration Mode Enabled ---") if not docker: logger.error("The 'direct_docker_cli' orchestration mode requires the Docker SDK for Python.") logger.error("Please install it with: pip install docker") return 1 if mode not in ['fetch_only', 'download_only'] or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'direct_docker_cli' is only compatible with 'fetch_only' or 'download_only' modes and 'from_pool_with_lock' profile mode.") return 1 direct_policy = policy.get('direct_docker_cli_policy', {}) use_env = direct_policy.get('use_profile_env', 'auth' if mode == 'fetch_only' else 'download') profile_manager_instance = profile_managers.get(use_env) if not profile_manager_instance: logger.error(f"Direct docker CLI mode requires a '{use_env}' profile manager, but it was not configured.") return 1 workers = exec_control.get('workers', 1) if mode == 'fetch_only': urls_file = settings.get('urls_file') if not urls_file: logger.error("Direct docker CLI (fetch) mode requires 'settings.urls_file'.") return 1 try: with open(urls_file, 'r', encoding='utf-8') as f: urls_list = [line.strip() for line in f if line.strip()] except IOError as e: logger.error(f"Could not read urls_file '{urls_file}': {e}") return 1 if not urls_list: logger.error(f"URL file '{urls_file}' is empty. Nothing to do.") return 1 start_index = 0 if args.start_from_url_index is not None: start_index = max(0, args.start_from_url_index - 1) state_manager.update_last_url_index(start_index, force=True) else: start_index = state_manager.get_last_url_index() if start_index >= len(urls_list) and len(urls_list) > 0: logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!") logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!") logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!") logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") if not args.dry_run and not args.disable_log_writing: state_manager.close() try: os.remove(state_manager.state_file_path) logger.info(f"Deleted state file: {state_manager.state_file_path}") except OSError as e: logger.error(f"Failed to delete state file: {e}") else: logger.info("[Dry Run] Would have deleted state file and stopped.") return 0 if start_index > 0: logger.info(f"Starting/resuming from URL index {start_index + 1}.") sp_utils.display_effective_policy(policy, policy_name, args, sources=urls_list) if args.dry_run: return 0 # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) discovery_config = exec_control.get('worker_pool_discovery') if discovery_config: if worker_pools: logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") discovery_pattern = discovery_config.get('profile_prefix_pattern') workers_per_group = discovery_config.get('workers_per_profile_group', 1) direct_policy = policy.get('direct_docker_cli_policy', {}) use_env = direct_policy.get('use_profile_env', 'auth' if mode == 'fetch_only' else 'download') manager_for_discovery = profile_managers.get(use_env) if not manager_for_discovery: logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") return 1 if not discovery_pattern: logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") return 1 logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") try: all_profiles = manager_for_discovery.list_profiles() found_prefixes = set() for profile in all_profiles: profile_name = profile['name'] if fnmatch.fnmatch(profile_name, discovery_pattern): # Assuming standard name format like 'user31_001', extract 'user31' prefix = profile_name.rsplit('_', 1)[0] found_prefixes.add(prefix) if not found_prefixes: logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") worker_pools = [] else: worker_pools = [] for prefix in sorted(list(found_prefixes)): worker_pools.append({ 'profile_prefix': prefix, 'workers': workers_per_group }) logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") except Exception as e: logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) return 1 if not worker_pools and exec_control.get('workers'): # Fallback for legacy 'workers: N' config prefix = policy.get('info_json_generation_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: logger.info(f"CLI --profile-prefix '{args.profile_prefix}' is set, it will override any prefixes defined in worker_pools.") for pool in worker_pools: pool['profile_prefix'] = args.profile_prefix worker_specs = [] worker_id_counter = 0 for pool in worker_pools: pool_workers = pool.get('workers', 1) prefix_str = pool.get('profile_prefix', '') prefixes = [p.strip() for p in prefix_str.split(',') if p.strip()] if not prefixes: logger.warning(f"Worker pool has no profile_prefix. Skipping: {pool}") continue # Each worker in the pool gets the full list of prefixes from the pool configuration. for i in range(pool_workers): worker_policy = deepcopy(policy) # The worker functions will now handle a comma-separated list of prefixes. worker_policy.setdefault('info_json_generation_policy', {})['profile_prefix'] = prefix_str worker_specs.append({ 'func': run_direct_docker_worker, 'args': (worker_id_counter, worker_policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock) }) worker_id_counter += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=len(worker_specs)) as executor: futures = [executor.submit(spec['func'], *spec['args']) for spec in worker_specs] # This worker runs until shutdown, like the download worker shutdown_event.wait() logger.info("Shutdown signal received, waiting for direct docker workers to finish...") concurrent.futures.wait(futures) elif mode == 'download_only': info_json_dir = settings.get('info_json_dir') if not info_json_dir: logger.error("Direct docker CLI (download) mode requires 'settings.info_json_dir'.") return 1 try: os.makedirs(info_json_dir, exist_ok=True) except OSError as e: logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}") return 1 sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) discovery_config = exec_control.get('worker_pool_discovery') if discovery_config: if worker_pools: logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") discovery_pattern = discovery_config.get('profile_prefix_pattern') workers_per_group = discovery_config.get('workers_per_profile_group', 1) direct_policy = policy.get('direct_docker_cli_policy', {}) use_env = direct_policy.get('use_profile_env', 'auth' if mode == 'fetch_only' else 'download') manager_for_discovery = profile_managers.get(use_env) if not manager_for_discovery: logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") return 1 if not discovery_pattern: logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") return 1 logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") try: all_profiles = manager_for_discovery.list_profiles() found_prefixes = set() for profile in all_profiles: profile_name = profile['name'] if fnmatch.fnmatch(profile_name, discovery_pattern): # Assuming standard name format like 'user31_001', extract 'user31' prefix = profile_name.rsplit('_', 1)[0] found_prefixes.add(prefix) if not found_prefixes: logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") worker_pools = [] else: worker_pools = [] for prefix in sorted(list(found_prefixes)): worker_pools.append({ 'profile_prefix': prefix, 'workers': workers_per_group }) logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") except Exception as e: logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) return 1 if not worker_pools and exec_control.get('workers'): # Fallback for legacy 'workers: N' config prefix = policy.get('download_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: logger.info(f"CLI --profile-prefix '{args.profile_prefix}' is set, it will override any prefixes defined in worker_pools.") for pool in worker_pools: pool['profile_prefix'] = args.profile_prefix worker_specs = [] worker_id_counter = 0 for pool in worker_pools: pool_workers = pool.get('workers', 1) prefix_str = pool.get('profile_prefix', '') prefixes = [p.strip() for p in prefix_str.split(',') if p.strip()] if not prefixes: logger.warning(f"Worker pool has no profile_prefix. Skipping: {pool}") continue # Each worker in the pool gets the full list of prefixes from the pool configuration. for i in range(pool_workers): worker_policy = deepcopy(policy) # The worker functions will now handle a comma-separated list of prefixes. worker_policy.setdefault('download_policy', {})['profile_prefix'] = prefix_str worker_specs.append({ 'func': run_direct_docker_download_worker, 'args': (worker_id_counter, worker_policy, state_manager, args, profile_manager_instance, running_processes, process_lock) }) worker_id_counter += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=len(worker_specs)) as executor: futures = [executor.submit(spec['func'], *spec['args']) for spec in worker_specs] # This worker runs until shutdown shutdown_event.wait() logger.info("Shutdown signal received, waiting for direct docker download workers to finish...") concurrent.futures.wait(futures) state_manager.print_summary(policy) state_manager.close() return 0 # --- Direct Download CLI Orchestration Mode --- elif orchestration_mode == 'direct_download_cli': logger.info("--- Direct Download CLI Orchestration Mode Enabled ---") if mode != 'download_only' or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'direct_download_cli' is only compatible with 'download_only' mode and 'from_pool_with_lock' profile mode.") return 1 download_manager = profile_managers.get('download') if not download_manager: logger.error("Direct download CLI mode requires a download profile manager.") return 1 info_json_dir = settings.get('info_json_dir') if not info_json_dir: logger.error("Direct download CLI mode requires 'settings.info_json_dir'.") return 1 try: os.makedirs(info_json_dir, exist_ok=True) except OSError as e: logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}") return 1 sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) discovery_config = exec_control.get('worker_pool_discovery') if discovery_config: if worker_pools: logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") discovery_pattern = discovery_config.get('profile_prefix_pattern') workers_per_group = discovery_config.get('workers_per_profile_group', 1) manager_for_discovery = profile_managers.get('download') if not manager_for_discovery: logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") return 1 if not discovery_pattern: logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") return 1 logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") try: all_profiles = manager_for_discovery.list_profiles() found_prefixes = set() for profile in all_profiles: profile_name = profile['name'] if fnmatch.fnmatch(profile_name, discovery_pattern): # Assuming standard name format like 'user31_001', extract 'user31' prefix = profile_name.rsplit('_', 1)[0] found_prefixes.add(prefix) if not found_prefixes: logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") worker_pools = [] else: worker_pools = [] for prefix in sorted(list(found_prefixes)): worker_pools.append({ 'profile_prefix': prefix, 'workers': workers_per_group }) logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") except Exception as e: logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) return 1 if not worker_pools and exec_control.get('workers'): # Fallback for legacy 'workers: N' config prefix = policy.get('download_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: logger.info(f"CLI --profile-prefix '{args.profile_prefix}' is set, it will override any prefixes defined in worker_pools.") for pool in worker_pools: pool['profile_prefix'] = args.profile_prefix worker_specs = [] worker_id_counter = 0 for pool in worker_pools: pool_workers = pool.get('workers', 1) prefix_str = pool.get('profile_prefix', '') prefixes = [p.strip() for p in prefix_str.split(',') if p.strip()] if not prefixes: logger.warning(f"Worker pool has no profile_prefix. Skipping: {pool}") continue # Each worker in the pool gets the full list of prefixes from the pool configuration. for i in range(pool_workers): worker_policy = deepcopy(policy) # The worker functions will now handle a comma-separated list of prefixes. worker_policy.setdefault('download_policy', {})['profile_prefix'] = prefix_str worker_specs.append({ 'func': run_direct_download_worker, 'args': (worker_id_counter, worker_policy, state_manager, args, download_manager, running_processes, process_lock) }) worker_id_counter += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=len(worker_specs)) as executor: futures = [executor.submit(spec['func'], *spec['args']) for spec in worker_specs] shutdown_event.wait() logger.info("Shutdown signal received, waiting for direct download workers to finish...") concurrent.futures.wait(futures) state_manager.print_summary(policy) state_manager.close() return 0 # --- Queue Auth Orchestration Mode --- elif orchestration_mode == 'queue_auth': logger.info("--- Queue Auth Orchestration Mode Enabled ---") if mode != 'fetch_only' or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'queue_auth' is only compatible with 'fetch_only' mode and 'from_pool_with_lock' profile mode.") return 1 auth_manager = profile_managers.get('auth') if not auth_manager: logger.error("Queue auth mode requires an auth profile manager.") return 1 # Initialize queue provider queue_policy = policy.get('queue_policy', {}) redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost' redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379)) redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password') redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0)) # Extract env from manager's key prefix, unless disabled by policy use_env_prefix = queue_policy.get('use_env_prefix', True) env_prefix = None if use_env_prefix: env_prefix = auth_manager.key_prefix.removesuffix('_profile_mgmt_') state_manager.initialize_queue_provider( redis_host=redis_host, redis_port=redis_port, redis_password=redis_password, redis_db=redis_db, env_prefix=env_prefix ) # Create save directory if specified save_dir = settings.get('save_info_json_dir') if save_dir: try: os.makedirs(save_dir, exist_ok=True) logger.info(f"Created save directory for info.json files: {save_dir}") except OSError as e: logger.error(f"Failed to create save directory '{save_dir}': {e}") return 1 # In dummy mode, do not populate the queue. Warn the user instead. if args.dummy or args.dummy_batch: input_queue = queue_policy.get('input_queue', 'queue2_auth_inbox') logger.warning("--- Dummy Mode Notice ---") logger.warning(f"Dummy mode is enabled. The tool will NOT automatically populate the queue.") logger.warning(f"Please ensure the input queue '{input_queue}' contains tasks for the workers to process.") logger.warning(f"You can populate it using another tool, for example: ./bin/ytops-client queue push {input_queue} --payload-json '{{\"url\":\"https://youtu.be/dQw4w9WgXcQ\"}}' --count 100") logger.warning("-------------------------") # Requeue failed tasks if requested if args.requeue_failed: requeued = state_manager.requeue_failed_auth_tasks( batch_size=queue_policy.get('requeue_batch_size', 100) ) logger.info(f"Requeued {requeued} failed authentication tasks.") sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) discovery_config = exec_control.get('worker_pool_discovery') if discovery_config: if worker_pools: logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") discovery_pattern = discovery_config.get('profile_prefix_pattern') workers_per_group = discovery_config.get('workers_per_profile_group', 1) manager_for_discovery = profile_managers.get('auth') if not manager_for_discovery: logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") return 1 if not discovery_pattern: logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") return 1 logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") try: all_profiles = manager_for_discovery.list_profiles() found_prefixes = set() for profile in all_profiles: profile_name = profile['name'] if fnmatch.fnmatch(profile_name, discovery_pattern): # Assuming standard name format like 'user31_001', extract 'user31' prefix = profile_name.rsplit('_', 1)[0] found_prefixes.add(prefix) if not found_prefixes: logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") worker_pools = [] else: worker_pools = [] for prefix in sorted(list(found_prefixes)): worker_pools.append({ 'profile_prefix': prefix, 'workers': workers_per_group }) logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") except Exception as e: logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) return 1 if not worker_pools and exec_control.get('workers'): # Fallback for legacy 'workers: N' config prefix = policy.get('info_json_generation_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: logger.info(f"CLI --profile-prefix '{args.profile_prefix}' is set, it will override any prefixes defined in worker_pools.") for pool in worker_pools: pool['profile_prefix'] = args.profile_prefix worker_specs = [] worker_id_counter = 0 for pool in worker_pools: pool_workers = pool.get('workers', 1) prefix_str = pool.get('profile_prefix', '') prefixes = [p.strip() for p in prefix_str.split(',') if p.strip()] if not prefixes: logger.warning(f"Worker pool has no profile_prefix. Skipping: {pool}") continue # Each worker in the pool gets the full list of prefixes from the pool configuration. for i in range(pool_workers): worker_policy = deepcopy(policy) # The worker functions will now handle a comma-separated list of prefixes. worker_policy.setdefault('info_json_generation_policy', {})['profile_prefix'] = prefix_str worker_specs.append({ 'func': run_queue_auth_worker, 'args': (worker_id_counter, worker_policy, state_manager, args, auth_manager, running_processes, process_lock) }) worker_id_counter += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=len(worker_specs)) as executor: futures = [executor.submit(spec['func'], *spec['args']) for spec in worker_specs] shutdown_event.wait() logger.info("Shutdown signal received, waiting for queue auth workers to finish...") concurrent.futures.wait(futures) state_manager.print_summary(policy) state_manager.close() return 0 # --- Queue Download Orchestration Mode --- elif orchestration_mode == 'queue_download': logger.info("--- Queue Download Orchestration Mode Enabled ---") if mode != 'download_only' or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'queue_download' is only compatible with 'download_only' mode and 'from_pool_with_lock' profile mode.") return 1 download_manager = profile_managers.get('download') if not download_manager: logger.error("Queue download mode requires a download profile manager.") return 1 # Initialize queue provider queue_policy = policy.get('queue_policy', {}) redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost' redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379)) redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password') redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0)) # Extract env from manager's key prefix, unless disabled by policy use_env_prefix = queue_policy.get('use_env_prefix', True) env_prefix = None if use_env_prefix: env_prefix = download_manager.key_prefix.removesuffix('_profile_mgmt_') state_manager.initialize_queue_provider( redis_host=redis_host, redis_port=redis_port, redis_password=redis_password, redis_db=redis_db, env_prefix=env_prefix ) # In dummy mode, do not populate the queue. Warn the user instead. if args.dummy or args.dummy_batch: input_queue = queue_policy.get('input_queue', 'queue2_dl_inbox') logger.warning("--- Dummy Mode Notice ---") logger.warning(f"Dummy mode is enabled. The tool will NOT automatically populate the queue.") logger.warning(f"Please ensure the input queue '{input_queue}' contains tasks for the workers to process.") logger.warning("-------------------------") # Get download policy d_policy = policy.get('download_policy', {}) # Create output directory if specified output_dir = d_policy.get('output_dir') if output_dir: try: os.makedirs(output_dir, exist_ok=True) logger.info(f"Created output directory for downloads: {output_dir}") except OSError as e: logger.error(f"Failed to create output directory '{output_dir}': {e}") return 1 # Requeue failed tasks if requested if args.requeue_failed: requeued = state_manager.requeue_failed_download_tasks( batch_size=queue_policy.get('requeue_batch_size', 100) ) logger.info(f"Requeued {requeued} failed download tasks.") sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) discovery_config = exec_control.get('worker_pool_discovery') if discovery_config: if worker_pools: logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") discovery_pattern = discovery_config.get('profile_prefix_pattern') workers_per_group = discovery_config.get('workers_per_profile_group', 1) manager_for_discovery = profile_managers.get('download') if not manager_for_discovery: logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") return 1 if not discovery_pattern: logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") return 1 logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") try: all_profiles = manager_for_discovery.list_profiles() found_prefixes = set() for profile in all_profiles: profile_name = profile['name'] if fnmatch.fnmatch(profile_name, discovery_pattern): # Assuming standard name format like 'user31_001', extract 'user31' prefix = profile_name.rsplit('_', 1)[0] found_prefixes.add(prefix) if not found_prefixes: logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") worker_pools = [] else: worker_pools = [] for prefix in sorted(list(found_prefixes)): worker_pools.append({ 'profile_prefix': prefix, 'workers': workers_per_group }) logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") except Exception as e: logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) return 1 if not worker_pools and exec_control.get('workers'): # Fallback for legacy 'workers: N' config prefix = policy.get('download_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: logger.info(f"CLI --profile-prefix '{args.profile_prefix}' is set, it will override any prefixes defined in worker_pools.") for pool in worker_pools: pool['profile_prefix'] = args.profile_prefix worker_specs = [] worker_id_counter = 0 for pool in worker_pools: pool_workers = pool.get('workers', 1) prefix_str = pool.get('profile_prefix', '') prefixes = [p.strip() for p in prefix_str.split(',') if p.strip()] if not prefixes: logger.warning(f"Worker pool has no profile_prefix. Skipping: {pool}") continue # Each worker in the pool gets the full list of prefixes from the pool configuration. for i in range(pool_workers): worker_policy = deepcopy(policy) # The worker functions will now handle a comma-separated list of prefixes. worker_policy.setdefault('download_policy', {})['profile_prefix'] = prefix_str worker_specs.append({ 'func': run_queue_download_worker, 'args': (worker_id_counter, worker_policy, state_manager, args, download_manager, running_processes, process_lock) }) worker_id_counter += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=len(worker_specs)) as executor: futures = [executor.submit(spec['func'], *spec['args']) for spec in worker_specs] shutdown_event.wait() logger.info("Shutdown signal received, waiting for queue download workers to finish...") concurrent.futures.wait(futures) state_manager.print_summary(policy) state_manager.close() return 0 # --- Queue Full Stack Orchestration Mode --- elif orchestration_mode == 'queue_full_stack': logger.info("--- Queue Full Stack Orchestration Mode Enabled ---") if mode != 'full_stack' or settings.get('profile_mode') != 'from_pool_with_lock': logger.error("Orchestration mode 'queue_full_stack' is only compatible with 'full_stack' mode and 'from_pool_with_lock' profile mode.") return 1 auth_manager = profile_managers.get('auth') if not auth_manager: logger.error("Queue full stack mode requires an auth profile manager.") return 1 download_manager = profile_managers.get('download') if not download_manager: logger.error("Queue full stack mode requires a download profile manager.") return 1 # Initialize queue provider queue_policy = policy.get('queue_policy', {}) redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost' redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379)) redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password') redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0)) # Extract env from auth manager's key prefix, unless disabled by policy use_env_prefix = queue_policy.get('use_env_prefix', True) env_prefix = None if use_env_prefix: auth_prefix = auth_manager.key_prefix.removesuffix('_profile_mgmt_') download_prefix = download_manager.key_prefix.removesuffix('_profile_mgmt_') if auth_prefix != download_prefix: logger.warning(f"Auth environment ('{auth_prefix}') and Download environment ('{download_prefix}') are different.") logger.warning(f"Using '{auth_prefix}' as the prefix for all shared Redis queues.") env_prefix = auth_prefix state_manager.initialize_queue_provider( redis_host=redis_host, redis_port=redis_port, redis_password=redis_password, redis_db=redis_db, env_prefix=env_prefix ) # Create directories if specified save_dir = settings.get('save_info_json_dir') if save_dir: try: os.makedirs(save_dir, exist_ok=True) logger.info(f"Created save directory for info.json files: {save_dir}") except OSError as e: logger.error(f"Failed to create save directory '{save_dir}': {e}") return 1 output_dir = d_policy.get('output_dir') if output_dir: try: os.makedirs(output_dir, exist_ok=True) logger.info(f"Created output directory for downloads: {output_dir}") except OSError as e: logger.error(f"Failed to create output directory '{output_dir}': {e}") return 1 # Requeue failed tasks if requested if args.requeue_failed: requeued_auth = state_manager.requeue_failed_auth_tasks( batch_size=queue_policy.get('requeue_batch_size', 100) ) requeued_dl = state_manager.requeue_failed_download_tasks( batch_size=queue_policy.get('requeue_batch_size', 100) ) logger.info(f"Requeued {requeued_auth} failed authentication tasks and {requeued_dl} failed download tasks.") sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 # Start both auth and download workers auth_workers = exec_control.get('auth_workers', 1) download_workers = exec_control.get('download_workers', 2) with concurrent.futures.ThreadPoolExecutor(max_workers=auth_workers + download_workers) as executor: # Start auth workers auth_futures = [ executor.submit(run_queue_auth_worker, i, policy, state_manager, args, auth_manager, running_processes, process_lock) for i in range(auth_workers) ] # Start download workers dl_futures = [ executor.submit(run_queue_download_worker, i + auth_workers, policy, state_manager, args, download_manager, running_processes, process_lock) for i in range(download_workers) ] # Start requeue task if configured requeue_interval = queue_policy.get('requeue_interval_seconds') requeue_enabled = queue_policy.get('requeue_failed_tasks', False) if requeue_enabled and requeue_interval: def requeue_task(): while not shutdown_event.is_set(): time.sleep(requeue_interval) if shutdown_event.is_set(): break try: requeued_auth = state_manager.requeue_failed_auth_tasks( batch_size=queue_policy.get('requeue_batch_size', 100) ) requeued_dl = state_manager.requeue_failed_download_tasks( batch_size=queue_policy.get('requeue_batch_size', 100) ) if requeued_auth > 0 or requeued_dl > 0: logger.info(f"Auto-requeued {requeued_auth} failed auth tasks and {requeued_dl} failed download tasks.") except Exception as e: logger.error(f"Error in auto-requeue task: {e}") requeue_future = executor.submit(requeue_task) all_futures = auth_futures + dl_futures + [requeue_future] else: all_futures = auth_futures + dl_futures # Wait for shutdown signal shutdown_event.wait() logger.info("Shutdown signal received, waiting for queue workers to finish...") concurrent.futures.wait(all_futures) state_manager.print_summary(policy) state_manager.close() return 0 # --- Default (Task-First) Orchestration Mode --- sources = [] # This will be a list of URLs or Path objects if mode in ['full_stack', 'fetch_only']: urls_file = settings.get('urls_file') if not urls_file: logger.error("Policy mode requires 'settings.urls_file'.") return 1 try: with open(urls_file, 'r', encoding='utf-8') as f: content = f.read() try: data = json.loads(content) if isinstance(data, list) and all(isinstance(item, str) for item in data): sources = data logger.info(f"Loaded {len(sources)} URLs/IDs from JSON array in {urls_file}.") else: logger.error(f"URL file '{urls_file}' is valid JSON but not an array of strings.") return 1 except json.JSONDecodeError: sources = [line.strip() for line in content.splitlines() if line.strip()] logger.info(f"Loaded {len(sources)} URLs/IDs from text file {urls_file}.") except IOError as e: logger.error(f"Failed to read urls_file {urls_file}: {e}") return 1 # Clean up URLs/IDs which might have extra quotes, commas, or brackets from copy-pasting cleaned_sources = [] for source in sources: cleaned_source = source.strip().rstrip(',').strip().strip('\'"[]').strip() if cleaned_source: cleaned_sources.append(cleaned_source) if len(cleaned_sources) != len(sources): logger.info(f"Cleaned URL list, removed {len(sources) - len(cleaned_sources)} empty or invalid entries.") sources = cleaned_sources elif mode == 'download_only': # If not in continuous mode, load sources once at the start. # In continuous mode, `sources` is populated at the start of each cycle. if settings.get('directory_scan_mode') != 'continuous': info_json_dir = settings.get('info_json_dir') if not info_json_dir: logger.error("Policy mode 'download_only' requires 'settings.info_json_dir'.") return 1 try: all_files = sorted(Path(info_json_dir).glob('*.json')) sample_percent = settings.get('info_json_dir_sample_percent') if sample_percent and 0 < sample_percent <= 100: sample_count = int(len(all_files) * (sample_percent / 100.0)) num_to_sample = min(len(all_files), max(1, sample_count)) sources = random.sample(all_files, k=num_to_sample) logger.info(f"Randomly sampled {len(sources)} files ({sample_percent}%) from {info_json_dir}") else: sources = all_files except (IOError, FileNotFoundError) as e: logger.error(f"Failed to read info_json_dir {info_json_dir}: {e}") return 1 # In continuous download mode, sources are loaded inside the loop, so we skip this check. if settings.get('directory_scan_mode') != 'continuous' and not sources: logger.error("No sources (URLs or info.json files) to process. Exiting.") return 1 start_index = 0 if mode in ['full_stack', 'fetch_only']: if args.start_from_url_index is not None: # User provided a 1-based index via CLI start_index = max(0, args.start_from_url_index - 1) logger.info(f"Starting from URL index {start_index + 1} as requested by --start-from-url-index.") # When user specifies it, we should overwrite the saved state. state_manager.update_last_url_index(start_index, force=True) else: start_index = state_manager.get_last_url_index() if start_index > 0: logger.info(f"Resuming from URL index {start_index + 1} based on saved state.") if start_index >= len(sources): logger.warning(f"Start index ({start_index + 1}) is beyond the end of the URL list ({len(sources)}). Nothing to process.") sources = [] # --- Auto-calculate workers if needed --- original_workers_setting = exec_control.get('workers') if original_workers_setting == 'auto': # In this simplified model, 'auto' is based on target rate, not profiles. target_rate_cfg = exec_control.get('target_rate', {}) target_reqs = target_rate_cfg.get('requests') target_mins = target_rate_cfg.get('per_minutes') if target_reqs and target_mins and sources: target_rpm = target_reqs / target_mins num_sources = len(sources) sleep_cfg = exec_control.get('sleep_between_tasks', {}) avg_sleep = (sleep_cfg.get('min_seconds', 0) + sleep_cfg.get('max_seconds', 0)) / 2 assumed_task_duration = 12 # Must match assumption in display_effective_policy # Formula: workers = (total_work_seconds) / (total_time_for_work) # total_time_for_work is derived from the target rate: # (total_cycle_time) = (60 * num_sources) / target_rpm # total_time_for_work = total_cycle_time - avg_sleep work_time_available = (60 * num_sources / target_rpm) - avg_sleep if work_time_available <= 0: # The sleep time alone makes the target rate impossible. # Set workers to max parallelism as a best-effort. num_workers = num_sources logger.warning(f"Target rate of {target_rpm} req/min is likely unachievable due to sleep time of {avg_sleep}s.") logger.warning(f"Setting workers to max parallelism ({num_workers}) as a best effort.") else: total_work_seconds = num_sources * assumed_task_duration num_workers = total_work_seconds / work_time_available calculated_workers = max(1, int(num_workers + 0.99)) # Ceiling exec_control['workers'] = calculated_workers logger.info(f"Calculated 'auto' workers based on target rate: {calculated_workers}") else: logger.warning("Cannot calculate 'auto' workers: 'target_rate' or sources are not defined. Defaulting to 1 worker.") exec_control['workers'] = 1 sp_utils.display_effective_policy( policy, policy_name, args, sources=sources, profile_names=None, # Profile grouping is removed original_workers_setting=original_workers_setting ) if args.dry_run: logger.info("Dry run complete. Exiting.") return 0 start_time = time.time() run_until_cfg = exec_control.get('run_until', {}) duration_seconds = (run_until_cfg.get('minutes') or 0) * 60 max_cycles = run_until_cfg.get('cycles') or 0 max_requests = run_until_cfg.get('requests') or 0 # --- Main test loop --- cycles = 0 try: while not shutdown_event.is_set(): if duration_seconds and (time.time() - start_time) > duration_seconds: logger.info("Reached duration limit. Stopping.") break if max_requests > 0 and state_manager.get_request_count() >= max_requests: logger.info(f"Reached max requests ({max_requests}). Stopping.") break # --- Rescan for sources if in continuous download mode --- if mode == 'download_only' and settings.get('directory_scan_mode') == 'continuous': info_json_dir = settings.get('info_json_dir') try: all_files_in_dir = Path(info_json_dir).glob('*.json') processed_files = state_manager.get_processed_files() new_files = [f for f in all_files_in_dir if str(f) not in processed_files] # Sort by modification time, oldest first, to process in order of creation new_files.sort(key=os.path.getmtime) max_files_per_cycle = settings.get('max_files_per_cycle') if max_files_per_cycle and len(new_files) > max_files_per_cycle: sources = new_files[:max_files_per_cycle] else: sources = new_files if not sources: sleep_duration = settings.get('sleep_if_no_new_files_seconds', 10) logger.info(f"No new info.json files found in '{info_json_dir}'. Sleeping for {sleep_duration}s...") # Interruptible sleep sleep_end_time = time.time() + sleep_duration while time.time() < sleep_end_time: if shutdown_event.is_set(): break time.sleep(0.5) if shutdown_event.is_set(): break continue # Skip to next iteration of the while loop except (IOError, FileNotFoundError) as e: logger.error(f"Failed to read info_json_dir {info_json_dir}: {e}. Retrying in 10s.") time.sleep(10) continue # --- Group sources for this cycle --- task_items = sources # If there's nothing to do this cycle, skip. if not task_items: if mode == 'download_only' and settings.get('directory_scan_mode') == 'continuous': # The sleep logic is handled inside the rescanning block. continue else: logger.info("No more sources to process. Ending test.") break cycles += 1 if max_cycles > 0 and cycles > max_cycles: logger.info(f"Reached max cycles ({max_cycles}). Stopping.") break logger.info(f"--- Cycle #{cycles} (Total Requests: {state_manager.get_request_count()}) ---") with concurrent.futures.ThreadPoolExecutor(max_workers=exec_control.get('workers', 1)) as executor: # Submit one task per source URL or info.json file future_to_task_info = { executor.submit(process_task, source, i, cycles, policy, state_manager, args, profile_managers, running_processes, process_lock): { 'source': source, 'abs_index': i } for i, source in enumerate(task_items) if i >= start_index } should_stop = False pending_futures = set(future_to_task_info.keys()) while pending_futures and not should_stop: done, pending_futures = concurrent.futures.wait( pending_futures, return_when=concurrent.futures.FIRST_COMPLETED ) for future in done: if shutdown_event.is_set(): should_stop = True break task_info = future_to_task_info[future] source = task_info['source'] abs_index = task_info.get('abs_index') try: results = future.result() if abs_index is not None and mode in ['full_stack', 'fetch_only']: # Update state to resume from the *next* URL. state_manager.update_last_url_index(abs_index + 1) # --- Mark file as processed --- # This is the central place to mark a source as complete for download_only mode. if mode == 'download_only': # In continuous mode, we add to state file to prevent re-picking in same run. if settings.get('directory_scan_mode') == 'continuous': state_manager.mark_file_as_processed(source) # If marking by rename is on, do that. if settings.get('mark_processed_files'): try: processed_dir = settings.get('processed_files_dir') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') new_filename = f"{source.name}.{timestamp}.processed" if processed_dir: dest_path = Path(processed_dir) / new_filename dest_path.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(source), str(dest_path)) logger.info(f"Marked '{source.name}' as processed by moving to '{dest_path}'") else: # Fallback to old behavior: rename in place new_path = source.parent / new_filename source.rename(new_path) logger.info(f"Marked '{source.name}' as processed by renaming to '{new_path.name}'") except (IOError, OSError) as e: logger.error(f"Failed to mark processed file '{source.name}': {e}") # When using profile-aware mode, the file processing (including marking as # processed) is handled inside process_profile_task. # For non-profile mode, this logic was incorrect and has been moved. for result in results: if not result['success']: s_conditions = policy.get('stop_conditions', {}) is_cumulative_403_active = s_conditions.get('on_cumulative_403', {}).get('max_errors') if s_conditions.get('on_failure') or \ (s_conditions.get('on_http_403') and not is_cumulative_403_active and result['error_type'] == 'HTTP 403') or \ (s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'): logger.info(f"!!! STOP CONDITION MET: Immediate stop on failure '{result['error_type']}' for {sp_utils.get_display_name(source)}. Shutting down all workers. !!!") should_stop = True break except concurrent.futures.CancelledError: logger.info(f"Task for {sp_utils.get_display_name(source)} was cancelled during shutdown.") event = { 'type': 'fetch' if mode != 'download_only' else 'download', 'path': str(source), 'success': False, 'error_type': 'Cancelled', 'details': 'Task cancelled during shutdown.' } state_manager.log_event(event) except Exception as exc: logger.error(f'{sp_utils.get_display_name(source)} generated an exception: {exc}') if should_stop: break # Check for all stop conditions after each task completes. # 1. Max requests limit if not should_stop and max_requests > 0 and state_manager.get_request_count() >= max_requests: logger.info(f"!!! STOP CONDITION MET: Reached request limit ({max_requests}). Shutting down. !!!") should_stop = True # 2. Duration limit if not should_stop and duration_seconds and (time.time() - start_time) > duration_seconds: logger.info(f"!!! STOP CONDITION MET: Reached duration limit ({run_until_cfg.get('minutes')} minutes). Shutting down. !!!") should_stop = True # 3. Cumulative error rate limits s_conditions = policy.get('stop_conditions', {}) error_rate_policy = s_conditions.get('on_error_rate') if not should_stop and error_rate_policy: max_errors = error_rate_policy.get('max_errors') per_minutes = error_rate_policy.get('per_minutes') if max_errors and per_minutes: error_count = state_manager.check_cumulative_error_rate(max_errors, per_minutes) if error_count > 0: logger.info(f"!!! STOP CONDITION MET: Error rate exceeded ({error_count} errors in last {per_minutes}m). Shutting down. !!!") should_stop = True cumulative_403_policy = s_conditions.get('on_cumulative_403') if not should_stop and cumulative_403_policy: max_errors = cumulative_403_policy.get('max_errors') per_minutes = cumulative_403_policy.get('per_minutes') if max_errors and per_minutes: error_count = state_manager.check_cumulative_error_rate(max_errors, per_minutes, error_type='HTTP 403') if error_count > 0: logger.info(f"!!! STOP CONDITION MET: Cumulative 403 rate exceeded ({error_count} in last {per_minutes}m). Shutting down. !!!") should_stop = True quality_degradation_policy = s_conditions.get('on_quality_degradation') if not should_stop and quality_degradation_policy: max_triggers = quality_degradation_policy.get('max_triggers') per_minutes = quality_degradation_policy.get('per_minutes') if max_triggers and per_minutes: trigger_count = state_manager.check_quality_degradation_rate(max_triggers, per_minutes) if trigger_count > 0: logger.info(f"!!! STOP CONDITION MET: Quality degradation triggered {trigger_count} times in last {per_minutes}m. Shutting down. !!!") should_stop = True if should_stop: break if should_stop and pending_futures: logger.info(f"Cancelling {len(pending_futures)} outstanding task(s).") for future in pending_futures: future.cancel() if should_stop: break if max_cycles > 0 and cycles >= max_cycles: break # If the run is not time-based (i.e., it's limited by cycles or requests) # and it's not a continuous directory scan, we should stop after one pass. # This makes the behavior of --set run_until.requests=N more intuitive: it acts # as an upper limit for a single pass, not a trigger for multiple passes. if settings.get('directory_scan_mode') != 'continuous' and not duration_seconds: logger.info("Run is not time-based. Halting after one full pass through sources.") break logger.info("Cycle complete.") except KeyboardInterrupt: logger.info("\nForceful shutdown requested...") finally: # --- Graceful Shutdown URL Reporting --- if shutdown_event.is_set(): orchestration_mode = settings.get('orchestration_mode') if orchestration_mode in ['direct_batch_cli', 'direct_docker_cli'] and mode == 'fetch_only': urls_file = settings.get('urls_file') # Check if urls_list was loaded for the relevant mode if urls_file and 'urls_list' in locals() and urls_list: last_index = state_manager.get_last_url_index() # The index points to the *next* URL to be processed. # If a batch was aborted, it might have been rewound. # We should save all URLs from this index onwards. if last_index < len(urls_list): unprocessed_urls = urls_list[last_index:] unprocessed_filename = f"unprocessed_urls_{policy_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" try: with open(unprocessed_filename, 'w', encoding='utf-8') as f: f.write('\n'.join(unprocessed_urls)) logger.warning(f"--- GRACEFUL SHUTDOWN ---") logger.warning(f"Saved {len(unprocessed_urls)} unprocessed URLs to '{unprocessed_filename}'.") logger.warning(f"Last processed URL index was {last_index}. Next run should start from index {last_index + 1}.") logger.warning(f"-------------------------") except IOError as e: logger.error(f"Could not save unprocessed URLs: {e}") state_manager.print_summary(policy) state_manager.close() return 0 def process_task(source, index, cycle_num, policy, state_manager, args, profile_managers, running_processes, process_lock): """ Worker task for a single source (URL or info.json path). This function is the main entry point for the 'task-first' orchestration mode. """ settings = policy.get('settings', {}) mode = settings.get('mode', 'full_stack') profile_mode = settings.get('profile_mode') auth_manager = profile_managers.get('auth') download_manager = profile_managers.get('download') # --- Full Stack Mode --- if mode == 'full_stack': # 1. Fetch info.json if not auth_manager: logger.error("Full-stack mode requires an 'auth' profile manager.") return [] # This part of the logic is simplified and does not exist in the provided codebase. # It would involve locking an auth profile, fetching info.json, and then unlocking. # For now, we'll assume a placeholder logic. logger.error("Full-stack mode (task-first) is not fully implemented in this version.") return [] # --- Fetch Only Mode --- elif mode == 'fetch_only': if not auth_manager: logger.error("Fetch-only mode requires an 'auth' profile manager.") return [] logger.error("Fetch-only mode (task-first) is not fully implemented in this version.") return [] # --- Download Only Mode --- elif mode == 'download_only': if profile_mode == 'from_pool_with_lock': if not download_manager: logger.error("Download-only with locking requires a 'download' profile manager.") return [] # In this mode, we process one file per profile. return process_profile_task( profile_name=None, # Profile is locked inside the task file_list=[source], policy=policy, state_manager=state_manager, cycle_num=cycle_num, args=args, running_processes=running_processes, process_lock=process_lock, profile_manager_instance=download_manager ) else: # Legacy mode without profile locking try: with open(source, 'r', encoding='utf-8') as f: info_json_content = f.read() except (IOError, FileNotFoundError) as e: logger.error(f"[{sp_utils.get_display_name(source)}] Could not read info.json file: {e}") return [] return _run_download_logic(source, info_json_content, policy, state_manager, args, running_processes, process_lock) return []