Update policies for proper queue reading

aperez 2025-12-26 12:57:15 +03:00
parent 65561579d8
commit 02132f1732
8 changed files with 496 additions and 279 deletions

View File

@@ -0,0 +1,125 @@
# Policy: Queue-based Authentication Simulation via Direct Docker Exec
#
# This policy simulates a continuous stream of info.json fetch requests using
# the 'direct_docker_cli' mode. It pulls URLs from a Redis queue, creates a
# temporary batch file, and then calls a yt-dlp command inside a running
# Docker container.
#
name: 12_queue_auth_simulation
settings:
mode: fetch_only
orchestration_mode: direct_docker_cli
profile_mode: from_pool_with_lock
# The save directory MUST be inside the docker_host_mount_path.
save_info_json_dir: "run/docker_mount/fetched_info_jsons/queue_simulation"
execution_control:
workers: 1
# How long a worker should pause if it cannot find an available profile to lock.
worker_polling_interval_seconds: 1
# No sleep between tasks; throughput is controlled by yt-dlp performance and profile availability.
info_json_generation_policy:
profile_prefix: "user1"
direct_docker_cli_policy:
# Which simulation environment's profiles to use for locking.
use_profile_env: "auth"
# If true, a worker will try to lock a different profile than the one it just used.
avoid_immediate_profile_reuse: true
# How long the worker should wait for a different profile before re-using the same one.
avoid_reuse_max_wait_seconds: 5
# NOTE on Rate Limits: With the default yt-dlp settings, the rate limit for guest
# sessions is ~300 videos/hour (~1000 webpage/player requests per hour).
# For accounts, it is ~2000 videos/hour (~4000 webpage/player requests per hour).
# The enforcer policy (e.g., 8_unified_simulation_enforcer.yaml) should be
# configured to respect these limits via rotation and rest periods.
# If true, extract the visitor_id from yt-dlp logs, save it per-profile,
# and inject it into subsequent requests for that profile.
#track_visitor_id: true
# --- Docker Execution Settings ---
docker_image_name: "ytops/yt-dlp" # Image to use for `docker run`
docker_network_name: "airflow_proxynet"
# IMPORTANT: This path on the HOST will be mounted into the container at `docker_container_mount_path`.
docker_host_mount_path: "run/docker_mount"
docker_container_mount_path: "/config" # The mount point inside the container
# Host path for persisting cache data (e.g., cookies, sigfuncs) between runs.
docker_host_cache_path: ".cache/queue_auth_simulation"
# Path inside the container where the cache is mounted. Should match HOME/.cache
docker_container_cache_path: "/config/.cache"
# If true, create and use a persistent cookie jar per profile inside the cache dir.
# use_cookies: true
# --- User-Agent Generation ---
# Template for generating User-Agent strings for new profiles.
# The '{major_version}' will be replaced by a version string.
user_agent_template: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{major_version}.0.0.0 Safari/537.36"
# Range of Chrome major versions to use for the template. A range suitable for TV devices.
user_agent_version_range: [110, 120]
# A base config file can be used, with overrides applied from the policy.
# The orchestrator will inject 'proxy', 'batch-file', and 'output' keys into the overrides.
ytdlp_config_file: "cli.auth.config"
ytdlp_config_overrides:
skip-download: true
write-info-json: true
no-write-subs: true
no-color: true
ignore-errors: true
use-extractors: ["youtube"]
ytdlp_raw_args:
- '--extractor-args "youtube:formats=duplicate;jsc_trace=true;player_client=tv_simply;pot_trace=true;skip=translated_subs,hls"'
- '--extractor-args "youtubepot-bgutilhttp:base_url=http://172.17.0.1:4416"'
- '--sleep-requests 0.75'
# - '--retry-sleep linear=1::2'
# --- Live Error Parsing Rules ---
# These regex patterns are checked against yt-dlp's stderr in real-time.
# If a fatal error is detected, immediately ban the profile to stop the container
# and prevent further errors in the same batch.
ban_on_fatal_error_in_batch: true
fatal_error_patterns:
- "Sign in to confirm youre not a bot"
- "rate-limited by YouTube"
- "This content isn't available, try again later"
- "HTTP Error 502"
tolerated_error_patterns:
- "HTTP Error 429"
- "The uploader has not made this video available in your country"
- "This video has been removed by the uploader"
- "Private video"
- "This is a private video"
- "Video is private"
- "Video unavailable"
- "account associated with this video has been terminated"
- "members-only content"
- "Sign in to confirm your age"
# Template for renaming the final info.json.
rename_file_template: "{video_id}-{profile_name}-{proxy}.info.json"
queue_policy:
# Set to false to use legacy, unprefixed queue names (e.g., 'queue2_auth_inbox').
# Set to true (or omit) to use environment-prefixed names (e.g., 'sim_auth_queue2_auth_inbox').
use_env_prefix: false
# If specified, create download tasks for these formats
# Can be "all", a specific format ID, or a list of format IDs
formats_to_download: "140-dashy/140-dashy-0/140,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy"
# How many tasks a worker should pull from the queue at once.
# This will become the batch size for the docker run.
batch_size: 25
simulation_parameters:
auth_env: "sim_auth"
download_env: "sim_download"
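
For reference, a minimal sketch of the queue-reading step this policy relies on, assuming the redis-py client, the unprefixed `queue2_auth_inbox` list named above, and JSON-encoded tasks carrying a `url` field (the real task schema and helper names may differ):

```python
# Minimal sketch only: queue name, task fields, and helper names are assumptions.
import json
import tempfile
from pathlib import Path

import redis


def pull_auth_batch(r: redis.Redis, batch_size: int = 25,
                    queue_name: str = "queue2_auth_inbox") -> list[dict]:
    """Pop up to batch_size JSON-encoded tasks from the inbox list."""
    tasks = []
    for _ in range(batch_size):
        raw = r.lpop(queue_name)
        if raw is None:
            break  # queue drained
        tasks.append(json.loads(raw))
    return tasks


def write_batch_file(tasks: list[dict], host_mount: str = "run/docker_mount") -> Path:
    """Write one URL per line under the host mount so the container can read it
    at the corresponding path below /config."""
    task_dir = Path(host_mount) / "tmp_tasks"
    task_dir.mkdir(parents=True, exist_ok=True)
    with tempfile.NamedTemporaryFile("w", dir=task_dir, suffix=".txt", delete=False) as fh:
        for task in tasks:
            if task.get("url"):
                fh.write(task["url"] + "\n")
        return Path(fh.name)
```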

View File

@@ -0,0 +1,108 @@
# Policy: Queue-based Download Simulation via Direct Docker Exec
#
# This policy simulates a continuous stream of downloads using the
# 'direct_docker_cli' mode with `mode: download_only`. It pulls download
# tasks from a Redis queue, each containing a path to an info.json file,
# and invokes a yt-dlp command inside a running Docker container to perform
# the download.
#
name: 13_queue_download_simulation
settings:
mode: download_only
orchestration_mode: direct_docker_cli
profile_mode: from_pool_with_lock
# In queue mode, info_json_dir is not used to find tasks.
# However, the paths inside the download tasks must be accessible
# within the docker_host_mount_path.
# The profile_extraction_regex is also not needed as the profile
# can be specified in the download task.
execution_control:
workers: 4
# How long a worker should pause if it cannot find an available profile or task.
worker_polling_interval_seconds: 1
download_policy:
profile_prefix: "user1"
# Default cooldown in seconds if not specified by the enforcer in Redis.
# The value from Redis (set via `unlock_cooldown_seconds` in the enforcer policy)
# will always take precedence. This is a fallback.
# Can be an integer (e.g., 1) or a range (e.g., [1, 3]).
default_unlock_cooldown_seconds: 1
# If true, check if the download URL in the info.json is expired before
# attempting to download. This is enabled by default.
check_url_expiration: true
# --- Airflow Integration ---
# If true, move downloaded media and info.json to a timestamped, video-id-based
# directory structure that the Airflow DAGs can process.
output_to_airflow_ready_dir: true
airflow_ready_dir_base_path: "downloadfiles/videos/ready"
direct_docker_cli_policy:
# Which simulation environment's profiles to use for locking.
use_profile_env: "download"
# If true, a worker will try to lock a different profile than the one it just used.
# This is disabled for downloads, as the cooldown mechanism is sufficient.
avoid_immediate_profile_reuse: false
# How long the worker should wait for a different profile before re-using the same one.
avoid_reuse_max_wait_seconds: 5
# NOTE on Rate Limits: With the default yt-dlp settings, the rate limit for guest
# sessions is ~300 videos/hour (~1000 webpage/player requests per hour).
# For accounts, it is ~2000 videos/hour (~4000 webpage/player requests per hour).
# The enforcer policy (e.g., 8_unified_simulation_enforcer.yaml) should be
# configured to respect these limits via rotation and rest periods.
# --- Docker Execution Settings ---
docker_image_name: "ytops/yt-dlp"
docker_network_name: "airflow_proxynet"
# Host path mounted into the container for task files (info.json, config).
# IMPORTANT: This must contain the info.json files referenced by the download
# tasks (e.g., the auth policy's save_info_json_dir), or be a parent directory
# of them, so the container can see the task files.
docker_host_mount_path: "run/docker_mount"
docker_container_mount_path: "/config"
# Path on the HOST where downloaded files will be saved.
docker_host_download_path: "downloaded_media/queue_downloads"
# Path inside the CONTAINER where `docker_host_download_path` is mounted.
docker_container_download_path: "/downloads"
# A base config file can be used, with overrides applied from the policy.
# The orchestrator will inject 'proxy', 'load-info-json', and 'output' keys into the overrides.
ytdlp_config_file: "cli.download.config"
ytdlp_config_overrides:
format: "299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy,140-dashy/140-dashy-0/140"
#format: "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
no-resize-buffer: true
buffer-size: "4M"
concurrent-fragments: 8
ytdlp_raw_args: []
# --- Live Error Parsing Rules ---
# If a fatal error is detected, immediately ban the profile to stop the container.
ban_on_fatal_error_in_batch: true
fatal_error_patterns:
- "HTTP Error 403"
- "HTTP Error 502"
tolerated_error_patterns:
- "timed out"
- "Timeout"
- "connection reset by peer"
- "Invalid data found when processing input"
- "Error opening input files"
queue_policy:
# Set to false to use legacy, unprefixed queue names (e.g., 'queue2_dl_inbox').
# Set to true (or omit) to use environment-prefixed names (e.g., 'sim_download_queue2_dl_inbox').
use_env_prefix: false
# How many tasks to process in a batch. For downloads, this should be 1,
# as each worker locks a profile for a single download task.
batch_size: 1
simulation_parameters:
download_env: "sim_download"
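
For illustration, a sketch of the `check_url_expiration` idea, assuming the pre-signed format URLs in the info.json carry an `expire` Unix-timestamp query parameter; field names and the safety margin are assumptions:

```python
# Sketch: decide whether pre-fetched download URLs in an info.json look expired.
import json
import time
from urllib.parse import parse_qs, urlparse


def info_json_urls_expired(info_json_path: str, margin_seconds: int = 60) -> bool:
    with open(info_json_path, "r", encoding="utf-8") as fh:
        info = json.load(fh)
    now = time.time()
    for fmt in info.get("formats", []):
        url = fmt.get("url")
        if not url:
            continue
        expire_values = parse_qs(urlparse(url).query).get("expire")
        if expire_values:
            try:
                if float(expire_values[0]) <= now + margin_seconds:
                    return True  # at least one format URL is (about to be) stale
            except ValueError:
                continue
    return False
```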

View File

@@ -1,62 +0,0 @@
# Policy: Queue-based Authentication Simulation
#
# This policy simulates a continuous stream of authentication requests
# by pulling URLs from a Redis queue, processing them with yt-ops-server,
# and pushing results to appropriate result queues.
#
name: queue_auth_simulation
settings:
mode: fetch_only
orchestration_mode: queue_auth
profile_mode: from_pool_with_lock
# Directory to save info.json files (optional).
# For distributed operation across multiple machines, this MUST be a shared location
# like an S3 bucket path. The underlying code must be adapted to handle S3 paths.
save_info_json_dir: "run/docker_mount/fetched_info_jsons/" #"s3://your-shared-bucket/stress_test/info_jsons/"
dummy_simulation_settings:
# Simulate auth processing time between 5 and 10 seconds for each URL.
auth_min_seconds: 5
auth_max_seconds: 10
# You can also control simulated failure rates here.
auth_failure_rate: 0.0 # 0% failure rate
auth_skipped_failure_rate: 0.0 # 0% skipped rate
execution_control:
workers: 1
# How long a worker should pause if it cannot find an available profile or task.
worker_polling_interval_seconds: 1
# Run until conditions
run_until:
# Run for this many minutes (0 = unlimited)
minutes: 0
# Process this many requests (0 = unlimited)
requests: 0
info_json_generation_policy:
profile_prefix: "user1"
client: "ytdlp"
# Extra arguments to pass to the get-info command
extra_args: "--verbose"
queue_policy:
# Set to false to use legacy, unprefixed queue names (e.g., 'queue2_auth_inbox').
# Set to true (or omit) to use environment-prefixed names (e.g., 'sim_auth_queue2_auth_inbox').
use_env_prefix: false
# If specified, create download tasks for these formats
# Can be "all", a specific format ID, or a list of format IDs
formats_to_download: "140-dashy/140-dashy-0/140,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy"
# How many tasks a worker should pull from the queue at once.
# The worker will lock one profile to process the entire batch.
batch_size: 25
simulation_parameters:
auth_env: "sim_auth"
download_env: "sim_download"
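
The `use_env_prefix` behaviour described above amounts to a simple name lookup; a sketch (the helper name is illustrative):

```python
def resolve_queue_name(base_name, env_prefix, use_env_prefix=True):
    """'queue2_auth_inbox' -> 'sim_auth_queue2_auth_inbox' when prefixing is on."""
    if use_env_prefix and env_prefix:
        return f"{env_prefix}_{base_name}"
    return base_name


assert resolve_queue_name("queue2_auth_inbox", "sim_auth") == "sim_auth_queue2_auth_inbox"
assert resolve_queue_name("queue2_auth_inbox", "sim_auth", use_env_prefix=False) == "queue2_auth_inbox"
```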

View File

@@ -1,64 +0,0 @@
# Policy: Queue-based Download Simulation
#
# This policy simulates a continuous stream of download requests
# by pulling tasks from a Redis queue, downloading the specified formats,
# and pushing results to appropriate result queues.
#
name: queue_download_simulation
settings:
mode: download_only
orchestration_mode: queue_download
profile_mode: from_pool_with_lock
dummy_simulation_settings:
download_min_seconds: 5
download_max_seconds: 8
download_failure_rate: 0.0
download_skipped_failure_rate: 0.0
execution_control:
workers: 4
# How long a worker should pause if it cannot find an available profile or task.
worker_polling_interval_seconds: 1
# Run until conditions
run_until:
# Run for this many minutes (0 = unlimited)
minutes: 0
# Process this many requests (0 = unlimited)
requests: 0
download_policy:
profile_prefix: "user1"
# Default cooldown in seconds if not specified by the enforcer in Redis.
# The value from Redis (set via `unlock_cooldown_seconds` in the enforcer policy)
# will always take precedence. This is a fallback.
# Can be an integer (e.g., 1) or a range (e.g., [1, 3]).
default_unlock_cooldown_seconds: 1
# Directory to save downloaded files
output_dir: "downloaded_media/queue_downloads"
# Extra arguments to pass to the download command
extra_args: "--verbose"
# After a download task is successfully processed, rename the source info.json
# to prevent re-processing. This is safe if you generate one download task per info.json.
rename_source_info_json_on_success: true
# --- Airflow Integration ---
# If true, move downloaded media and info.json to a timestamped, video-id-based
# directory structure that the Airflow DAGs can process.
output_to_airflow_ready_dir: true
airflow_ready_dir_base_path: "downloadfiles/videos/ready"
queue_policy:
# Set to false to use legacy, unprefixed queue names (e.g., 'queue2_dl_inbox').
# Set to true (or omit) to use environment-prefixed names (e.g., 'sim_download_queue2_dl_inbox').
use_env_prefix: false
# How many tasks to process in a batch. For downloads, this should be 1,
# as each worker locks a profile for a single download task.
batch_size: 1
simulation_parameters:
download_env: "sim_download"
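
A sketch of how a worker could resolve the effective cooldown described above, letting the Redis-provided value win and accepting either a scalar or a `[min, max]` range (the helper name is illustrative):

```python
import random


def resolve_cooldown(redis_value, policy_default):
    """Prefer the enforcer value from Redis; fall back to the policy default,
    which may be a scalar or a [min, max] range."""
    value = redis_value if redis_value is not None else policy_default
    if isinstance(value, (list, tuple)) and len(value) == 2:
        return random.uniform(value[0], value[1])
    return float(value)


resolve_cooldown(None, [1, 3])  # random value between 1 and 3 seconds
resolve_cooldown(10, [1, 3])    # 10.0 -- the Redis value takes precedence
```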

View File

@@ -1,78 +0,0 @@
# Policy: Queue-based Full Stack Simulation
#
# This policy simulates a complete workflow by running both authentication
# and download workers simultaneously, processing tasks from Redis queues.
#
name: queue_full_stack_simulation
settings:
mode: full_stack
orchestration_mode: queue_full_stack
profile_mode: from_pool_with_lock
# Directory to save info.json files (optional)
save_info_json_dir: "run/queue_auth_results"
dummy_simulation_settings:
auth_min_seconds: 0.75
auth_max_seconds: 1.5
auth_failure_rate: 0.0
auth_skipped_failure_rate: 0.0
download_min_seconds: 5
download_max_seconds: 8
download_failure_rate: 0.0
download_skipped_failure_rate: 0.0
execution_control:
# Number of workers for each stage
auth_workers: 2
download_workers: 4
# How long a worker should pause if it cannot find an available profile or task.
worker_polling_interval_seconds: 1
# Run until conditions
run_until:
# Run for this many minutes (0 = unlimited)
minutes: 0
# Process this many requests (0 = unlimited)
requests: 0
info_json_generation_policy:
profile_prefix: "user1"
client: "ytdlp"
# Extra arguments to pass to the get-info command
extra_args: "--verbose"
download_policy:
profile_prefix: "user1"
# Default cooldown in seconds if not specified by the enforcer in Redis.
default_unlock_cooldown_seconds: 1
# Directory to save downloaded files
output_dir: "downloaded_media/queue_downloads"
# Extra arguments to pass to the download command
extra_args: "--verbose"
# After a download task is successfully processed, rename the source info.json
# to prevent re-processing. This is safe if you generate one download task per info.json.
rename_source_info_json_on_success: true
queue_policy:
# Redis connection settings (can be overridden by CLI args)
redis_host: "localhost"
redis_port: 6379
redis_password: ""
redis_db: 0
# If specified, create download tasks for these formats
# Can be "all", a specific format ID, or a list of format IDs
formats_to_download: "140-dashy/140-dashy-0/140,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy"
# How many tasks to process in a batch (for batch operations)
batch_size: 10
# Queue management options
requeue_failed_tasks: true
requeue_batch_size: 50
requeue_interval_seconds: 300
simulation_parameters:
auth_env: "sim_auth"
download_env: "sim_download"
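
A sketch of how `formats_to_download` might be expanded into per-format download tasks, assuming the comma-separated selector string shown above and an illustrative task schema:

```python
# Sketch: expand formats_to_download ("all", one selector, or a comma-separated
# list of selectors) into one download task per selector. Task fields are assumptions.
def build_download_tasks(info_json_path: str, video_id: str, formats_spec) -> list[dict]:
    if formats_spec in (None, "", "all"):
        selectors = ["all"]
    elif isinstance(formats_spec, str):
        selectors = [s.strip() for s in formats_spec.split(",") if s.strip()]
    else:  # already a list of format IDs
        selectors = list(formats_spec)
    return [
        {"info_json_path": info_json_path, "video_id": video_id, "format": sel}
        for sel in selectors
    ]
```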

View File

@@ -326,6 +326,23 @@ class PolicyEnforcer:
                     all_profiles_map[profile_name]['rest_reason'] = reason
                     continue  # Do not activate, group is full.
+                else:
+                    # Defensive check for orphaned profiles that should be in a group.
+                    # This can happen if list_profiles() returns an incomplete list for one cycle,
+                    # causing the group maps to be incomplete. This check prevents a "stampede"
+                    # of activations that would violate group limits.
+                    is_orphan = False
+                    for group in profile_groups:
+                        prefix = group.get('prefix')
+                        if prefix and profile_name.startswith(prefix):
+                            is_orphan = True
+                            logger.warning(
+                                f"Profile '{profile_name}' appears to belong to group '{group.get('name')}' "
+                                f"but was not found in the initial scan. Deferring activation to prevent violating group limits."
+                            )
+                            break
+                    if is_orphan:
+                        continue  # Skip activation for this profile
             # --- End group check ---
             # Before activating, ensure the profile's proxy is not resting.

View File

@@ -876,7 +876,7 @@ def run_throughput_worker(worker_id, policy, state_manager, args, profile_manage
     return []  # This function doesn't return results directly

-def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=None):
+def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=None):
     """Helper to post-process a single info.json file and move it to the final directory."""
     direct_policy = policy.get('direct_docker_cli_policy', {})
     settings = policy.get('settings', {})
@@ -918,9 +918,35 @@ def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy,
         )
         final_path = Path(save_dir) / new_name

-        # Use rename for atomic move
-        os.rename(str(file_path), str(final_path))
+        # Use shutil.move, which can handle cross-device moves (e.g., to an S3 mount)
+        # by falling back to a copy-and-delete operation.
+        shutil.move(str(file_path), str(final_path))
         logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'")
+
+        # --- Create download task if in queue mode ---
+        queue_policy = policy.get('queue_policy', {})
+        formats_to_download = queue_policy.get('formats_to_download')
+        if formats_to_download and state_manager and state_manager.queue_provider:
+            try:
+                url = info_data.get('original_url') or info_data.get('webpage_url')
+                download_task = {
+                    'info_json_path': str(final_path),
+                    'video_id': video_id,
+                    'url': url,
+                    'auth_profile_name': profile_name,
+                    'proxy_url': proxy_url,
+                    'auth_env': env_name,
+                }
+                added_count = state_manager.add_download_tasks_batch([download_task])
+                if added_count > 0:
+                    logger.info(f"[Worker {worker_id}] [{profile_name}] Added {added_count} download task(s) to queue for {video_id}")
+            except Exception as e:
+                logger.error(f"[Worker {worker_id}] [{profile_name}] Failed to create download task for {video_id}: {e}", exc_info=True)
+
         return True
     except (IOError, json.JSONDecodeError, OSError) as e:
         logger.error(f"[Worker {worker_id}] Error post-processing '{file_path.name}' (video: {video_id}): {e}")
@@ -1361,6 +1387,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man
     exec_control = policy.get('execution_control', {})
     gen_policy = policy.get('info_json_generation_policy', {})
     direct_policy = policy.get('direct_docker_cli_policy', {})
+    queue_policy = policy.get('queue_policy')

     profile_prefix = gen_policy.get('profile_prefix')
     if not profile_prefix:
@@ -1368,8 +1395,11 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man
         return []

     batch_size = direct_policy.get('batch_size')
+    if not batch_size and queue_policy:
+        batch_size = queue_policy.get('batch_size')
     if not batch_size:
-        logger.error(f"[Worker {worker_id}] Direct docker mode requires 'direct_docker_cli_policy.batch_size'. Worker exiting.")
+        logger.error(f"[Worker {worker_id}] Direct docker mode requires 'batch_size' in 'direct_docker_cli_policy' or 'queue_policy'. Worker exiting.")
         return []

     save_dir = settings.get('save_info_json_dir')
@@ -1401,6 +1431,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man
     while not state_manager.shutdown_event.is_set():
         locked_profile = None
         temp_task_dir_host = None
+        task_batch = []
         # --- Variables for robust finalization ---
         live_success_count = 0
         url_batch_len = 0
@@ -1490,10 +1521,33 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man
            user_agent = sp_utils.generate_user_agent_from_policy(policy)

            # 2. Get a batch of URLs
-           url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list)
+           url_batch = []
+           start_idx = 0
+           if not queue_policy:
+               url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list)
+           else:
+               task_batch = state_manager.get_auth_tasks_batch(batch_size)
+               if task_batch:
+                   # Mark tasks as in-progress
+                   for task in task_batch:
+                       task_id = task.get('id') or task.get('task_id')
+                       if task_id:
+                           state_manager.mark_auth_in_progress(task_id, owner_id)
+                   url_batch = [task.get('url') for task in task_batch if task.get('url')]
+
            if not url_batch:
-               logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.")
-               break
+               if not queue_policy:
+                   logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.")
+                   break  # Exit the while loop
+               else:
+                   # Queue mode: no tasks, unlock and poll
+                   polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
+                   logger.debug(f"[Worker {worker_id}] No tasks in queue for profile '{profile_name}'. Unlocking and sleeping for {polling_interval}s.")
+                   profile_manager_instance.unlock_profile(profile_name, owner=owner_id)
+                   locked_profile = None
+                   time.sleep(polling_interval)
+                   continue

            url_batch_len = len(url_batch)
            batch_started = True
@@ -1722,7 +1776,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man
                                if os.path.exists(host_file_path):
                                    _post_process_and_move_info_json(
                                        Path(host_file_path), profile_name, proxy_url, policy, worker_id,
-                                       profile_manager_instance=profile_manager_instance
+                                       state_manager, profile_manager_instance=profile_manager_instance
                                    )
                                else:
                                    logger.warning(f"File from log not found on host for immediate processing: {host_file_path}")
@@ -1833,7 +1887,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man
            if processed_files:
                logger.info(f"[Worker {worker_id}] Found {len(processed_files)} leftover file(s) to process after live parsing.")
                for temp_path in processed_files:
-                   _post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=profile_manager_instance)
+                   _post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=profile_manager_instance)

            # A batch is considered an overall success for logging if it had no fatal errors.
            # The per-URL activity has already been recorded live.
@@ -1922,6 +1976,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
    exec_control = policy.get('execution_control', {})
    d_policy = policy.get('download_policy', {})
    direct_policy = policy.get('direct_docker_cli_policy', {})
+   queue_policy = policy.get('queue_policy')

    profile_prefix = d_policy.get('profile_prefix')
    if not profile_prefix:
@@ -1949,13 +2004,16 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
    no_task_streak = 0
    last_used_profile_name = None
+   task_counter = 0

    while not state_manager.shutdown_event.is_set():
        locked_profile = None
        claimed_task_path_host = None
        temp_config_dir_host = None
        was_banned_by_parser = False
+       task = None
+       task_id = None
        try:
-           if no_task_streak > 0:
+           if no_task_streak > 0 and not queue_policy:  # Polling only makes sense for file mode
                polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
                # --- Add diagnostic logging ---
                all_profiles_in_pool = profile_manager_instance.list_profiles()
@@ -1970,14 +2028,64 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
                time.sleep(polling_interval)
                if state_manager.shutdown_event.is_set(): continue

-           # 1. Find a task and lock its associated profile
-           locked_profile, claimed_task_path_host = find_task_and_lock_profile(
-               profile_manager_instance, owner_id, profile_prefix, policy, worker_id
-           )
+           # 1. Get a task
+           if not queue_policy:
+               # File-based mode: Find a task and lock its associated profile
+               locked_profile, claimed_task_path_host = find_task_and_lock_profile(
+                   profile_manager_instance, owner_id, profile_prefix, policy, worker_id
+               )
+           else:
+               # Queue-based mode
+               task = state_manager.get_download_task()
+               if task:
+                   task_id = task.get('id') or task.get('task_id')
+                   if not task_id:
+                       task_id = f"dl_task_{worker_id}_{task_counter}"
+                       task_counter += 1
+                       task['task_id'] = task_id
+                   info_json_path_str = task.get('info_json_path')
+                   if not info_json_path_str or not os.path.exists(info_json_path_str):
+                       logger.error(f"[Worker {worker_id}] Task {task_id} has invalid info_json_path: {info_json_path_str}. Skipping.")
+                       state_manager.report_download_skipped(task_id, {"error": "Invalid info_json_path", "task": task})
+                       auth_profile_name = task.get('auth_profile_name')
+                       auth_env = task.get('auth_env')
+                       if auth_profile_name and auth_env:
+                           auth_manager = get_auth_manager(profile_manager_instance, auth_env)
+                           if auth_manager:
+                               auth_manager.decrement_pending_downloads(auth_profile_name)
+                       continue
+
+                   claimed_task_path_host = Path(info_json_path_str)
+
+                   # Now lock a profile
+                   specific_profile = task.get('auth_profile_name') or task.get('profile_name')
+                   if specific_profile:
+                       locked_profile = profile_manager_instance.lock_profile(owner=owner_id, specific_profile_name=specific_profile)
+                       if not locked_profile:
+                           logger.warning(f"[Worker {worker_id}] Could not lock specific profile '{specific_profile}'. Trying any profile with prefix.")
+                           locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
+                   else:
+                       locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix)
+
+                   if not locked_profile:
+                       logger.warning(f"[Worker {worker_id}] No profiles available for task {task_id}. Re-queueing.")
+                       state_manager.add_download_tasks_batch([task])
+                       claimed_task_path_host = None
+                       task = None
+
+                   if task:
+                       state_manager.mark_download_in_progress(task_id, owner_id)

            if not locked_profile:
-               no_task_streak += 1
-               # The main loop will pause if the streak continues.
+               if not queue_policy:
+                   no_task_streak += 1
+               else:
+                   # In queue mode, if we didn't get a task or a profile, we just poll.
+                   polling_interval = exec_control.get('worker_polling_interval_seconds', 1)
+                   logger.debug(f"[Worker {worker_id}] No download tasks or profiles available. Sleeping for {polling_interval}s.")
+                   time.sleep(polling_interval)
                continue

            profile_name = locked_profile['name']
@@ -2318,15 +2426,27 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
                logger.info(f"[Worker {worker_id}] [{profile_name}] Task processing complete. Worker will now unlock profile and attempt next task.")

-               # 6. Clean up task file by renaming to .processed
-               try:
-                   # The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed
-                   base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
-                   processed_path = Path(f"{base_path_str}.processed")
-                   claimed_task_path_host.rename(processed_path)
-                   logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.")
-               except (OSError, IndexError) as e:
-                   logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}")
+               # 6. Clean up task file
+               if not queue_policy:
+                   # File-based mode: rename to .processed
+                   try:
+                       # The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed
+                       base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
+                       processed_path = Path(f"{base_path_str}.processed")
+                       claimed_task_path_host.rename(processed_path)
+                       logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.")
+                   except (OSError, IndexError) as e:
+                       logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}")
+               elif d_policy.get('rename_source_info_json_on_success'):
+                   # Queue-based mode: respect rename policy
+                   source_path_to_rename = task.get('info_json_path')
+                   if success and source_path_to_rename and os.path.exists(source_path_to_rename):
+                       try:
+                           processed_path = source_path_to_rename + ".processed"
+                           shutil.move(source_path_to_rename, processed_path)
+                           logger.info(f"[Worker {worker_id}] Renamed source info.json to '{processed_path}'")
+                       except Exception as e:
+                           logger.warning(f"[Worker {worker_id}] Could not rename source info.json '{source_path_to_rename}': {e}")
                # After this point, claimed_task_path_host is no longer valid.
                # The metadata has already been read into auth_profile_name and auth_env.
            else:
@@ -2344,7 +2464,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
            # The auth_profile_name and auth_env variables were populated in the `try` block
            # before the task file was renamed or deleted.
            if auth_profile_name and auth_env:
-               auth_manager = _get_auth_manager(profile_manager_instance, auth_env)
+               auth_manager = get_auth_manager(profile_manager_instance, auth_env)
                if auth_manager:
                    auth_manager.decrement_pending_downloads(auth_profile_name)
                else:
@@ -2403,9 +2523,14 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
                    owner=owner_id,
                    rest_for_seconds=cooldown
                )
-           if claimed_task_path_host and os.path.exists(claimed_task_path_host):
+           if not queue_policy and claimed_task_path_host and os.path.exists(claimed_task_path_host):
+               # Clean up .LOCKED file in file-based mode
                try: os.remove(claimed_task_path_host)
                except OSError: pass
+
+           if task and task_id:
+               state_manager.remove_download_in_progress(task_id)
+
            if temp_config_dir_host and os.path.exists(temp_config_dir_host):
                try:
                    shutil.rmtree(temp_config_dir_host)
@@ -2612,7 +2737,7 @@ def run_direct_download_worker(worker_id, policy, state_manager, args, profile_m
            # The auth_profile_name and auth_env variables were populated in the `try` block
            # before the task file was deleted.
            if auth_profile_name and auth_env:
-               auth_manager = _get_auth_manager(profile_manager_instance, auth_env)
+               auth_manager = get_auth_manager(profile_manager_instance, auth_env)
                if auth_manager:
                    auth_manager.decrement_pending_downloads(auth_profile_name)
                else:

View File

@@ -615,48 +615,72 @@ def main_stress_policy(args):
    workers = exec_control.get('workers', 1)

    if mode == 'fetch_only':
-       urls_file = settings.get('urls_file')
-       if not urls_file:
-           logger.error("Direct docker CLI (fetch) mode requires 'settings.urls_file'.")
-           return 1
-       try:
-           with open(urls_file, 'r', encoding='utf-8') as f:
-               urls_list = [line.strip() for line in f if line.strip()]
-       except IOError as e:
-           logger.error(f"Could not read urls_file '{urls_file}': {e}")
-           return 1
-       if not urls_list:
-           logger.error(f"URL file '{urls_file}' is empty. Nothing to do.")
-           return 1
-       start_index = 0
-       if args.start_from_url_index is not None:
-           start_index = max(0, args.start_from_url_index - 1)
-           state_manager.update_last_url_index(start_index, force=True)
-       else:
-           start_index = state_manager.get_last_url_index()
-       if start_index >= len(urls_list) and len(urls_list) > 0:
-           logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
-           logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!")
-           logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!")
-           logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!")
-           logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
-           if not args.dry_run and not args.disable_log_writing:
-               state_manager.close()
-               try:
-                   os.remove(state_manager.state_file_path)
-                   logger.info(f"Deleted state file: {state_manager.state_file_path}")
-               except OSError as e:
-                   logger.error(f"Failed to delete state file: {e}")
-           else:
-               logger.info("[Dry Run] Would have deleted state file and stopped.")
-           return 0
-       if start_index > 0:
-           logger.info(f"Starting/resuming from URL index {start_index + 1}.")
+       queue_policy = policy.get('queue_policy')
+       urls_list = []  # Default to empty for queue mode
+
+       if not queue_policy:
+           urls_file = settings.get('urls_file')
+           if not urls_file:
+               logger.error("Direct docker CLI (fetch) mode requires 'settings.urls_file' if not configured for queue operation via 'queue_policy'.")
+               return 1
+
+           try:
+               with open(urls_file, 'r', encoding='utf-8') as f:
+                   urls_list = [line.strip() for line in f if line.strip()]
+           except IOError as e:
+               logger.error(f"Could not read urls_file '{urls_file}': {e}")
+               return 1
+
+           if not urls_list:
+               logger.error(f"URL file '{urls_file}' is empty. Nothing to do.")
+               return 1
+
+           start_index = 0
+           if args.start_from_url_index is not None:
+               start_index = max(0, args.start_from_url_index - 1)
+               state_manager.update_last_url_index(start_index, force=True)
+           else:
+               start_index = state_manager.get_last_url_index()
+
+           if start_index >= len(urls_list) and len(urls_list) > 0:
+               logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+               logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!")
+               logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!")
+               logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!")
+               logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+               if not args.dry_run and not args.disable_log_writing:
+                   state_manager.close()
+                   try:
+                       os.remove(state_manager.state_file_path)
+                       logger.info(f"Deleted state file: {state_manager.state_file_path}")
+                   except OSError as e:
+                       logger.error(f"Failed to delete state file: {e}")
+               else:
+                   logger.info("[Dry Run] Would have deleted state file and stopped.")
+               return 0
+
+           if start_index > 0:
+               logger.info(f"Starting/resuming from URL index {start_index + 1}.")
+       else:
+           logger.info("Direct docker CLI (fetch) mode is running in QUEUE mode.")
+           # Initialize queue provider
+           redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost'
+           redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379))
+           redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password')
+           redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0))
+           use_env_prefix = queue_policy.get('use_env_prefix', True)
+           env_prefix = None
+           if use_env_prefix:
+               env_prefix = profile_manager_instance.key_prefix.removesuffix('_profile_mgmt_')
+           state_manager.initialize_queue_provider(
+               redis_host=redis_host,
+               redis_port=redis_port,
+               redis_password=redis_password,
+               redis_db=redis_db,
+               env_prefix=env_prefix
+           )

        sp_utils.display_effective_policy(policy, policy_name, sources=urls_list)
        if args.dry_run: return 0
@@ -671,15 +695,37 @@ def main_stress_policy(args):
            logger.info("Shutdown signal received, workers have finished.")

    elif mode == 'download_only':
-       info_json_dir = settings.get('info_json_dir')
-       if not info_json_dir:
-           logger.error("Direct docker CLI (download) mode requires 'settings.info_json_dir'.")
-           return 1
-       try:
-           os.makedirs(info_json_dir, exist_ok=True)
-       except OSError as e:
-           logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}")
-           return 1
+       queue_policy = policy.get('queue_policy')
+       if not queue_policy:
+           info_json_dir = settings.get('info_json_dir')
+           if not info_json_dir:
+               logger.error("Direct docker CLI (download) mode requires 'settings.info_json_dir' if not configured for queue operation.")
+               return 1
+           try:
+               os.makedirs(info_json_dir, exist_ok=True)
+           except OSError as e:
+               logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}")
+               return 1
+       else:
+           logger.info("Direct docker CLI (download) mode is running in QUEUE mode.")
+           # Initialize queue provider
+           redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost'
+           redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379))
+           redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password')
+           redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0))
+           use_env_prefix = queue_policy.get('use_env_prefix', True)
+           env_prefix = None
+           if use_env_prefix:
+               env_prefix = profile_manager_instance.key_prefix.removesuffix('_profile_mgmt_')
+           state_manager.initialize_queue_provider(
+               redis_host=redis_host,
+               redis_port=redis_port,
+               redis_password=redis_password,
+               redis_db=redis_db,
+               env_prefix=env_prefix
+           )

        sp_utils.display_effective_policy(policy, policy_name, sources=[])
        if args.dry_run: return 0
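
The Redis settings above are resolved with CLI arguments taking precedence over environment variables, then `queue_policy` values, then built-in defaults; a condensed sketch of that precedence (the helper name is illustrative):

```python
import os


def resolve_redis_setting(cli_value, env_var, policy, key, default):
    """Precedence: CLI argument > environment variable > queue_policy value > default."""
    if cli_value is not None:
        return cli_value
    env_value = os.getenv(env_var)
    if env_value is not None:
        return type(default)(env_value) if default is not None else env_value
    if policy.get(key) is not None:
        return policy[key]
    return default


# e.g. resolve_redis_setting(args.redis_port, "REDIS_PORT", queue_policy, "redis_port", 6379)
```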