#!/usr/bin/env python3
"""
Architectural Overview for the Stress Policy Tool:
This file, stress_policy_tool.py, is the main entry point and orchestrator. It is responsible for:
- Parsing command-line arguments.
- Setting up logging and the main shutdown handler.
- Initializing the StateManager and ProfileManager.
- Running the main execution loop (ThreadPoolExecutor) based on the chosen orchestration mode.
- Delegating the actual work to the worker modules under `ytops_client/stress_policy/`.
The core logic has been refactored into the following modules within `ytops_client/stress_policy/`:
- arg_parser.py: Defines the command-line interface for the 'stress-policy' command using argparse.
- direct_batch_worker.py, direct_docker_worker.py, direct_download_worker.py, throughput_worker.py, queue_workers.py: The per-mode worker functions executed by the ThreadPoolExecutor (e.g. `run_direct_batch_worker`). This is where the main logic for fetching info.json and running downloads resides.
- worker_utils.py: Shared worker helpers, including `process_profile_task` and `_run_download_logic` for the legacy "task-first" mode.
- state_manager.py: Manages run state, statistics, rate limits, and persistence between runs (e.g., `_state.json`, `_stats.jsonl`).
- process_runners.py: A low-level module that handles the execution of external subprocesses (`run_command`) and Docker containers (`run_docker_container`).
- utils.py: Provides stateless utility functions shared across the tool, such as loading YAML policies, applying overrides, and formatting.
"""
"""
Policy-driven stress-testing orchestrator for video format downloads.
This tool orchestrates complex, multi-stage stress tests based on a YAML policy file.
It supports several modes of operation:
- full_stack: A complete workflow that first fetches an info.json for a given URL
using a profile, and then uses that info.json to perform one or more downloads.
- fetch_only: Only performs the info.json generation step. This is useful for
simulating user authentication and browsing behavior.
- download_only: Only performs the download step, using a directory of pre-existing
info.json files as its source.
- direct_batch_cli (fetch_only): A high-throughput mode for generating info.json files
by calling a custom, Redis-aware yt-dlp command-line tool directly in batch mode.
This mode bypasses the get-info Thrift service. The workflow is as follows:
1. The orchestrator worker locks a profile from the auth pool.
2. It takes a 'batch' of URLs from the source file.
3. It invokes the configured yt-dlp command, passing the profile name and proxy via
environment variables.
4. The custom yt-dlp process then does the following for each URL in the batch:
a. Checks Redis to ensure the profile has not been externally BANNED.
b. Fetches the info.json.
c. Records 'success', 'failure', or 'tolerated_error' for the profile in Redis.
5. After the yt-dlp process finishes, the orchestrator worker post-processes the
generated info.json files to inject metadata (profile name, proxy).
6. The worker unlocks the profile.
7. The worker repeats this cycle with a new profile and the next batch of URLs.
The tool uses a profile management system (v2) based on Redis for coordinating
state between multiple workers and enforcing policies (e.g., rate limits, cooldowns).
"""
import argparse
import collections
import concurrent.futures
import json
import logging
import os
import random
import re
import shlex
import signal
import sys
import tempfile
import shutil
import threading
import time
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
try:
import docker
except ImportError:
docker = None
from .profile_manager_tool import ProfileManager
from .stress_policy.state_manager import StateManager
from .stress_policy.process_runners import run_command, run_docker_container, get_worker_id
from .stress_policy import utils as sp_utils
# Worker functions are imported from their dedicated per-mode modules; the former
# monolithic `workers.py` has been split into the modules imported below.
from .stress_policy.direct_batch_worker import run_direct_batch_worker
from .stress_policy.direct_docker_worker import run_direct_docker_worker, run_direct_docker_download_worker
from .stress_policy.direct_download_worker import run_direct_download_worker
from .stress_policy.throughput_worker import run_throughput_worker
# Helper functions for the legacy "task-first" mode are now in worker_utils.py
from .stress_policy.worker_utils import _run_download_logic, process_profile_task
from .stress_policy.queue_workers import (
run_queue_auth_worker, run_queue_download_worker
)
from .stress_policy.queue_provider import RedisQueueProvider
from .stress_policy.arg_parser import add_stress_policy_parser
# Global event used to signal graceful shutdown
shutdown_event = threading.Event()
# Globals for tracking and terminating subprocesses on shutdown
running_processes = set()
process_lock = threading.Lock()
# Configure logging
logger = logging.getLogger('stress_policy_tool')
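# Illustrative invocations, assuming the 'stress-policy' subcommand is exposed
# through bin/ytops-client (as referenced in the log messages below); the exact
# flags are defined in stress_policy/arg_parser.py, and the policy path is hypothetical:
#   bin/ytops-client stress-policy --list-policies
#   bin/ytops-client stress-policy --policy policies/example.yaml --dry-run
#   bin/ytops-client stress-policy --policy policies/example.yaml --set run_until.requests=100 --verbose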
def main_stress_policy(args):
"""Main logic for the 'stress-policy' command."""
if args.list_policies:
return sp_utils.list_policies()
if not args.policy:
print("Error: --policy is required unless using --list-policies.", file=sys.stderr)
return 1
# Handle --show-overrides early, as it doesn't run the test.
if args.show_overrides:
policy = sp_utils.load_policy(args.policy, args.policy_name)
if not policy:
return 1 # load_policy prints its own error
sp_utils.print_policy_overrides(policy)
return 0
policy = sp_utils.load_policy(args.policy, args.policy_name)
policy = sp_utils.apply_overrides(policy, args.set)
# If orchestrator is verbose, make downloaders verbose too by passing it through.
if args.verbose:
d_policy = policy.setdefault('download_policy', {})
extra_args = d_policy.get('extra_args', '')
if '--verbose' not in extra_args:
d_policy['extra_args'] = f"{extra_args} --verbose".strip()
# --- Set safe defaults ---
settings = policy.get('settings', {})
mode = settings.get('mode', 'full_stack')
# For continuous download mode, it is almost always desired to mark files as
# processed to avoid an infinite loop on the same files. We make this the
# default and issue a warning if it's not explicitly set.
if mode == 'download_only' and settings.get('directory_scan_mode') == 'continuous':
if 'mark_processed_files' not in settings:
# Use print because logger is not yet configured.
print("WARNING: In 'continuous' download mode, 'settings.mark_processed_files' was not set.", file=sys.stderr)
print(" Defaulting to 'true' to prevent reprocessing files.", file=sys.stderr)
print(" Set it to 'false' explicitly in your policy to disable this behavior.", file=sys.stderr)
settings['mark_processed_files'] = True
# Load .env file *after* loading policy to respect env_file from policy.
if load_dotenv:
sim_params = policy.get('simulation_parameters', {})
# Coalesce from CLI, then policy. An explicit CLI arg takes precedence.
env_file = args.env_file or sim_params.get('env_file')
if not env_file and args.env and '.env' in args.env and os.path.exists(args.env):
# Use print because logger is not yet configured.
print(f"Warning: --env should be an environment name (e.g., 'sim'), not a file path. Treating '{args.env}' as --env-file. The environment name will default to 'sim'.", file=sys.stderr)
env_file = args.env
args.env = 'sim'
was_loaded = load_dotenv(env_file)
if was_loaded:
# Use print because logger is not yet configured.
print(f"Loaded environment variables from {env_file or '.env file'}", file=sys.stderr)
elif args.env_file: # Only error if user explicitly passed it
print(f"Error: The specified --env-file was not found: {args.env_file}", file=sys.stderr)
return 1
if args.profile_prefix:
# This shortcut overrides the profile_prefix for all relevant stages.
# Useful for simple fetch_only or download_only runs.
policy.setdefault('info_json_generation_policy', {})['profile_prefix'] = args.profile_prefix
policy.setdefault('download_policy', {})['profile_prefix'] = args.profile_prefix
# Use print because logger is not yet configured.
print(f"Overriding profile_prefix for all stages with CLI arg: {args.profile_prefix}", file=sys.stderr)
# Apply direct CLI overrides after --set, so they have final precedence.
if args.auto_merge_fragments is not None:
policy.setdefault('download_policy', {})['auto_merge_fragments'] = args.auto_merge_fragments
if args.remove_fragments_after_merge is not None:
policy.setdefault('download_policy', {})['remove_fragments_after_merge'] = args.remove_fragments_after_merge
if args.fragments_dir is not None:
policy.setdefault('download_policy', {})['aria_fragments_dir'] = args.fragments_dir
if args.remote_dir is not None:
policy.setdefault('download_policy', {})['aria_remote_dir'] = args.remote_dir
if args.cleanup is not None:
policy.setdefault('download_policy', {})['cleanup'] = args.cleanup
if args.expire_time_shift_minutes is not None:
policy.setdefault('download_policy', {})['expire_time_shift_minutes'] = args.expire_time_shift_minutes
policy_name = policy.get('name', args.policy_name or Path(args.policy).stem)
# --- Logging Setup ---
log_level = logging.DEBUG if args.verbose else logging.INFO
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' if args.verbose else '%(asctime)s - %(message)s'
date_format = None if args.verbose else '%H:%M:%S'
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# Silence noisy loggers from dependencies like docker-py
logging.getLogger('urllib3.connectionpool').setLevel(logging.INFO if args.verbose else logging.WARNING)
# Remove any existing handlers to avoid duplicate logs
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Add console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
root_logger.addHandler(console_handler)
if not args.disable_log_writing:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
log_filename = f"stress-policy-{timestamp}-{policy_name}.log"
try:
# Open in append mode to be safe, though timestamp should be unique.
file_handler = logging.FileHandler(log_filename, mode='a', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
root_logger.addHandler(file_handler)
# Use print because logger is just being set up.
print(f"Logging to file: {log_filename}", file=sys.stderr)
except IOError as e:
print(f"Error: Could not open log file {log_filename}: {e}", file=sys.stderr)
state_manager = StateManager(policy_name, disable_log_writing=args.disable_log_writing, shutdown_event=shutdown_event)
if args.reset_infojson:
info_json_dir = settings.get('info_json_dir')
if not info_json_dir:
logger.error("--reset-infojson requires 'settings.info_json_dir' to be set in the policy.")
return 1
logger.info(f"--- Resetting info.json files in '{info_json_dir}' ---")
source_dir = Path(info_json_dir)
if not source_dir.is_dir():
logger.warning(f"Source directory for reset does not exist: {source_dir}. Skipping reset.")
else:
processed_files = list(source_dir.rglob('*.json.processed'))
locked_files = list(source_dir.rglob('*.json.LOCKED.*'))
files_to_reset = processed_files + locked_files
if not files_to_reset:
logger.info("No processed or locked files found to reset.")
else:
reset_count = 0
for file_to_reset in files_to_reset:
original_path = None
if file_to_reset.name.endswith('.processed'):
original_path_str = str(file_to_reset).removesuffix('.processed')
original_path = Path(original_path_str)
elif '.LOCKED.' in file_to_reset.name:
original_path_str = str(file_to_reset).split('.LOCKED.')[0]
original_path = Path(original_path_str)
if original_path:
try:
if original_path.exists():
logger.warning(f"Original file '{original_path.name}' already exists. Deleting '{file_to_reset.name}' instead of renaming.")
file_to_reset.unlink()
else:
file_to_reset.rename(original_path)
logger.debug(f"Reset '{file_to_reset.name}' to '{original_path.name}'")
reset_count += 1
except (IOError, OSError) as e:
logger.error(f"Failed to reset '{file_to_reset.name}': {e}")
logger.info(f"Reset {reset_count} info.json file(s).")
if args.pre_cleanup_media is not None:
cleanup_path_str = args.pre_cleanup_media
d_policy = policy.get('download_policy', {})
direct_docker_policy = policy.get('direct_docker_cli_policy', {})
if cleanup_path_str == '.': # Special value from `const`
# Determine path from policy
if direct_docker_policy.get('docker_host_download_path'):
cleanup_path_str = direct_docker_policy['docker_host_download_path']
elif d_policy.get('output_dir'):
cleanup_path_str = d_policy['output_dir']
else:
logger.error("--pre-cleanup-media was used without a path, but could not determine a download directory from the policy.")
return 1
cleanup_path = Path(cleanup_path_str)
if not cleanup_path.is_dir():
logger.warning(f"Directory for media cleanup does not exist, skipping: {cleanup_path}")
else:
logger.info(f"--- Cleaning up media files in '{cleanup_path}' ---")
media_extensions = ['.mp4', '.m4a', '.webm', '.mkv', '.part', '.ytdl']
files_deleted = 0
for ext in media_extensions:
for media_file in cleanup_path.rglob(f'*{ext}'):
try:
media_file.unlink()
logger.debug(f"Deleted {media_file}")
files_deleted += 1
except OSError as e:
logger.error(f"Failed to delete media file '{media_file}': {e}")
logger.info(f"Deleted {files_deleted} media file(s).")
if args.reset_local_cache_folder is not None:
cache_path_str = args.reset_local_cache_folder
direct_docker_policy = policy.get('direct_docker_cli_policy', {})
if cache_path_str == '.': # Special value from `const`
if direct_docker_policy.get('docker_host_cache_path'):
cache_path_str = direct_docker_policy['docker_host_cache_path']
else:
logger.error("--reset-local-cache-folder was used without a path, but 'direct_docker_cli_policy.docker_host_cache_path' is not set in the policy.")
return 1
cache_path = Path(cache_path_str)
if not cache_path.is_dir():
logger.warning(f"Local cache directory for reset does not exist, skipping: {cache_path}")
else:
logger.info(f"--- Resetting local cache folder '{cache_path}' ---")
try:
shutil.rmtree(cache_path)
os.makedirs(cache_path)
logger.info(f"Successfully deleted and recreated cache folder '{cache_path}'.")
except OSError as e:
logger.error(f"Failed to reset cache folder '{cache_path}': {e}")
if policy.get('name') in ['continuous_auth_simulation', 'continuous_download_simulation']:
logger.warning("This policy is part of a multi-stage simulation.")
if 'auth' in policy.get('name', ''):
logger.warning("It is recommended to run this auth policy using: ./bin/run-profile-simulation")
if 'download' in policy.get('name', ''):
logger.warning("It is recommended to run this download policy using: ./bin/run-download-simulation")
time.sleep(2)
# --- Graceful shutdown handler ---
def shutdown_handler(signum, frame):
if not shutdown_event.is_set():
logger.info(f"\nSignal {signum} received, shutting down gracefully...")
shutdown_event.set()
# Save state immediately to prevent loss on interrupt.
logger.info("Attempting to save state before shutdown...")
state_manager.close()
logger.info("Shutdown requested. Allowing in-progress tasks to complete. No new tasks will be started. Press Ctrl+C again to force exit.")
else:
logger.info("Second signal received, forcing exit.")
# On second signal, forcefully terminate subprocesses.
with process_lock:
if running_processes:
logger.info(f"Forcefully terminating {len(running_processes)} running subprocess(es)...")
for p in running_processes:
try:
# Kill the entire process group to ensure child processes (like yt-dlp) are terminated.
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass # Process already finished or we lack permissions
# Use os._exit for a hard exit that doesn't run cleanup handlers,
# which can deadlock if locks are held.
os._exit(1)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
settings = policy.get('settings', {})
exec_control = policy.get('execution_control', {})
mode = settings.get('mode', 'full_stack')
orchestration_mode = settings.get('orchestration_mode')
# --- Profile Manager Setup for Locking Mode ---
profile_manager = None
profile_managers = {}
if settings.get('profile_mode') == 'from_pool_with_lock':
logger.info("--- Profile Locking Mode Enabled ---")
logger.info("This mode requires profiles to be set up and managed by the policy enforcer.")
logger.info("1. Ensure you have run: bin/setup-profiles-from-policy")
logger.info("2. Ensure the policy enforcer is running in the background: bin/ytops-client policy-enforcer --live")
logger.info(" (e.g. using policies/8_unified_simulation_enforcer.yaml)")
logger.info("3. To monitor profiles, use: bin/ytops-client profile list --live")
logger.info("------------------------------------")
# Coalesce Redis settings from CLI args, .env file, and defaults
redis_host = args.redis_host or os.getenv('REDIS_HOST', os.getenv('MASTER_HOST_IP', 'localhost'))
redis_port = args.redis_port if args.redis_port is not None else int(os.getenv('REDIS_PORT', 6379))
redis_password = args.redis_password or os.getenv('REDIS_PASSWORD')
redis_db = args.redis_db if args.redis_db is not None else int(os.getenv('REDIS_DB', 0))
sim_params = policy.get('simulation_parameters', {})
def setup_manager(sim_type, env_cli_arg, env_policy_key):
# Determine the effective environment name with correct precedence:
# 1. Specific CLI arg (e.g., --auth-env)
# 2. General CLI arg (--env)
# 3. Specific policy setting (e.g., simulation_parameters.auth_env)
# 4. General policy setting (simulation_parameters.env)
# 5. Hardcoded default ('sim')
policy_env = sim_params.get(env_policy_key)
default_policy_env = sim_params.get('env')
effective_env = env_cli_arg or args.env or policy_env or default_policy_env or 'sim'
logger.info(f"Setting up ProfileManager for {sim_type} simulation using env: '{effective_env}'")
if args.key_prefix:
key_prefix = args.key_prefix
else:
key_prefix = f"{effective_env}_profile_mgmt_"
return ProfileManager(
redis_host=redis_host, redis_port=redis_port,
redis_password=redis_password, key_prefix=key_prefix,
redis_db=redis_db
)
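# For example (illustrative), `--env sim` with no --key-prefix yields
# key_prefix 'sim_profile_mgmt_' for the ProfileManager constructed above.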
# Determine which managers are needed based on mode and orchestration mode
needs_auth = False
needs_download = False
if mode in ['full_stack', 'fetch_only']:
needs_auth = True
if mode in ['full_stack', 'download_only']:
needs_download = True
if orchestration_mode == 'direct_batch_cli':
direct_policy = policy.get('direct_batch_cli_policy', {})
use_env = direct_policy.get('use_profile_env', 'auth')
if use_env == 'download':
needs_download = True
else: # auth is default
needs_auth = True
if needs_auth:
# For backward compatibility, policy might have 'env' instead of 'auth_env'
auth_env_key = 'auth_env' if 'auth_env' in sim_params else 'env'
profile_managers['auth'] = setup_manager('Auth', args.auth_env, auth_env_key)
if needs_download:
download_env_key = 'download_env' if 'download_env' in sim_params else 'env'
profile_managers['download'] = setup_manager('Download', args.download_env, download_env_key)
# For modes with only one manager, set the legacy `profile_manager` variable
# for components that haven't been updated to use the `profile_managers` dict.
if len(profile_managers) == 1:
profile_manager = list(profile_managers.values())[0]
# --- Throughput Orchestration Mode ---
if orchestration_mode == 'throughput':
logger.info("--- Throughput Orchestration Mode Enabled ---")
if mode != 'download_only' or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'throughput' is only compatible with 'download_only' mode and 'from_pool_with_lock' profile mode.")
return 1
download_manager = profile_managers.get('download')
if not download_manager:
logger.error("Throughput mode requires a download profile manager.")
return 1
original_workers_setting = exec_control.get('workers')
if original_workers_setting == 'auto':
d_policy = policy.get('download_policy', {})
profile_prefix = d_policy.get('profile_prefix')
if not profile_prefix:
logger.error("Cannot calculate 'auto' workers for throughput mode without 'download_policy.profile_prefix'.")
return 1
all_profiles = download_manager.list_profiles()
matching_profiles = [p for p in all_profiles if p['name'].startswith(profile_prefix)]
calculated_workers = len(matching_profiles)
if calculated_workers == 0:
logger.error(f"Cannot use 'auto' workers: No profiles found with prefix '{profile_prefix}'. Please run setup-profiles.")
return 1
exec_control['workers'] = calculated_workers
logger.info(f"Calculated 'auto' workers for throughput mode: {calculated_workers} (based on {len(matching_profiles)} profiles with prefix '{profile_prefix}').")
sp_utils.display_effective_policy(policy, policy_name, sources=[], original_workers_setting=original_workers_setting)
if args.dry_run: return 0
workers = exec_control.get('workers', 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_throughput_worker, i, policy, state_manager, args, download_manager, running_processes, process_lock)
for i in range(workers)
]
# Wait for shutdown signal
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for throughput workers to finish current tasks...")
# The workers will exit their loops upon seeing the shutdown_event.
# We don't need complex shutdown logic here; the main `finally` block will handle summary.
concurrent.futures.wait(futures)
# In this mode, the main loop is handled by workers. So we return here.
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Direct Batch CLI Orchestration Mode ---
elif orchestration_mode == 'direct_batch_cli':
logger.info("--- Direct Batch CLI Orchestration Mode Enabled ---")
if mode != 'fetch_only' or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'direct_batch_cli' is only compatible with 'fetch_only' mode and 'from_pool_with_lock' profile mode.")
return 1
direct_policy = policy.get('direct_batch_cli_policy', {})
use_env = direct_policy.get('use_profile_env', 'auth') # Default to auth for backward compatibility
profile_manager_instance = profile_managers.get(use_env)
if not profile_manager_instance:
logger.error(f"Direct batch CLI mode requires a '{use_env}' profile manager, but it was not configured.")
logger.error("Check 'simulation_parameters' in your policy and the 'mode' setting.")
return 1
urls_file = settings.get('urls_file')
if not urls_file:
logger.error("Direct batch CLI mode requires 'settings.urls_file'.")
return 1
try:
with open(urls_file, 'r', encoding='utf-8') as f:
urls_list = [line.strip() for line in f if line.strip()]
except IOError as e:
logger.error(f"Could not read urls_file '{urls_file}': {e}")
return 1
if not urls_list:
logger.error(f"URL file '{urls_file}' is empty. Nothing to do.")
return 1
# Handle starting from a specific index
start_index = 0
if args.start_from_url_index is not None:
start_index = max(0, args.start_from_url_index - 1)
state_manager.update_last_url_index(start_index, force=True)
else:
start_index = state_manager.get_last_url_index()
if start_index >= len(urls_list) and len(urls_list) > 0:
logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!")
logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!")
logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!")
logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
if not args.dry_run and not args.disable_log_writing:
state_manager.close() # ensure it's closed before deleting
try:
os.remove(state_manager.state_file_path)
logger.info(f"Deleted state file: {state_manager.state_file_path}")
except OSError as e:
logger.error(f"Failed to delete state file: {e}")
else:
logger.info("[Dry Run] Would have deleted state file and stopped.")
return 0 # Stop execution.
if start_index > 0:
logger.info(f"Starting/resuming from URL index {start_index + 1}.")
# The worker's get_next_url_batch will respect this starting index.
sp_utils.display_effective_policy(policy, policy_name, sources=urls_list)
if args.dry_run: return 0
workers = exec_control.get('workers', 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_direct_batch_worker, i, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock)
for i in range(workers)
]
# Wait for all workers to complete. They will exit their loops when no URLs are left.
concurrent.futures.wait(futures)
if shutdown_event.is_set():
logger.info("Shutdown signal received, workers have finished.")
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Direct Docker CLI Orchestration Mode ---
elif orchestration_mode == 'direct_docker_cli':
logger.info("--- Direct Docker CLI Orchestration Mode Enabled ---")
if not docker:
logger.error("The 'direct_docker_cli' orchestration mode requires the Docker SDK for Python.")
logger.error("Please install it with: pip install docker")
return 1
if mode not in ['fetch_only', 'download_only'] or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'direct_docker_cli' is only compatible with 'fetch_only' or 'download_only' modes and 'from_pool_with_lock' profile mode.")
return 1
direct_policy = policy.get('direct_docker_cli_policy', {})
use_env = direct_policy.get('use_profile_env', 'auth' if mode == 'fetch_only' else 'download')
profile_manager_instance = profile_managers.get(use_env)
if not profile_manager_instance:
logger.error(f"Direct docker CLI mode requires a '{use_env}' profile manager, but it was not configured.")
return 1
workers = exec_control.get('workers', 1)
if mode == 'fetch_only':
urls_file = settings.get('urls_file')
if not urls_file:
logger.error("Direct docker CLI (fetch) mode requires 'settings.urls_file'.")
return 1
try:
with open(urls_file, 'r', encoding='utf-8') as f:
urls_list = [line.strip() for line in f if line.strip()]
except IOError as e:
logger.error(f"Could not read urls_file '{urls_file}': {e}")
return 1
if not urls_list:
logger.error(f"URL file '{urls_file}' is empty. Nothing to do.")
return 1
start_index = 0
if args.start_from_url_index is not None:
start_index = max(0, args.start_from_url_index - 1)
state_manager.update_last_url_index(start_index, force=True)
else:
start_index = state_manager.get_last_url_index()
if start_index >= len(urls_list) and len(urls_list) > 0:
logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!")
logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!")
logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!")
logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
if not args.dry_run and not args.disable_log_writing:
state_manager.close()
try:
os.remove(state_manager.state_file_path)
logger.info(f"Deleted state file: {state_manager.state_file_path}")
except OSError as e:
logger.error(f"Failed to delete state file: {e}")
else:
logger.info("[Dry Run] Would have deleted state file and stopped.")
return 0
if start_index > 0:
logger.info(f"Starting/resuming from URL index {start_index + 1}.")
sp_utils.display_effective_policy(policy, policy_name, sources=urls_list)
if args.dry_run: return 0
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_direct_docker_worker, i, policy, state_manager, args, profile_manager_instance, urls_list, running_processes, process_lock)
for i in range(workers)
]
# This worker runs until shutdown, like the download worker
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for direct docker workers to finish...")
concurrent.futures.wait(futures)
elif mode == 'download_only':
info_json_dir = settings.get('info_json_dir')
if not info_json_dir:
logger.error("Direct docker CLI (download) mode requires 'settings.info_json_dir'.")
return 1
try:
os.makedirs(info_json_dir, exist_ok=True)
except OSError as e:
logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}")
return 1
sp_utils.display_effective_policy(policy, policy_name, sources=[])
if args.dry_run: return 0
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_direct_docker_download_worker, i, policy, state_manager, args, profile_manager_instance, running_processes, process_lock)
for i in range(workers)
]
# This worker runs until shutdown
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for direct docker download workers to finish...")
concurrent.futures.wait(futures)
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Direct Download CLI Orchestration Mode ---
elif orchestration_mode == 'direct_download_cli':
logger.info("--- Direct Download CLI Orchestration Mode Enabled ---")
if mode != 'download_only' or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'direct_download_cli' is only compatible with 'download_only' mode and 'from_pool_with_lock' profile mode.")
return 1
download_manager = profile_managers.get('download')
if not download_manager:
logger.error("Direct download CLI mode requires a download profile manager.")
return 1
info_json_dir = settings.get('info_json_dir')
if not info_json_dir:
logger.error("Direct download CLI mode requires 'settings.info_json_dir'.")
return 1
try:
os.makedirs(info_json_dir, exist_ok=True)
except OSError as e:
logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}")
return 1
sp_utils.display_effective_policy(policy, policy_name, sources=[])
if args.dry_run: return 0
workers = exec_control.get('workers', 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_direct_download_worker, i, policy, state_manager, args, download_manager, running_processes, process_lock)
for i in range(workers)
]
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for direct download workers to finish...")
concurrent.futures.wait(futures)
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Queue Auth Orchestration Mode ---
elif orchestration_mode == 'queue_auth':
logger.info("--- Queue Auth Orchestration Mode Enabled ---")
if mode != 'fetch_only' or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'queue_auth' is only compatible with 'fetch_only' mode and 'from_pool_with_lock' profile mode.")
return 1
auth_manager = profile_managers.get('auth')
if not auth_manager:
logger.error("Queue auth mode requires an auth profile manager.")
return 1
# Initialize queue provider
queue_policy = policy.get('queue_policy', {})
redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost'
redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379))
redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password')
redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0))
# Extract env from manager's key prefix, unless disabled by policy
use_env_prefix = queue_policy.get('use_env_prefix', True)
env_prefix = None
if use_env_prefix:
env_prefix = auth_manager.key_prefix.removesuffix('_profile_mgmt_')
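# e.g. (illustrative) a key_prefix of 'sim_profile_mgmt_' yields env_prefix 'sim'.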
state_manager.initialize_queue_provider(
redis_host=redis_host,
redis_port=redis_port,
redis_password=redis_password,
redis_db=redis_db,
env_prefix=env_prefix
)
# Create save directory if specified
save_dir = settings.get('save_info_json_dir')
if save_dir:
try:
os.makedirs(save_dir, exist_ok=True)
logger.info(f"Created save directory for info.json files: {save_dir}")
except OSError as e:
logger.error(f"Failed to create save directory '{save_dir}': {e}")
return 1
# Requeue failed tasks if requested
if args.requeue_failed:
requeued = state_manager.requeue_failed_auth_tasks(
batch_size=queue_policy.get('requeue_batch_size', 100)
)
logger.info(f"Requeued {requeued} failed authentication tasks.")
sp_utils.display_effective_policy(policy, policy_name, sources=[])
if args.dry_run: return 0
workers = exec_control.get('workers', 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_queue_auth_worker, i, policy, state_manager, args, auth_manager, running_processes, process_lock)
for i in range(workers)
]
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for queue auth workers to finish...")
concurrent.futures.wait(futures)
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Queue Download Orchestration Mode ---
elif orchestration_mode == 'queue_download':
logger.info("--- Queue Download Orchestration Mode Enabled ---")
if mode != 'download_only' or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'queue_download' is only compatible with 'download_only' mode and 'from_pool_with_lock' profile mode.")
return 1
download_manager = profile_managers.get('download')
if not download_manager:
logger.error("Queue download mode requires a download profile manager.")
return 1
# Initialize queue provider
queue_policy = policy.get('queue_policy', {})
redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost'
redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379))
redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password')
redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0))
# Extract env from manager's key prefix, unless disabled by policy
use_env_prefix = queue_policy.get('use_env_prefix', True)
env_prefix = None
if use_env_prefix:
env_prefix = download_manager.key_prefix.removesuffix('_profile_mgmt_')
state_manager.initialize_queue_provider(
redis_host=redis_host,
redis_port=redis_port,
redis_password=redis_password,
redis_db=redis_db,
env_prefix=env_prefix
)
# Create output directory if specified
output_dir = policy.get('download_policy', {}).get('output_dir')
if output_dir:
try:
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Created output directory for downloads: {output_dir}")
except OSError as e:
logger.error(f"Failed to create output directory '{output_dir}': {e}")
return 1
# Requeue failed tasks if requested
if args.requeue_failed:
requeued = state_manager.requeue_failed_download_tasks(
batch_size=queue_policy.get('requeue_batch_size', 100)
)
logger.info(f"Requeued {requeued} failed download tasks.")
sp_utils.display_effective_policy(policy, policy_name, sources=[])
if args.dry_run: return 0
workers = exec_control.get('workers', 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(run_queue_download_worker, i, policy, state_manager, args, download_manager, running_processes, process_lock)
for i in range(workers)
]
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for queue download workers to finish...")
concurrent.futures.wait(futures)
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Queue Full Stack Orchestration Mode ---
elif orchestration_mode == 'queue_full_stack':
logger.info("--- Queue Full Stack Orchestration Mode Enabled ---")
if mode != 'full_stack' or settings.get('profile_mode') != 'from_pool_with_lock':
logger.error("Orchestration mode 'queue_full_stack' is only compatible with 'full_stack' mode and 'from_pool_with_lock' profile mode.")
return 1
auth_manager = profile_managers.get('auth')
if not auth_manager:
logger.error("Queue full stack mode requires an auth profile manager.")
return 1
download_manager = profile_managers.get('download')
if not download_manager:
logger.error("Queue full stack mode requires a download profile manager.")
return 1
# Initialize queue provider
queue_policy = policy.get('queue_policy', {})
redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost'
redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379))
redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password')
redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0))
# Extract env from auth manager's key prefix, unless disabled by policy
use_env_prefix = queue_policy.get('use_env_prefix', True)
env_prefix = None
if use_env_prefix:
auth_prefix = auth_manager.key_prefix.removesuffix('_profile_mgmt_')
download_prefix = download_manager.key_prefix.removesuffix('_profile_mgmt_')
if auth_prefix != download_prefix:
logger.warning(f"Auth environment ('{auth_prefix}') and Download environment ('{download_prefix}') are different.")
logger.warning(f"Using '{auth_prefix}' as the prefix for all shared Redis queues.")
env_prefix = auth_prefix
state_manager.initialize_queue_provider(
redis_host=redis_host,
redis_port=redis_port,
redis_password=redis_password,
redis_db=redis_db,
env_prefix=env_prefix
)
# Create directories if specified
save_dir = settings.get('save_info_json_dir')
if save_dir:
try:
os.makedirs(save_dir, exist_ok=True)
logger.info(f"Created save directory for info.json files: {save_dir}")
except OSError as e:
logger.error(f"Failed to create save directory '{save_dir}': {e}")
return 1
output_dir = policy.get('download_policy', {}).get('output_dir')
if output_dir:
try:
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Created output directory for downloads: {output_dir}")
except OSError as e:
logger.error(f"Failed to create output directory '{output_dir}': {e}")
return 1
# Requeue failed tasks if requested
if args.requeue_failed:
requeued_auth = state_manager.requeue_failed_auth_tasks(
batch_size=queue_policy.get('requeue_batch_size', 100)
)
requeued_dl = state_manager.requeue_failed_download_tasks(
batch_size=queue_policy.get('requeue_batch_size', 100)
)
logger.info(f"Requeued {requeued_auth} failed authentication tasks and {requeued_dl} failed download tasks.")
sp_utils.display_effective_policy(policy, policy_name, sources=[])
if args.dry_run: return 0
# Start both auth and download workers
auth_workers = exec_control.get('auth_workers', 1)
download_workers = exec_control.get('download_workers', 2)
with concurrent.futures.ThreadPoolExecutor(max_workers=auth_workers + download_workers) as executor:
# Start auth workers
auth_futures = [
executor.submit(run_queue_auth_worker, i, policy, state_manager, args, auth_manager, running_processes, process_lock)
for i in range(auth_workers)
]
# Start download workers
dl_futures = [
executor.submit(run_queue_download_worker, i + auth_workers, policy, state_manager, args, download_manager, running_processes, process_lock)
for i in range(download_workers)
]
# Start requeue task if configured
requeue_interval = queue_policy.get('requeue_interval_seconds')
requeue_enabled = queue_policy.get('requeue_failed_tasks', False)
if requeue_enabled and requeue_interval:
def requeue_task():
while not shutdown_event.is_set():
time.sleep(requeue_interval)
if shutdown_event.is_set():
break
try:
requeued_auth = state_manager.requeue_failed_auth_tasks(
batch_size=queue_policy.get('requeue_batch_size', 100)
)
requeued_dl = state_manager.requeue_failed_download_tasks(
batch_size=queue_policy.get('requeue_batch_size', 100)
)
if requeued_auth > 0 or requeued_dl > 0:
logger.info(f"Auto-requeued {requeued_auth} failed auth tasks and {requeued_dl} failed download tasks.")
except Exception as e:
logger.error(f"Error in auto-requeue task: {e}")
requeue_future = executor.submit(requeue_task)
all_futures = auth_futures + dl_futures + [requeue_future]
else:
all_futures = auth_futures + dl_futures
# Wait for shutdown signal
shutdown_event.wait()
logger.info("Shutdown signal received, waiting for queue workers to finish...")
concurrent.futures.wait(all_futures)
state_manager.print_summary(policy)
state_manager.close()
return 0
# --- Default (Task-First) Orchestration Mode ---
sources = [] # This will be a list of URLs or Path objects
if mode in ['full_stack', 'fetch_only']:
urls_file = settings.get('urls_file')
if not urls_file:
logger.error("Policy mode requires 'settings.urls_file'.")
return 1
try:
with open(urls_file, 'r', encoding='utf-8') as f:
content = f.read()
try:
data = json.loads(content)
if isinstance(data, list) and all(isinstance(item, str) for item in data):
sources = data
logger.info(f"Loaded {len(sources)} URLs/IDs from JSON array in {urls_file}.")
else:
logger.error(f"URL file '{urls_file}' is valid JSON but not an array of strings.")
return 1
except json.JSONDecodeError:
sources = [line.strip() for line in content.splitlines() if line.strip()]
logger.info(f"Loaded {len(sources)} URLs/IDs from text file {urls_file}.")
except IOError as e:
logger.error(f"Failed to read urls_file {urls_file}: {e}")
return 1
# Clean up URLs/IDs which might have extra quotes, commas, or brackets from copy-pasting
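# e.g. (illustrative): '  "https://example.com/watch?v=abc123",  ' -> 'https://example.com/watch?v=abc123'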
cleaned_sources = []
for source in sources:
cleaned_source = source.strip().rstrip(',').strip().strip('\'"[]').strip()
if cleaned_source:
cleaned_sources.append(cleaned_source)
if len(cleaned_sources) != len(sources):
logger.info(f"Cleaned URL list, removed {len(sources) - len(cleaned_sources)} empty or invalid entries.")
sources = cleaned_sources
elif mode == 'download_only':
# If not in continuous mode, load sources once at the start.
# In continuous mode, `sources` is populated at the start of each cycle.
if settings.get('directory_scan_mode') != 'continuous':
info_json_dir = settings.get('info_json_dir')
if not info_json_dir:
logger.error("Policy mode 'download_only' requires 'settings.info_json_dir'.")
return 1
try:
all_files = sorted(Path(info_json_dir).glob('*.json'))
sample_percent = settings.get('info_json_dir_sample_percent')
if sample_percent and 0 < sample_percent <= 100:
sample_count = int(len(all_files) * (sample_percent / 100.0))
num_to_sample = min(len(all_files), max(1, sample_count))
sources = random.sample(all_files, k=num_to_sample)
logger.info(f"Randomly sampled {len(sources)} files ({sample_percent}%) from {info_json_dir}")
else:
sources = all_files
except (IOError, FileNotFoundError) as e:
logger.error(f"Failed to read info_json_dir {info_json_dir}: {e}")
return 1
# In continuous download mode, sources are loaded inside the loop, so we skip this check.
if settings.get('directory_scan_mode') != 'continuous' and not sources:
logger.error("No sources (URLs or info.json files) to process. Exiting.")
return 1
start_index = 0
if mode in ['full_stack', 'fetch_only']:
if args.start_from_url_index is not None:
# User provided a 1-based index via CLI
start_index = max(0, args.start_from_url_index - 1)
logger.info(f"Starting from URL index {start_index + 1} as requested by --start-from-url-index.")
# When user specifies it, we should overwrite the saved state.
state_manager.update_last_url_index(start_index, force=True)
else:
start_index = state_manager.get_last_url_index()
if start_index > 0:
logger.info(f"Resuming from URL index {start_index + 1} based on saved state.")
if start_index >= len(sources):
logger.warning(f"Start index ({start_index + 1}) is beyond the end of the URL list ({len(sources)}). Nothing to process.")
sources = []
# --- Auto-calculate workers if needed ---
original_workers_setting = exec_control.get('workers')
if original_workers_setting == 'auto':
# In this simplified model, 'auto' is based on target rate, not profiles.
target_rate_cfg = exec_control.get('target_rate', {})
target_reqs = target_rate_cfg.get('requests')
target_mins = target_rate_cfg.get('per_minutes')
if target_reqs and target_mins and sources:
target_rpm = target_reqs / target_mins
num_sources = len(sources)
sleep_cfg = exec_control.get('sleep_between_tasks', {})
avg_sleep = (sleep_cfg.get('min_seconds', 0) + sleep_cfg.get('max_seconds', 0)) / 2
assumed_task_duration = 12 # Must match assumption in display_effective_policy
# Formula: workers = (total_work_seconds) / (total_time_for_work)
# total_time_for_work is derived from the target rate:
# (total_cycle_time) = (60 * num_sources) / target_rpm
# total_time_for_work = total_cycle_time - avg_sleep
work_time_available = (60 * num_sources / target_rpm) - avg_sleep
if work_time_available <= 0:
# The sleep time alone makes the target rate impossible.
# Set workers to max parallelism as a best-effort.
num_workers = num_sources
logger.warning(f"Target rate of {target_rpm} req/min is likely unachievable due to sleep time of {avg_sleep}s.")
logger.warning(f"Setting workers to max parallelism ({num_workers}) as a best effort.")
else:
total_work_seconds = num_sources * assumed_task_duration
num_workers = total_work_seconds / work_time_available
calculated_workers = max(1, int(num_workers + 0.99)) # Ceiling
exec_control['workers'] = calculated_workers
logger.info(f"Calculated 'auto' workers based on target rate: {calculated_workers}")
else:
logger.warning("Cannot calculate 'auto' workers: 'target_rate' or sources are not defined. Defaulting to 1 worker.")
exec_control['workers'] = 1
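# Worked example of the 'auto' calculation above, with illustrative numbers:
# 100 sources, target_rate of 30 requests per 1 minute, sleep_between_tasks 2-4s (avg 3s):
#   work_time_available = 60 * 100 / 30 - 3 = 197s
#   total_work_seconds  = 100 * 12          = 1200s
#   workers             = 1200 / 197 ≈ 6.1, rounded up to 7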
sp_utils.display_effective_policy(
policy,
policy_name,
sources=sources,
profile_names=None, # Profile grouping is removed
original_workers_setting=original_workers_setting
)
if args.dry_run:
logger.info("Dry run complete. Exiting.")
return 0
start_time = time.time()
run_until_cfg = exec_control.get('run_until', {})
duration_seconds = (run_until_cfg.get('minutes') or 0) * 60
max_cycles = run_until_cfg.get('cycles') or 0
max_requests = run_until_cfg.get('requests') or 0
# --- Main test loop ---
cycles = 0
try:
while not shutdown_event.is_set():
if duration_seconds and (time.time() - start_time) > duration_seconds:
logger.info("Reached duration limit. Stopping.")
break
if max_requests > 0 and state_manager.get_request_count() >= max_requests:
logger.info(f"Reached max requests ({max_requests}). Stopping.")
break
# --- Rescan for sources if in continuous download mode ---
if mode == 'download_only' and settings.get('directory_scan_mode') == 'continuous':
info_json_dir = settings.get('info_json_dir')
try:
all_files_in_dir = Path(info_json_dir).glob('*.json')
processed_files = state_manager.get_processed_files()
new_files = [f for f in all_files_in_dir if str(f) not in processed_files]
# Sort by modification time, oldest first, to process in order of creation
new_files.sort(key=os.path.getmtime)
max_files_per_cycle = settings.get('max_files_per_cycle')
if max_files_per_cycle and len(new_files) > max_files_per_cycle:
sources = new_files[:max_files_per_cycle]
else:
sources = new_files
if not sources:
sleep_duration = settings.get('sleep_if_no_new_files_seconds', 10)
logger.info(f"No new info.json files found in '{info_json_dir}'. Sleeping for {sleep_duration}s...")
# Interruptible sleep
sleep_end_time = time.time() + sleep_duration
while time.time() < sleep_end_time:
if shutdown_event.is_set():
break
time.sleep(0.5)
if shutdown_event.is_set():
break
continue # Skip to next iteration of the while loop
except (IOError, FileNotFoundError) as e:
logger.error(f"Failed to read info_json_dir {info_json_dir}: {e}. Retrying in 10s.")
time.sleep(10)
continue
# --- Group sources for this cycle ---
task_items = sources
# If there's nothing to do this cycle, skip.
if not task_items:
if mode == 'download_only' and settings.get('directory_scan_mode') == 'continuous':
# The sleep logic is handled inside the rescanning block.
continue
else:
logger.info("No more sources to process. Ending test.")
break
cycles += 1
if max_cycles > 0 and cycles > max_cycles:
logger.info(f"Reached max cycles ({max_cycles}). Stopping.")
break
logger.info(f"--- Cycle #{cycles} (Total Requests: {state_manager.get_request_count()}) ---")
with concurrent.futures.ThreadPoolExecutor(max_workers=exec_control.get('workers', 1)) as executor:
# Submit one task per source URL or info.json file
future_to_task_info = {
executor.submit(process_task, source, i, cycles, policy, state_manager, args, profile_managers, running_processes, process_lock): {
'source': source,
'abs_index': i
}
for i, source in enumerate(task_items) if i >= start_index
}
should_stop = False
pending_futures = set(future_to_task_info.keys())
while pending_futures and not should_stop:
done, pending_futures = concurrent.futures.wait(
pending_futures, return_when=concurrent.futures.FIRST_COMPLETED
)
for future in done:
if shutdown_event.is_set():
should_stop = True
break
task_info = future_to_task_info[future]
source = task_info['source']
abs_index = task_info.get('abs_index')
try:
results = future.result()
if abs_index is not None and mode in ['full_stack', 'fetch_only']:
# Update state to resume from the *next* URL.
state_manager.update_last_url_index(abs_index + 1)
# --- Mark file as processed ---
# This is the central place to mark a source as complete for download_only mode.
if mode == 'download_only':
# In continuous mode, we add to state file to prevent re-picking in same run.
if settings.get('directory_scan_mode') == 'continuous':
state_manager.mark_file_as_processed(source)
# If marking by rename is on, do that.
if settings.get('mark_processed_files'):
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
new_path = source.parent / f"{source.name}.{timestamp}.processed"
source.rename(new_path)
logger.info(f"Marked '{source.name}' as processed by renaming to '{new_path.name}'")
except (IOError, OSError) as e:
logger.error(f"Failed to rename processed file '{source.name}': {e}")
# Note: in profile-aware mode, the file processing (including marking files as
# processed) is handled inside process_profile_task rather than here.
for result in results:
if not result['success']:
s_conditions = policy.get('stop_conditions', {})
is_cumulative_403_active = s_conditions.get('on_cumulative_403', {}).get('max_errors')
if s_conditions.get('on_failure') or \
(s_conditions.get('on_http_403') and not is_cumulative_403_active and result['error_type'] == 'HTTP 403') or \
(s_conditions.get('on_timeout') and result['error_type'] == 'Timeout'):
logger.info(f"!!! STOP CONDITION MET: Immediate stop on failure '{result['error_type']}' for {sp_utils.get_display_name(source)}. Shutting down all workers. !!!")
should_stop = True
break
except concurrent.futures.CancelledError:
logger.info(f"Task for {sp_utils.get_display_name(source)} was cancelled during shutdown.")
event = {
'type': 'fetch' if mode != 'download_only' else 'download',
'path': str(source),
'success': False,
'error_type': 'Cancelled',
'details': 'Task cancelled during shutdown.'
}
state_manager.log_event(event)
except Exception as exc:
logger.error(f'{sp_utils.get_display_name(source)} generated an exception: {exc}')
if should_stop:
break
# Check for all stop conditions after each task completes.
# 1. Max requests limit
if not should_stop and max_requests > 0 and state_manager.get_request_count() >= max_requests:
logger.info(f"!!! STOP CONDITION MET: Reached request limit ({max_requests}). Shutting down. !!!")
should_stop = True
# 2. Duration limit
if not should_stop and duration_seconds and (time.time() - start_time) > duration_seconds:
logger.info(f"!!! STOP CONDITION MET: Reached duration limit ({run_until_cfg.get('minutes')} minutes). Shutting down. !!!")
should_stop = True
# 3. Cumulative error rate limits
s_conditions = policy.get('stop_conditions', {})
error_rate_policy = s_conditions.get('on_error_rate')
if not should_stop and error_rate_policy:
max_errors = error_rate_policy.get('max_errors')
per_minutes = error_rate_policy.get('per_minutes')
if max_errors and per_minutes:
error_count = state_manager.check_cumulative_error_rate(max_errors, per_minutes)
if error_count > 0:
logger.info(f"!!! STOP CONDITION MET: Error rate exceeded ({error_count} errors in last {per_minutes}m). Shutting down. !!!")
should_stop = True
cumulative_403_policy = s_conditions.get('on_cumulative_403')
if not should_stop and cumulative_403_policy:
max_errors = cumulative_403_policy.get('max_errors')
per_minutes = cumulative_403_policy.get('per_minutes')
if max_errors and per_minutes:
error_count = state_manager.check_cumulative_error_rate(max_errors, per_minutes, error_type='HTTP 403')
if error_count > 0:
logger.info(f"!!! STOP CONDITION MET: Cumulative 403 rate exceeded ({error_count} in last {per_minutes}m). Shutting down. !!!")
should_stop = True
quality_degradation_policy = s_conditions.get('on_quality_degradation')
if not should_stop and quality_degradation_policy:
max_triggers = quality_degradation_policy.get('max_triggers')
per_minutes = quality_degradation_policy.get('per_minutes')
if max_triggers and per_minutes:
trigger_count = state_manager.check_quality_degradation_rate(max_triggers, per_minutes)
if trigger_count > 0:
logger.info(f"!!! STOP CONDITION MET: Quality degradation triggered {trigger_count} times in last {per_minutes}m. Shutting down. !!!")
should_stop = True
if should_stop:
break
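# Illustrative stop_conditions sketch (keys are the ones checked above and in the
# per-result handling; values are placeholders, not recommended thresholds):
#   stop_conditions:
#     on_failure: false
#     on_http_403: false
#     on_timeout: false
#     on_error_rate: {max_errors: 20, per_minutes: 10}
#     on_cumulative_403: {max_errors: 5, per_minutes: 10}
#     on_quality_degradation: {max_triggers: 3, per_minutes: 15}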
if should_stop and pending_futures:
logger.info(f"Cancelling {len(pending_futures)} outstanding task(s).")
for future in pending_futures:
future.cancel()
if should_stop: break
if max_cycles > 0 and cycles >= max_cycles:
break
# If the run is not time-based (i.e., it's limited by cycles or requests)
# and it's not a continuous directory scan, we should stop after one pass.
# This makes the behavior of --set run_until.requests=N more intuitive: it acts
# as an upper limit for a single pass, not a trigger for multiple passes.
if settings.get('directory_scan_mode') != 'continuous' and not duration_seconds:
logger.info("Run is not time-based. Halting after one full pass through sources.")
break
logger.info("Cycle complete.")
except KeyboardInterrupt:
logger.info("\nForceful shutdown requested...")
finally:
# --- Graceful Shutdown URL Reporting ---
if shutdown_event.is_set():
orchestration_mode = settings.get('orchestration_mode')
if orchestration_mode in ['direct_batch_cli', 'direct_docker_cli'] and mode == 'fetch_only':
urls_file = settings.get('urls_file')
# Check if urls_list was loaded for the relevant mode
if urls_file and 'urls_list' in locals() and urls_list:
last_index = state_manager.get_last_url_index()
# The index points to the *next* URL to be processed.
# If a batch was aborted, it might have been rewound.
# We should save all URLs from this index onwards.
if last_index < len(urls_list):
unprocessed_urls = urls_list[last_index:]
unprocessed_filename = f"unprocessed_urls_{policy_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
try:
with open(unprocessed_filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(unprocessed_urls))
logger.warning(f"--- GRACEFUL SHUTDOWN ---")
logger.warning(f"Saved {len(unprocessed_urls)} unprocessed URLs to '{unprocessed_filename}'.")
logger.warning(f"Last processed URL index was {last_index}. Next run should start from index {last_index + 1}.")
logger.warning(f"-------------------------")
except IOError as e:
logger.error(f"Could not save unprocessed URLs: {e}")
state_manager.print_summary(policy)
state_manager.close()
return 0
def process_task(source, index, cycle_num, policy, state_manager, args, profile_managers, running_processes, process_lock):
"""
Worker task for a single source (URL or info.json path).
This function is the main entry point for the 'task-first' orchestration mode.
"""
settings = policy.get('settings', {})
mode = settings.get('mode', 'full_stack')
profile_mode = settings.get('profile_mode')
auth_manager = profile_managers.get('auth')
download_manager = profile_managers.get('download')
# --- Full Stack Mode ---
if mode == 'full_stack':
# 1. Fetch info.json
if not auth_manager:
logger.error("Full-stack mode requires an 'auth' profile manager.")
return []
# The task-first full-stack flow (lock an auth profile, fetch the info.json, run
# the downloads, then unlock the profile) is not implemented here; this branch is a placeholder.
logger.error("Full-stack mode (task-first) is not fully implemented in this version.")
return []
# --- Fetch Only Mode ---
elif mode == 'fetch_only':
if not auth_manager:
logger.error("Fetch-only mode requires an 'auth' profile manager.")
return []
logger.error("Fetch-only mode (task-first) is not fully implemented in this version.")
return []
# --- Download Only Mode ---
elif mode == 'download_only':
if profile_mode == 'from_pool_with_lock':
if not download_manager:
logger.error("Download-only with locking requires a 'download' profile manager.")
return []
# In this mode, we process one file per profile.
return process_profile_task(
profile_name=None, # Profile is locked inside the task
file_list=[source],
policy=policy,
state_manager=state_manager,
cycle_num=cycle_num,
args=args,
running_processes=running_processes,
process_lock=process_lock,
profile_manager_instance=download_manager
)
else:
# Legacy mode without profile locking
try:
with open(source, 'r', encoding='utf-8') as f:
info_json_content = f.read()
except (IOError, FileNotFoundError) as e:
logger.error(f"[{sp_utils.get_display_name(source)}] Could not read info.json file: {e}")
return []
return _run_download_logic(source, info_json_content, policy, state_manager, args, running_processes, process_lock)
return []