From 02132f1732586bf3beebaa92c85bb481e6614dcc Mon Sep 17 00:00:00 2001 From: aperez Date: Fri, 26 Dec 2025 12:57:15 +0300 Subject: [PATCH] Update policies on proper queue reading --- .../policies/12_queue_auth_simulation.yaml | 125 ++++++++++++ .../13_queue_download_simulation.yaml | 108 +++++++++++ .../policies/queue_auth_simulation.yaml | 62 ------ .../policies/queue_download_simulation.yaml | 64 ------- .../policies/queue_full_stack_simulation.yaml | 78 -------- .../ytops_client/policy_enforcer_tool.py | 17 ++ .../ytops_client/stress_policy/workers.py | 181 +++++++++++++++--- .../ytops_client/stress_policy_tool.py | 140 +++++++++----- 8 files changed, 496 insertions(+), 279 deletions(-) create mode 100644 ytops_client-source/policies/12_queue_auth_simulation.yaml create mode 100644 ytops_client-source/policies/13_queue_download_simulation.yaml delete mode 100644 ytops_client-source/policies/queue_auth_simulation.yaml delete mode 100644 ytops_client-source/policies/queue_download_simulation.yaml delete mode 100644 ytops_client-source/policies/queue_full_stack_simulation.yaml diff --git a/ytops_client-source/policies/12_queue_auth_simulation.yaml b/ytops_client-source/policies/12_queue_auth_simulation.yaml new file mode 100644 index 0000000..6d48b60 --- /dev/null +++ b/ytops_client-source/policies/12_queue_auth_simulation.yaml @@ -0,0 +1,125 @@ +# Policy: Queue-based Authentication Simulation via Direct Docker Exec +# +# This policy simulates a continuous stream of info.json fetch requests using +# the 'direct_docker_cli' mode. It pulls URLs from a Redis queue, creates a +# temporary batch file, and then calls a yt-dlp command inside a running +# Docker container. +# +name: 12_queue_auth_simulation + +settings: + mode: fetch_only + orchestration_mode: direct_docker_cli + profile_mode: from_pool_with_lock + # The save directory MUST be inside the docker_host_mount_path. + save_info_json_dir: "run/docker_mount/fetched_info_jsons/queue_simulation" + +execution_control: + workers: 1 + # How long a worker should pause if it cannot find an available profile to lock. + worker_polling_interval_seconds: 1 + # No sleep between tasks; throughput is controlled by yt-dlp performance and profile availability. + +info_json_generation_policy: + profile_prefix: "user1" + +direct_docker_cli_policy: + # Which simulation environment's profiles to use for locking. + use_profile_env: "auth" + + # If true, a worker will try to lock a different profile than the one it just used. + avoid_immediate_profile_reuse: true + # How long the worker should wait for a different profile before re-using the same one. + avoid_reuse_max_wait_seconds: 5 + + # NOTE on Rate Limits: With the default yt-dlp settings, the rate limit for guest + # sessions is ~300 videos/hour (~1000 webpage/player requests per hour). + # For accounts, it is ~2000 videos/hour (~4000 webpage/player requests per hour). + # The enforcer policy (e.g., 8_unified_simulation_enforcer.yaml) should be + # configured to respect these limits via rotation and rest periods. + + # If true, extract the visitor_id from yt-dlp logs, save it per-profile, + # and inject it into subsequent requests for that profile. + #track_visitor_id: true + + # --- Docker Execution Settings --- + docker_image_name: "ytops/yt-dlp" # Image to use for `docker run` + docker_network_name: "airflow_proxynet" + # IMPORTANT: This path on the HOST will be mounted into the container at `docker_container_mount_path`. 
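+  # As an illustrative (assumed) sketch, the resulting `docker run` mount is equivalent to:
+  #   docker run --network airflow_proxynet -v "$PWD/run/docker_mount:/config" ytops/yt-dlp ...
+  # i.e. anything yt-dlp writes under /config inside the container lands under run/docker_mount on the host.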
+ docker_host_mount_path: "run/docker_mount" + docker_container_mount_path: "/config" # The mount point inside the container + + # Host path for persisting cache data (e.g., cookies, sigfuncs) between runs. + docker_host_cache_path: ".cache/queue_auth_simulation" + # Path inside the container where the cache is mounted. Should match HOME/.cache + docker_container_cache_path: "/config/.cache" + + # If true, create and use a persistent cookie jar per profile inside the cache dir. + # use_cookies: true + + # --- User-Agent Generation --- + # Template for generating User-Agent strings for new profiles. + # The '{major_version}' will be replaced by a version string. + user_agent_template: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{major_version}.0.0.0 Safari/537.36" + # Range of Chrome major versions to use for the template. A range suitable for TV devices. + user_agent_version_range: [110, 120] + + # A base config file can be used, with overrides applied from the policy. + # The orchestrator will inject 'proxy', 'batch-file', and 'output' keys into the overrides. + ytdlp_config_file: "cli.auth.config" + ytdlp_config_overrides: + skip-download: true + write-info-json: true + no-write-subs: true + no-color: true + ignore-errors: true + use-extractors: ["youtube"] + + ytdlp_raw_args: + - '--extractor-args "youtube:formats=duplicate;jsc_trace=true;player_client=tv_simply;pot_trace=true;skip=translated_subs,hls"' + - '--extractor-args "youtubepot-bgutilhttp:base_url=http://172.17.0.1:4416"' + - '--sleep-requests 0.75' + # --retry-sleep linear=1::2' + + # --- Live Error Parsing Rules --- + # These regex patterns are checked against yt-dlp's stderr in real-time. + # If a fatal error is detected, immediately ban the profile to stop the container + # and prevent further errors in the same batch. + ban_on_fatal_error_in_batch: true + fatal_error_patterns: + - "Sign in to confirm you’re not a bot" + - "rate-limited by YouTube" + - "This content isn't available, try again later" + - "HTTP Error 502" + + tolerated_error_patterns: + - "HTTP Error 429" + - "The uploader has not made this video available in your country" + - "This video has been removed by the uploader" + - "Private video" + - "This is a private video" + - "Video is private" + - "Video unavailable" + - "account associated with this video has been terminated" + - "members-only content" + - "Sign in to confirm your age" + + # Template for renaming the final info.json. + rename_file_template: "{video_id}-{profile_name}-{proxy}.info.json" + +queue_policy: + # Set to false to use legacy, unprefixed queue names (e.g., 'queue2_auth_inbox'). + # Set to true (or omit) to use environment-prefixed names (e.g., 'sim_auth_queue2_auth_inbox'). + use_env_prefix: false + + # If specified, create download tasks for these formats + # Can be "all", a specific format ID, or a list of format IDs + formats_to_download: "140-dashy/140-dashy-0/140,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy" + + # How many tasks a worker should pull from the queue at once. + # This will become the batch size for the docker run. 
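+  # Illustrative example (assumed flow): with a batch size of 25, a worker drains up to 25 URLs
+  # from the auth inbox queue, writes them to one temporary batch file, and runs a single yt-dlp
+  # invocation over that file inside the container.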
+ batch_size: 25 + +simulation_parameters: + auth_env: "sim_auth" + download_env: "sim_download" diff --git a/ytops_client-source/policies/13_queue_download_simulation.yaml b/ytops_client-source/policies/13_queue_download_simulation.yaml new file mode 100644 index 0000000..7397d56 --- /dev/null +++ b/ytops_client-source/policies/13_queue_download_simulation.yaml @@ -0,0 +1,108 @@ +# Policy: Queue-based Download Simulation via Direct Docker Exec +# +# This policy simulates a continuous stream of downloads using the +# 'direct_docker_cli' mode with `mode: download_only`. It pulls download +# tasks from a Redis queue, each containing a path to an info.json file, +# and invokes a yt-dlp command inside a running Docker container to perform +# the download. +# +name: 13_queue_download_simulation + +settings: + mode: download_only + orchestration_mode: direct_docker_cli + profile_mode: from_pool_with_lock + # In queue mode, info_json_dir is not used to find tasks. + # However, the paths inside the download tasks must be accessible + # within the docker_host_mount_path. + # The profile_extraction_regex is also not needed as the profile + # can be specified in the download task. + +execution_control: + workers: 4 + # How long a worker should pause if it cannot find an available profile or task. + worker_polling_interval_seconds: 1 + +download_policy: + profile_prefix: "user1" + # Default cooldown in seconds if not specified by the enforcer in Redis. + # The value from Redis (set via `unlock_cooldown_seconds` in the enforcer policy) + # will always take precedence. This is a fallback. + # Can be an integer (e.g., 1) or a range (e.g., [1, 3]). + default_unlock_cooldown_seconds: 1 + # If true, check if the download URL in the info.json is expired before + # attempting to download. This is enabled by default. + check_url_expiration: true + # --- Airflow Integration --- + # If true, move downloaded media and info.json to a timestamped, video-id-based + # directory structure that the Airflow DAGs can process. + output_to_airflow_ready_dir: true + airflow_ready_dir_base_path: "downloadfiles/videos/ready" + +direct_docker_cli_policy: + # Which simulation environment's profiles to use for locking. + use_profile_env: "download" + + # If true, a worker will try to lock a different profile than the one it just used. + # This is disabled for downloads, as the cooldown mechanism is sufficient. + avoid_immediate_profile_reuse: false + # How long the worker should wait for a different profile before re-using the same one. + avoid_reuse_max_wait_seconds: 5 + + # NOTE on Rate Limits: With the default yt-dlp settings, the rate limit for guest + # sessions is ~300 videos/hour (~1000 webpage/player requests per hour). + # For accounts, it is ~2000 videos/hour (~4000 webpage/player requests per hour). + # This enforcer policy should be configured to respect these limits via + # rotation and rest periods. + + # --- Docker Execution Settings --- + docker_image_name: "ytops/yt-dlp" + docker_network_name: "airflow_proxynet" + # Host path mounted into the container for task files (info.json, config). + # IMPORTANT: This must be the SAME host path used for the `info_json_dir` above, + # or a parent directory of it, so the container can see the task files. + docker_host_mount_path: "run/docker_mount" + docker_container_mount_path: "/config" + + # Path on the HOST where downloaded files will be saved. 
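+  # Illustrative (assumed) mount sketch: -v "$PWD/downloaded_media/queue_downloads:/downloads",
+  # so files yt-dlp writes to /downloads inside the container appear in this host directory.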
+ docker_host_download_path: "downloaded_media/queue_downloads" + # Path inside the CONTAINER where `docker_host_download_path` is mounted. + docker_container_download_path: "/downloads" + + # A base config file can be used, with overrides applied from the policy. + # The orchestrator will inject 'proxy', 'load-info-json', and 'output' keys into the overrides. + ytdlp_config_file: "cli.download.config" + ytdlp_config_overrides: + format: "299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy,140-dashy/140-dashy-0/140" + #format: "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" + no-resize-buffer: true + buffer-size: "4M" + concurrent-fragments: 8 + + ytdlp_raw_args: [] + + # --- Live Error Parsing Rules --- + # If a fatal error is detected, immediately ban the profile to stop the container. + ban_on_fatal_error_in_batch: true + fatal_error_patterns: + - "HTTP Error 403" + - "HTTP Error 502" + + tolerated_error_patterns: + - "timed out" + - "Timeout" + - "connection reset by peer" + - "Invalid data found when processing input" + - "Error opening input files" + +queue_policy: + # Set to false to use legacy, unprefixed queue names (e.g., 'queue2_dl_inbox'). + # Set to true (or omit) to use environment-prefixed names (e.g., 'sim_download_queue2_dl_inbox'). + use_env_prefix: false + + # How many tasks to process in a batch. For downloads, this should be 1, + # as each worker locks a profile for a single download task. + batch_size: 1 + +simulation_parameters: + download_env: "sim_download" diff --git a/ytops_client-source/policies/queue_auth_simulation.yaml b/ytops_client-source/policies/queue_auth_simulation.yaml deleted file mode 100644 index 9f910ea..0000000 --- a/ytops_client-source/policies/queue_auth_simulation.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# Policy: Queue-based Authentication Simulation -# -# This policy simulates a continuous stream of authentication requests -# by pulling URLs from a Redis queue, processing them with yt-ops-server, -# and pushing results to appropriate result queues. -# -name: queue_auth_simulation - -settings: - mode: fetch_only - orchestration_mode: queue_auth - profile_mode: from_pool_with_lock - # Directory to save info.json files (optional). - # For distributed operation across multiple machines, this MUST be a shared location - # like an S3 bucket path. The underlying code must be adapted to handle S3 paths. - save_info_json_dir: "run/docker_mount/fetched_info_jsons/" #"s3://your-shared-bucket/stress_test/info_jsons/" - - - dummy_simulation_settings: - # Simulate auth processing time between 5 and 10 seconds for each URL. - auth_min_seconds: 5 - auth_max_seconds: 10 - # You can also control simulated failure rates here. - auth_failure_rate: 0.0 # 0% failure rate - auth_skipped_failure_rate: 0.0 # 0% skipped rate - -execution_control: - workers: 1 - # How long a worker should pause if it cannot find an available profile or task. - worker_polling_interval_seconds: 1 - # Run until conditions - run_until: - # Run for this many minutes (0 = unlimited) - minutes: 0 - # Process this many requests (0 = unlimited) - requests: 0 - - - - -info_json_generation_policy: - profile_prefix: "user1" - client: "ytdlp" - # Extra arguments to pass to the get-info command - extra_args: "--verbose" - -queue_policy: - # Set to false to use legacy, unprefixed queue names (e.g., 'queue2_auth_inbox'). - # Set to true (or omit) to use environment-prefixed names (e.g., 'sim_auth_queue2_auth_inbox'). 
- use_env_prefix: false - - # If specified, create download tasks for these formats - # Can be "all", a specific format ID, or a list of format IDs - formats_to_download: "140-dashy/140-dashy-0/140,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy" - - # How many tasks a worker should pull from the queue at once. - # The worker will lock one profile to process the entire batch. - batch_size: 25 - -simulation_parameters: - auth_env: "sim_auth" - download_env: "sim_download" diff --git a/ytops_client-source/policies/queue_download_simulation.yaml b/ytops_client-source/policies/queue_download_simulation.yaml deleted file mode 100644 index 608b795..0000000 --- a/ytops_client-source/policies/queue_download_simulation.yaml +++ /dev/null @@ -1,64 +0,0 @@ -# Policy: Queue-based Download Simulation -# -# This policy simulates a continuous stream of download requests -# by pulling tasks from a Redis queue, downloading the specified formats, -# and pushing results to appropriate result queues. -# -name: queue_download_simulation - -settings: - mode: download_only - orchestration_mode: queue_download - profile_mode: from_pool_with_lock - - dummy_simulation_settings: - download_min_seconds: 5 - download_max_seconds: 8 - download_failure_rate: 0.0 - download_skipped_failure_rate: 0.0 - - -execution_control: - workers: 4 - # How long a worker should pause if it cannot find an available profile or task. - worker_polling_interval_seconds: 1 - # Run until conditions - run_until: - # Run for this many minutes (0 = unlimited) - minutes: 0 - # Process this many requests (0 = unlimited) - requests: 0 - -download_policy: - profile_prefix: "user1" - # Default cooldown in seconds if not specified by the enforcer in Redis. - # The value from Redis (set via `unlock_cooldown_seconds` in the enforcer policy) - # will always take precedence. This is a fallback. - # Can be an integer (e.g., 1) or a range (e.g., [1, 3]). - default_unlock_cooldown_seconds: 1 - # Directory to save downloaded files - output_dir: "downloaded_media/queue_downloads" - # Extra arguments to pass to the download command - extra_args: "--verbose" - - # After a download task is successfully processed, rename the source info.json - # to prevent re-processing. This is safe if you generate one download task per info.json. - rename_source_info_json_on_success: true - - # --- Airflow Integration --- - # If true, move downloaded media and info.json to a timestamped, video-id-based - # directory structure that the Airflow DAGs can process. - output_to_airflow_ready_dir: true - airflow_ready_dir_base_path: "downloadfiles/videos/ready" - -queue_policy: - # Set to false to use legacy, unprefixed queue names (e.g., 'queue2_dl_inbox'). - # Set to true (or omit) to use environment-prefixed names (e.g., 'sim_download_queue2_dl_inbox'). - use_env_prefix: false - - # How many tasks to process in a batch. For downloads, this should be 1, - # as each worker locks a profile for a single download task. 
- batch_size: 1 - -simulation_parameters: - download_env: "sim_download" diff --git a/ytops_client-source/policies/queue_full_stack_simulation.yaml b/ytops_client-source/policies/queue_full_stack_simulation.yaml deleted file mode 100644 index 44e1926..0000000 --- a/ytops_client-source/policies/queue_full_stack_simulation.yaml +++ /dev/null @@ -1,78 +0,0 @@ -# Policy: Queue-based Full Stack Simulation -# -# This policy simulates a complete workflow by running both authentication -# and download workers simultaneously, processing tasks from Redis queues. -# -name: queue_full_stack_simulation - -settings: - mode: full_stack - orchestration_mode: queue_full_stack - profile_mode: from_pool_with_lock - # Directory to save info.json files (optional) - save_info_json_dir: "run/queue_auth_results" - - dummy_simulation_settings: - auth_min_seconds: 0.75 - auth_max_seconds: 1.5 - auth_failure_rate: 0.0 - auth_skipped_failure_rate: 0.0 - download_min_seconds: 5 - download_max_seconds: 8 - download_failure_rate: 0.0 - download_skipped_failure_rate: 0.0 - -execution_control: - # Number of workers for each stage - auth_workers: 2 - download_workers: 4 - # How long a worker should pause if it cannot find an available profile or task. - worker_polling_interval_seconds: 1 - # Run until conditions - run_until: - # Run for this many minutes (0 = unlimited) - minutes: 0 - # Process this many requests (0 = unlimited) - requests: 0 - -info_json_generation_policy: - profile_prefix: "user1" - client: "ytdlp" - # Extra arguments to pass to the get-info command - extra_args: "--verbose" - -download_policy: - profile_prefix: "user1" - # Default cooldown in seconds if not specified by the enforcer in Redis. - default_unlock_cooldown_seconds: 1 - # Directory to save downloaded files - output_dir: "downloaded_media/queue_downloads" - # Extra arguments to pass to the download command - extra_args: "--verbose" - - # After a download task is successfully processed, rename the source info.json - # to prevent re-processing. This is safe if you generate one download task per info.json. - rename_source_info_json_on_success: true - -queue_policy: - # Redis connection settings (can be overridden by CLI args) - redis_host: "localhost" - redis_port: 6379 - redis_password: "" - redis_db: 0 - - # If specified, create download tasks for these formats - # Can be "all", a specific format ID, or a list of format IDs - formats_to_download: "140-dashy/140-dashy-0/140,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy" - - # How many tasks to process in a batch (for batch operations) - batch_size: 10 - - # Queue management options - requeue_failed_tasks: true - requeue_batch_size: 50 - requeue_interval_seconds: 300 - -simulation_parameters: - auth_env: "sim_auth" - download_env: "sim_download" diff --git a/ytops_client-source/ytops_client/policy_enforcer_tool.py b/ytops_client-source/ytops_client/policy_enforcer_tool.py index 4391c35..0e22a15 100644 --- a/ytops_client-source/ytops_client/policy_enforcer_tool.py +++ b/ytops_client-source/ytops_client/policy_enforcer_tool.py @@ -326,6 +326,23 @@ class PolicyEnforcer: all_profiles_map[profile_name]['rest_reason'] = reason continue # Do not activate, group is full. + else: + # Defensive check for orphaned profiles that should be in a group. + # This can happen if list_profiles() returns an incomplete list for one cycle, + # causing the group maps to be incomplete. This check prevents a "stampede" + # of activations that would violate group limits. 
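+                # Illustrative example (hypothetical group config): with a group
+                # {'name': 'tv_group', 'prefix': 'user1'}, a profile 'user1_07' that is missing
+                # from this cycle's scan is deferred rather than activated immediately.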
+ is_orphan = False + for group in profile_groups: + prefix = group.get('prefix') + if prefix and profile_name.startswith(prefix): + is_orphan = True + logger.warning( + f"Profile '{profile_name}' appears to belong to group '{group.get('name')}' " + f"but was not found in the initial scan. Deferring activation to prevent violating group limits." + ) + break + if is_orphan: + continue # Skip activation for this profile # --- End group check --- # Before activating, ensure the profile's proxy is not resting. diff --git a/ytops_client-source/ytops_client/stress_policy/workers.py b/ytops_client-source/ytops_client/stress_policy/workers.py index 261787f..e4ea2cc 100644 --- a/ytops_client-source/ytops_client/stress_policy/workers.py +++ b/ytops_client-source/ytops_client/stress_policy/workers.py @@ -876,7 +876,7 @@ def run_throughput_worker(worker_id, policy, state_manager, args, profile_manage return [] # This function doesn't return results directly -def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=None): +def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=None): """Helper to post-process a single info.json file and move it to the final directory.""" direct_policy = policy.get('direct_docker_cli_policy', {}) settings = policy.get('settings', {}) @@ -918,9 +918,35 @@ def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, ) final_path = Path(save_dir) / new_name - # Use rename for atomic move - os.rename(str(file_path), str(final_path)) + # Use shutil.move, which can handle cross-device moves (e.g., to an S3 mount) + # by falling back to a copy-and-delete operation. + shutil.move(str(file_path), str(final_path)) logger.info(f"[Worker {worker_id}] Post-processed and moved info.json to '{final_path}'") + + # --- Create download task if in queue mode --- + queue_policy = policy.get('queue_policy', {}) + formats_to_download = queue_policy.get('formats_to_download') + + if formats_to_download and state_manager and state_manager.queue_provider: + try: + url = info_data.get('original_url') or info_data.get('webpage_url') + + download_task = { + 'info_json_path': str(final_path), + 'video_id': video_id, + 'url': url, + 'auth_profile_name': profile_name, + 'proxy_url': proxy_url, + 'auth_env': env_name, + } + + added_count = state_manager.add_download_tasks_batch([download_task]) + if added_count > 0: + logger.info(f"[Worker {worker_id}] [{profile_name}] Added {added_count} download task(s) to queue for {video_id}") + + except Exception as e: + logger.error(f"[Worker {worker_id}] [{profile_name}] Failed to create download task for {video_id}: {e}", exc_info=True) + return True except (IOError, json.JSONDecodeError, OSError) as e: logger.error(f"[Worker {worker_id}] Error post-processing '{file_path.name}' (video: {video_id}): {e}") @@ -1361,6 +1387,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man exec_control = policy.get('execution_control', {}) gen_policy = policy.get('info_json_generation_policy', {}) direct_policy = policy.get('direct_docker_cli_policy', {}) + queue_policy = policy.get('queue_policy') profile_prefix = gen_policy.get('profile_prefix') if not profile_prefix: @@ -1368,8 +1395,11 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man return [] batch_size = direct_policy.get('batch_size') + if not batch_size and queue_policy: + batch_size = 
queue_policy.get('batch_size') + if not batch_size: - logger.error(f"[Worker {worker_id}] Direct docker mode requires 'direct_docker_cli_policy.batch_size'. Worker exiting.") + logger.error(f"[Worker {worker_id}] Direct docker mode requires 'batch_size' in 'direct_docker_cli_policy' or 'queue_policy'. Worker exiting.") return [] save_dir = settings.get('save_info_json_dir') @@ -1401,6 +1431,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man while not state_manager.shutdown_event.is_set(): locked_profile = None temp_task_dir_host = None + task_batch = [] # --- Variables for robust finalization --- live_success_count = 0 url_batch_len = 0 @@ -1490,10 +1521,33 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man user_agent = sp_utils.generate_user_agent_from_policy(policy) # 2. Get a batch of URLs - url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list) + url_batch = [] + start_idx = 0 + + if not queue_policy: + url_batch, start_idx = state_manager.get_next_url_batch(batch_size, urls_list) + else: + task_batch = state_manager.get_auth_tasks_batch(batch_size) + if task_batch: + # Mark tasks as in-progress + for task in task_batch: + task_id = task.get('id') or task.get('task_id') + if task_id: + state_manager.mark_auth_in_progress(task_id, owner_id) + url_batch = [task.get('url') for task in task_batch if task.get('url')] + if not url_batch: - logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.") - break + if not queue_policy: + logger.info(f"[Worker {worker_id}] No more URLs to process. Worker exiting.") + break # Exit the while loop + else: + # Queue mode: no tasks, unlock and poll + polling_interval = exec_control.get('worker_polling_interval_seconds', 1) + logger.debug(f"[Worker {worker_id}] No tasks in queue for profile '{profile_name}'. Unlocking and sleeping for {polling_interval}s.") + profile_manager_instance.unlock_profile(profile_name, owner=owner_id) + locked_profile = None + time.sleep(polling_interval) + continue url_batch_len = len(url_batch) batch_started = True @@ -1722,7 +1776,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man if os.path.exists(host_file_path): _post_process_and_move_info_json( Path(host_file_path), profile_name, proxy_url, policy, worker_id, - profile_manager_instance=profile_manager_instance + state_manager, profile_manager_instance=profile_manager_instance ) else: logger.warning(f"File from log not found on host for immediate processing: {host_file_path}") @@ -1833,7 +1887,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man if processed_files: logger.info(f"[Worker {worker_id}] Found {len(processed_files)} leftover file(s) to process after live parsing.") for temp_path in processed_files: - _post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, profile_manager_instance=profile_manager_instance) + _post_process_and_move_info_json(temp_path, profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=profile_manager_instance) # A batch is considered an overall success for logging if it had no fatal errors. # The per-URL activity has already been recorded live. 
@@ -1922,6 +1976,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr exec_control = policy.get('execution_control', {}) d_policy = policy.get('download_policy', {}) direct_policy = policy.get('direct_docker_cli_policy', {}) + queue_policy = policy.get('queue_policy') profile_prefix = d_policy.get('profile_prefix') if not profile_prefix: @@ -1949,13 +2004,16 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr no_task_streak = 0 last_used_profile_name = None + task_counter = 0 while not state_manager.shutdown_event.is_set(): locked_profile = None claimed_task_path_host = None temp_config_dir_host = None was_banned_by_parser = False + task = None + task_id = None try: - if no_task_streak > 0: + if no_task_streak > 0 and not queue_policy: # Polling only makes sense for file mode polling_interval = exec_control.get('worker_polling_interval_seconds', 1) # --- Add diagnostic logging --- all_profiles_in_pool = profile_manager_instance.list_profiles() @@ -1970,14 +2028,64 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr time.sleep(polling_interval) if state_manager.shutdown_event.is_set(): continue - # 1. Find a task and lock its associated profile - locked_profile, claimed_task_path_host = find_task_and_lock_profile( - profile_manager_instance, owner_id, profile_prefix, policy, worker_id - ) + # 1. Get a task + if not queue_policy: + # File-based mode: Find a task and lock its associated profile + locked_profile, claimed_task_path_host = find_task_and_lock_profile( + profile_manager_instance, owner_id, profile_prefix, policy, worker_id + ) + else: + # Queue-based mode + task = state_manager.get_download_task() + if task: + task_id = task.get('id') or task.get('task_id') + if not task_id: + task_id = f"dl_task_{worker_id}_{task_counter}" + task_counter += 1 + task['task_id'] = task_id + + info_json_path_str = task.get('info_json_path') + if not info_json_path_str or not os.path.exists(info_json_path_str): + logger.error(f"[Worker {worker_id}] Task {task_id} has invalid info_json_path: {info_json_path_str}. Skipping.") + state_manager.report_download_skipped(task_id, {"error": "Invalid info_json_path", "task": task}) + + auth_profile_name = task.get('auth_profile_name') + auth_env = task.get('auth_env') + if auth_profile_name and auth_env: + auth_manager = get_auth_manager(profile_manager_instance, auth_env) + if auth_manager: + auth_manager.decrement_pending_downloads(auth_profile_name) + continue + + claimed_task_path_host = Path(info_json_path_str) + + # Now lock a profile + specific_profile = task.get('auth_profile_name') or task.get('profile_name') + if specific_profile: + locked_profile = profile_manager_instance.lock_profile(owner=owner_id, specific_profile_name=specific_profile) + if not locked_profile: + logger.warning(f"[Worker {worker_id}] Could not lock specific profile '{specific_profile}'. Trying any profile with prefix.") + locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix) + else: + locked_profile = profile_manager_instance.lock_profile(owner=owner_id, profile_prefix=profile_prefix) + + if not locked_profile: + logger.warning(f"[Worker {worker_id}] No profiles available for task {task_id}. 
Re-queueing.") + state_manager.add_download_tasks_batch([task]) + claimed_task_path_host = None + task = None + + if task: + state_manager.mark_download_in_progress(task_id, owner_id) if not locked_profile: - no_task_streak += 1 - # The main loop will pause if the streak continues. + if not queue_policy: + no_task_streak += 1 + else: + # In queue mode, if we didn't get a task or a profile, we just poll. + polling_interval = exec_control.get('worker_polling_interval_seconds', 1) + logger.debug(f"[Worker {worker_id}] No download tasks or profiles available. Sleeping for {polling_interval}s.") + time.sleep(polling_interval) continue profile_name = locked_profile['name'] @@ -2318,15 +2426,27 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr logger.info(f"[Worker {worker_id}] [{profile_name}] Task processing complete. Worker will now unlock profile and attempt next task.") - # 6. Clean up task file by renaming to .processed - try: - # The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed - base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0] - processed_path = Path(f"{base_path_str}.processed") - claimed_task_path_host.rename(processed_path) - logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.") - except (OSError, IndexError) as e: - logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}") + # 6. Clean up task file + if not queue_policy: + # File-based mode: rename to .processed + try: + # The claimed_task_path_host has a .LOCKED suffix, remove it before adding .processed + base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0] + processed_path = Path(f"{base_path_str}.processed") + claimed_task_path_host.rename(processed_path) + logger.debug(f"[{sp_utils.get_display_name(claimed_task_path_host)}] Renamed processed task file to '{processed_path.name}'.") + except (OSError, IndexError) as e: + logger.error(f"Failed to rename processed task file '{claimed_task_path_host}': {e}") + elif d_policy.get('rename_source_info_json_on_success'): + # Queue-based mode: respect rename policy + source_path_to_rename = task.get('info_json_path') + if success and source_path_to_rename and os.path.exists(source_path_to_rename): + try: + processed_path = source_path_to_rename + ".processed" + shutil.move(source_path_to_rename, processed_path) + logger.info(f"[Worker {worker_id}] Renamed source info.json to '{processed_path}'") + except Exception as e: + logger.warning(f"[Worker {worker_id}] Could not rename source info.json '{source_path_to_rename}': {e}") # After this point, claimed_task_path_host is no longer valid. # The metadata has already been read into auth_profile_name and auth_env. else: @@ -2344,7 +2464,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr # The auth_profile_name and auth_env variables were populated in the `try` block # before the task file was renamed or deleted. 
if auth_profile_name and auth_env: - auth_manager = _get_auth_manager(profile_manager_instance, auth_env) + auth_manager = get_auth_manager(profile_manager_instance, auth_env) if auth_manager: auth_manager.decrement_pending_downloads(auth_profile_name) else: @@ -2403,9 +2523,14 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr owner=owner_id, rest_for_seconds=cooldown ) - if claimed_task_path_host and os.path.exists(claimed_task_path_host): + if not queue_policy and claimed_task_path_host and os.path.exists(claimed_task_path_host): + # Clean up .LOCKED file in file-based mode try: os.remove(claimed_task_path_host) except OSError: pass + + if task and task_id: + state_manager.remove_download_in_progress(task_id) + if temp_config_dir_host and os.path.exists(temp_config_dir_host): try: shutil.rmtree(temp_config_dir_host) @@ -2612,7 +2737,7 @@ def run_direct_download_worker(worker_id, policy, state_manager, args, profile_m # The auth_profile_name and auth_env variables were populated in the `try` block # before the task file was deleted. if auth_profile_name and auth_env: - auth_manager = _get_auth_manager(profile_manager_instance, auth_env) + auth_manager = get_auth_manager(profile_manager_instance, auth_env) if auth_manager: auth_manager.decrement_pending_downloads(auth_profile_name) else: diff --git a/ytops_client-source/ytops_client/stress_policy_tool.py b/ytops_client-source/ytops_client/stress_policy_tool.py index ef8a751..6931b91 100644 --- a/ytops_client-source/ytops_client/stress_policy_tool.py +++ b/ytops_client-source/ytops_client/stress_policy_tool.py @@ -615,48 +615,72 @@ def main_stress_policy(args): workers = exec_control.get('workers', 1) if mode == 'fetch_only': - urls_file = settings.get('urls_file') - if not urls_file: - logger.error("Direct docker CLI (fetch) mode requires 'settings.urls_file'.") - return 1 - - try: - with open(urls_file, 'r', encoding='utf-8') as f: - urls_list = [line.strip() for line in f if line.strip()] - except IOError as e: - logger.error(f"Could not read urls_file '{urls_file}': {e}") - return 1 + queue_policy = policy.get('queue_policy') + urls_list = [] # Default to empty for queue mode - if not urls_list: - logger.error(f"URL file '{urls_file}' is empty. Nothing to do.") - return 1 + if not queue_policy: + urls_file = settings.get('urls_file') + if not urls_file: + logger.error("Direct docker CLI (fetch) mode requires 'settings.urls_file' if not configured for queue operation via 'queue_policy'.") + return 1 + + try: + with open(urls_file, 'r', encoding='utf-8') as f: + urls_list = [line.strip() for line in f if line.strip()] + except IOError as e: + logger.error(f"Could not read urls_file '{urls_file}': {e}") + return 1 - start_index = 0 - if args.start_from_url_index is not None: - start_index = max(0, args.start_from_url_index - 1) - state_manager.update_last_url_index(start_index, force=True) - else: - start_index = state_manager.get_last_url_index() - - if start_index >= len(urls_list) and len(urls_list) > 0: - logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!") - logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!") - logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. 
!!!") - logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - if not args.dry_run and not args.disable_log_writing: - state_manager.close() - try: - os.remove(state_manager.state_file_path) - logger.info(f"Deleted state file: {state_manager.state_file_path}") - except OSError as e: - logger.error(f"Failed to delete state file: {e}") + if not urls_list: + logger.error(f"URL file '{urls_file}' is empty. Nothing to do.") + return 1 + + start_index = 0 + if args.start_from_url_index is not None: + start_index = max(0, args.start_from_url_index - 1) + state_manager.update_last_url_index(start_index, force=True) else: - logger.info("[Dry Run] Would have deleted state file and stopped.") - return 0 + start_index = state_manager.get_last_url_index() + + if start_index >= len(urls_list) and len(urls_list) > 0: + logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + logger.warning("!!! ALL URLS HAVE BEEN PROCESSED IN PREVIOUS RUNS (based on state file) !!!") + logger.warning(f"!!! State file indicates start index {start_index + 1}, but URL file has only {len(urls_list)} URLs. !!!") + logger.warning("!!! Deleting state file and stopping. Please run the command again to start from the beginning. !!!") + logger.warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + if not args.dry_run and not args.disable_log_writing: + state_manager.close() + try: + os.remove(state_manager.state_file_path) + logger.info(f"Deleted state file: {state_manager.state_file_path}") + except OSError as e: + logger.error(f"Failed to delete state file: {e}") + else: + logger.info("[Dry Run] Would have deleted state file and stopped.") + return 0 - if start_index > 0: - logger.info(f"Starting/resuming from URL index {start_index + 1}.") + if start_index > 0: + logger.info(f"Starting/resuming from URL index {start_index + 1}.") + else: + logger.info("Direct docker CLI (fetch) mode is running in QUEUE mode.") + # Initialize queue provider + redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost' + redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379)) + redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password') + redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0)) + + use_env_prefix = queue_policy.get('use_env_prefix', True) + env_prefix = None + if use_env_prefix: + env_prefix = profile_manager_instance.key_prefix.removesuffix('_profile_mgmt_') + + state_manager.initialize_queue_provider( + redis_host=redis_host, + redis_port=redis_port, + redis_password=redis_password, + redis_db=redis_db, + env_prefix=env_prefix + ) sp_utils.display_effective_policy(policy, policy_name, sources=urls_list) if args.dry_run: return 0 @@ -671,15 +695,37 @@ def main_stress_policy(args): logger.info("Shutdown signal received, workers have finished.") elif mode == 'download_only': - info_json_dir = settings.get('info_json_dir') - if not info_json_dir: - logger.error("Direct docker CLI (download) mode requires 'settings.info_json_dir'.") - return 1 - try: - os.makedirs(info_json_dir, exist_ok=True) - except OSError as e: - logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}") - return 1 + queue_policy = 
policy.get('queue_policy') + if not queue_policy: + info_json_dir = settings.get('info_json_dir') + if not info_json_dir: + logger.error("Direct docker CLI (download) mode requires 'settings.info_json_dir' if not configured for queue operation.") + return 1 + try: + os.makedirs(info_json_dir, exist_ok=True) + except OSError as e: + logger.error(f"Failed to create info.json directory '{info_json_dir}': {e}") + return 1 + else: + logger.info("Direct docker CLI (download) mode is running in QUEUE mode.") + # Initialize queue provider + redis_host = args.redis_host or os.getenv('REDIS_HOST') or queue_policy.get('redis_host') or 'localhost' + redis_port = args.redis_port if args.redis_port is not None else (int(os.getenv('REDIS_PORT')) if os.getenv('REDIS_PORT') else (queue_policy.get('redis_port') or 6379)) + redis_password = args.redis_password or os.getenv('REDIS_PASSWORD') or queue_policy.get('redis_password') + redis_db = args.redis_db if args.redis_db is not None else (int(os.getenv('REDIS_DB')) if os.getenv('REDIS_DB') else (queue_policy.get('redis_db') or 0)) + + use_env_prefix = queue_policy.get('use_env_prefix', True) + env_prefix = None + if use_env_prefix: + env_prefix = profile_manager_instance.key_prefix.removesuffix('_profile_mgmt_') + + state_manager.initialize_queue_provider( + redis_host=redis_host, + redis_port=redis_port, + redis_password=redis_password, + redis_db=redis_db, + env_prefix=env_prefix + ) sp_utils.display_effective_policy(policy, policy_name, sources=[]) if args.dry_run: return 0