From 5479e8c8f82e7d3a5a7ed9dd949d78c9aa12c141 Mon Sep 17 00:00:00 2001
From: aperez
Date: Sat, 27 Dec 2025 16:57:26 +0300
Subject: [PATCH] Updates to task generator usage

---
 .../11_direct_docker_download_simulation.yaml |   4 +
 .../ytops_client/stress_policy/workers.py     |  84 +++++--
 .../ytops_client/task_generator_tool.py       | 207 ++++++++++++------
 3 files changed, 207 insertions(+), 88 deletions(-)

diff --git a/ytops_client-source/policies/11_direct_docker_download_simulation.yaml b/ytops_client-source/policies/11_direct_docker_download_simulation.yaml
index 811b684..6a87f42 100644
--- a/ytops_client-source/policies/11_direct_docker_download_simulation.yaml
+++ b/ytops_client-source/policies/11_direct_docker_download_simulation.yaml
@@ -27,6 +27,10 @@ execution_control:
 download_policy:
   profile_prefix: "user1"
+  # A comma-separated list of format IDs to download for each info.json.
+  # This is used by the dummy mode simulation to test per-format downloads.
+  # In non-dummy mode, the format selector in ytdlp_config_overrides is used.
+  formats: "140-dashy,299-dashy"
   # Default cooldown in seconds if not specified by the enforcer in Redis.
   # The value from Redis (set via `unlock_cooldown_seconds` in the enforcer policy)
   # will always take precedence. This is a fallback.

diff --git a/ytops_client-source/ytops_client/stress_policy/workers.py b/ytops_client-source/ytops_client/stress_policy/workers.py
index 1c74b20..ef10ddf 100644
--- a/ytops_client-source/ytops_client/stress_policy/workers.py
+++ b/ytops_client-source/ytops_client/stress_policy/workers.py
@@ -977,7 +977,7 @@ def find_task_and_lock_profile(profile_manager, owner_id, profile_prefix, policy
     # 2. Get all available task files.
     try:
-        task_files = list(Path(info_json_dir).glob('*.json'))
+        task_files = list(Path(info_json_dir).rglob('*.json'))
     except FileNotFoundError:
         logger.warning(f"Info JSON directory not found during scan: {info_json_dir}")
         return None, None
@@ -2124,33 +2124,77 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
                 continue # Skip to finally block to unlock profile

             if args.dummy or args.dummy_batch:
-                logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY DOCKER DOWNLOAD ==========")
+                logger.info(f"========== [Worker {worker_id}] BEGIN DUMMY DOCKER DOWNLOAD PER-FORMAT SIMULATION ==========")
                 logger.info(f"[Worker {worker_id}] Profile: {profile_name} | Task: {claimed_task_path_host.name}")
-
+
                 dummy_settings = policy.get('settings', {}).get('dummy_simulation_settings', {})
                 min_seconds = dummy_settings.get('download_min_seconds', 1.0)
                 max_seconds = dummy_settings.get('download_max_seconds', 3.0)
                 failure_rate = dummy_settings.get('download_failure_rate', 0.0)
                 skipped_rate = dummy_settings.get('download_skipped_failure_rate', 0.0)
-                time.sleep(random.uniform(min_seconds, max_seconds))
+                # In dummy mode, prioritize the format from the task file, then from the policy.
+                format_selection = info_data.get('_ytops_download_format')
+                source_of_format = "task file"
+                if not format_selection:
+                    format_selection = d_policy.get('formats', '')
+                    source_of_format = "policy"

-                rand_val = random.random()
-                should_fail_skipped = rand_val < skipped_rate
-                should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
-
-                if should_fail_skipped:
-                    logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure.")
-                    profile_manager_instance.record_activity(profile_name, 'tolerated_error')
-                elif should_fail_fatal:
-                    logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure.")
-                    profile_manager_instance.record_activity(profile_name, 'download_error')
+                if not format_selection:
+                    logger.warning(f"[Worker {worker_id}] DUMMY: No format specified in task file or policy. Simulating a single download.")
+                    formats_to_test = ['dummy_format']
                 else:
-                    logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success.")
-                    profile_manager_instance.record_activity(profile_name, 'download')
-
-                logger.info(f"========== [Worker {worker_id}] END DUMMY DOCKER DOWNLOAD ==========")
-
+                    formats_to_test = [f.strip() for f in format_selection.split(',') if f.strip()]
+                    logger.info(f"[Worker {worker_id}] DUMMY: Simulating downloads for formats (from {source_of_format}): {', '.join(formats_to_test)}")
+
+                for format_id in formats_to_test:
+                    if state_manager.shutdown_event.is_set():
+                        logger.info(f"[Worker {worker_id}] DUMMY: Shutdown requested, stopping format simulation.")
+                        break
+
+                    logger.info(f"[Worker {worker_id}] DUMMY: Simulating download for format '{format_id}'...")
+                    time.sleep(random.uniform(min_seconds, max_seconds))
+
+                    rand_val = random.random()
+                    should_fail_skipped = rand_val < skipped_rate
+                    should_fail_fatal = not should_fail_skipped and rand_val < (skipped_rate + failure_rate)
+
+                    success = False
+                    details = ""
+                    error_type = None
+                    is_tolerated_error = False
+
+                    if should_fail_skipped:
+                        logger.warning(f"[Worker {worker_id}] DUMMY: Simulating skipped download failure for format '{format_id}'.")
+                        profile_manager_instance.record_activity(profile_name, 'tolerated_error')
+                        details = f"Dummy skipped failure for format {format_id}"
+                        error_type = "DummySkippedFailure"
+                        is_tolerated_error = True
+                    elif should_fail_fatal:
+                        logger.warning(f"[Worker {worker_id}] DUMMY: Simulating fatal download failure for format '{format_id}'.")
+                        profile_manager_instance.record_activity(profile_name, 'download_error')
+                        details = f"Dummy fatal failure for format {format_id}"
+                        error_type = "DummyFailure"
+                    else:
+                        logger.info(f"[Worker {worker_id}] DUMMY: Simulating download success for format '{format_id}'.")
+                        profile_manager_instance.record_activity(profile_name, 'download')
+                        success = True
+                        details = f"Dummy success for format {format_id}"
+
+                    event = {
+                        'type': 'direct_docker_download',
+                        'profile': profile_name,
+                        'proxy_url': locked_profile['proxy'],
+                        'success': success,
+                        'details': details,
+                        'error_type': error_type,
+                        'is_tolerated_error': is_tolerated_error,
+                        'format': format_id
+                    }
+                    state_manager.log_event(event)
+
+                logger.info(f"========== [Worker {worker_id}] END DUMMY DOCKER DOWNLOAD SIMULATION ==========")
+                # In dummy mode, we just rename the file to processed and continue to the finally block.
                 try:
                     base_path_str = str(claimed_task_path_host).rsplit('.LOCKED.', 1)[0]
@@ -2159,7 +2203,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr
                     logger.debug(f"DUMMY MODE: Renamed processed task file to '{processed_path.name}'.")
                 except (OSError, IndexError) as e:
                     logger.error(f"DUMMY MODE: Failed to rename processed task file '{claimed_task_path_host}': {e}")
-
+
                 continue # Skip to finally block

             # --- Check for URL expiration before running Docker ---
diff --git a/ytops_client-source/ytops_client/task_generator_tool.py b/ytops_client-source/ytops_client/task_generator_tool.py
index cec933c..926a17f 100644
--- a/ytops_client-source/ytops_client/task_generator_tool.py
+++ b/ytops_client-source/ytops_client/task_generator_tool.py
@@ -7,7 +7,9 @@ import json
 import logging
 import os
 import re
+import signal
 import sys
+import time
 from pathlib import Path

 # Configure logging
@@ -17,6 +19,13 @@ logging.basicConfig(
 )
 logger = logging.getLogger(__name__)

+# Graceful shutdown handler
+shutdown_event = False
+def handle_shutdown(sig, frame):
+    global shutdown_event
+    logger.info("Shutdown signal received. Stopping task generator...")
+    shutdown_event = True
+
 def sanitize_format_for_filename(format_str: str) -> str:
     """Sanitizes a format selector string to be filesystem-friendly."""
     # Replace common problematic characters with underscores
@@ -45,6 +54,9 @@ def add_task_generator_parser(subparsers):
     gen_parser.add_argument('--source-dir', required=True, help='Directory containing the source info.json files.')
     gen_parser.add_argument('--output-dir', required=True, help='Directory where the generated task files will be saved.')
     gen_parser.add_argument('--formats', required=True, help='A comma-separated list of format IDs or selectors to generate tasks for (e.g., "18,140,bestvideo").')
+    gen_parser.add_argument('--live', action='store_true', help='Run continuously, watching the source directory for new files.')
+    gen_parser.add_argument('--interval-seconds', type=int, default=10, help='When in --live mode, how often to scan for new files.')
+    gen_parser.add_argument('--dummy', action='store_true', help='Generate dummy task files without reading info.json content. Useful for testing download workers.')
     gen_parser.add_argument('--verbose', action='store_true', help='Enable verbose logging.')

     reset_parser = generate_subparsers.add_parser(
@@ -112,10 +124,93 @@ def main_task_generator(args):
         return 1


+def _generate_tasks_for_file(source_file, output_dir, formats_to_generate, is_dummy_mode):
+    """Helper function to generate task files for a single source info.json."""
+    try:
+        info_json_content = {}
+        if is_dummy_mode:
+            # In dummy mode, we don't read the file content. We create a minimal structure.
+            # We try to parse the filename to get video_id and profile_name for organization.
+            # Example filename: {video_id}-{profile_name}-{proxy}.info.json
+            parts = source_file.stem.split('-')
+            video_id = parts[0] if parts else 'dummy_video'
+            profile_name = next((p for p in parts if p.startswith('user')), None)
+
+            info_json_content = {
+                'id': video_id,
+                '_dummy': True,
+                '_ytops_metadata': {
+                    'profile_name': profile_name
+                }
+            }
+            logger.debug(f"DUMMY MODE: Generating tasks for source file: {source_file.name}")
+        else:
+            with open(source_file, 'r', encoding='utf-8') as f:
+                info_json_content = json.load(f)
+    except (IOError, json.JSONDecodeError) as e:
+        logger.warning(f"Skipping file '{source_file.name}' due to read/parse error: {e}")
+        return 0
+    except Exception as e:
+        logger.error(f"An unexpected error occurred while processing '{source_file.name}': {e}")
+        return 0
+
+    tasks_generated_this_file = 0
+    try:
+        # Use metadata to create a profile-specific subdirectory for better organization.
+        profile_name_from_meta = info_json_content.get('_ytops_metadata', {}).get('profile_name')
+        final_output_dir = output_dir
+        if profile_name_from_meta:
+            final_output_dir = output_dir / profile_name_from_meta
+            # Ensure subdirectory exists. This is done once per source file.
+            try:
+                final_output_dir.mkdir(parents=True, exist_ok=True)
+            except OSError as e:
+                logger.error(f"Could not create profile subdirectory '{final_output_dir}': {e}. Skipping tasks for this source file.")
+                return 0
+
+        for format_str in formats_to_generate:
+            task_data = info_json_content.copy()
+            # Add the target format to the task data itself. This makes the task file self-contained.
+            task_data['_ytops_download_format'] = format_str
+
+            # Create a unique filename for the task
+            original_stem = source_file.stem
+            safe_format_str = sanitize_format_for_filename(format_str)
+            task_filename = f"{original_stem}-format-{safe_format_str}.json"
+            output_path = final_output_dir / task_filename
+
+            # Check if this specific task file already exists to avoid re-writing
+            if output_path.exists():
+                logger.debug(f"Task file already exists, skipping generation: {output_path}")
+                continue
+
+            with open(output_path, 'w', encoding='utf-8') as f:
+                json.dump(task_data, f, indent=2)
+            logger.debug(f"Generated task file: {output_path}")
+            tasks_generated_this_file += 1
+
+        # Mark source file as processed now that we've iterated through all formats for it.
+        try:
+            processed_path = source_file.with_suffix(f"{source_file.suffix}.processed")
+            source_file.rename(processed_path)
+            logger.debug(f"Marked '{source_file.name}' as processed.")
+        except (IOError, OSError) as e:
+            logger.error(f"Failed to mark source file '{source_file.name}' as processed: {e}")
+
+    except IOError as e:
+        logger.error(f"An I/O error occurred while generating tasks for '{source_file.name}': {e}. It will be retried on the next run.")
+        # The file is not renamed, so it will be picked up again
+
+    return tasks_generated_this_file
+
+
 def _main_task_generator_generate(args):
     if args.verbose:
         logging.getLogger().setLevel(logging.DEBUG)

+    signal.signal(signal.SIGINT, handle_shutdown)
+    signal.signal(signal.SIGTERM, handle_shutdown)
+
     source_dir = Path(args.source_dir)
     output_dir = Path(args.output_dir)
     formats_to_generate = [f.strip() for f in args.formats.split(',') if f.strip()]
@@ -130,74 +225,50 @@ def _main_task_generator_generate(args):
         logger.error(f"Could not create output directory '{output_dir}': {e}")
         return 1

-    logger.info(f"Scanning for info.json files in '{source_dir}' (recursively)...")
-    source_files = list(source_dir.rglob('*.json'))
-
-    if not source_files:
-        logger.info(f"No .json files found in '{source_dir}'. Nothing to do.")
-        return 0
-
-    logger.info(f"Found {len(source_files)} source file(s). Generating tasks for formats: {', '.join(formats_to_generate)}...")
-
-    total_tasks_generated = 0
-    for source_file in source_files:
-        try:
-            with open(source_file, 'r', encoding='utf-8') as f:
-                info_json_content = json.load(f)
-        except (IOError, json.JSONDecodeError) as e:
-            logger.warning(f"Skipping file '{source_file.name}' due to read/parse error: {e}")
-            continue
-
-        try:
-            tasks_generated_this_run = 0
-
-            # Use metadata to create a profile-specific subdirectory for better organization.
-            profile_name_from_meta = info_json_content.get('_ytops_metadata', {}).get('profile_name')
-            final_output_dir = output_dir
-            if profile_name_from_meta:
-                final_output_dir = output_dir / profile_name_from_meta
-                # Ensure subdirectory exists. This is done once per source file.
-                try:
-                    final_output_dir.mkdir(parents=True, exist_ok=True)
-                except OSError as e:
-                    logger.error(f"Could not create profile subdirectory '{final_output_dir}': {e}. Skipping tasks for this source file.")
-                    continue
-
-            for format_str in formats_to_generate:
-                task_data = info_json_content.copy()
-                # Add the target format to the task data itself. This makes the task file self-contained.
-                task_data['_ytops_download_format'] = format_str
-
-                # Create a unique filename for the task
-                original_stem = source_file.stem
-                safe_format_str = sanitize_format_for_filename(format_str)
-                task_filename = f"{original_stem}-format-{safe_format_str}.json"
-                output_path = final_output_dir / task_filename
-
-                # Check if this specific task file already exists to avoid re-writing
-                if output_path.exists():
-                    logger.debug(f"Task file already exists, skipping generation: {output_path}")
-                    continue
-
-                with open(output_path, 'w', encoding='utf-8') as f:
-                    json.dump(task_data, f, indent=2)
-                logger.debug(f"Generated task file: {output_path}")
-                tasks_generated_this_run += 1
-
-            if tasks_generated_this_run > 0:
-                total_tasks_generated += tasks_generated_this_run
-
-                # Mark source file as processed by renaming
-                try:
-                    processed_path = source_file.with_suffix(f"{source_file.suffix}.processed")
-                    source_file.rename(processed_path)
-                    logger.debug(f"Marked '{source_file.name}' as processed.")
-                except (IOError, OSError) as e:
-                    logger.error(f"Failed to mark source file '{source_file.name}' as processed: {e}")
-
-        except IOError as e:
-            logger.error(f"An I/O error occurred while generating tasks for '{source_file.name}': {e}. It will be retried on the next run.")
-            # The file is not renamed, so it will be picked up again

+    if not args.live:
+        logger.info(f"Scanning for info.json files in '{source_dir}' (recursively)...")
+        source_files = list(source_dir.rglob('*.json'))
+
-    logger.info(f"Successfully generated {total_tasks_generated} new task file(s) in '{output_dir}'.")
+        if not source_files:
+            logger.info(f"No .json files found in '{source_dir}'. Nothing to do.")
+            return 0
+
+        logger.info(f"Found {len(source_files)} source file(s). Generating tasks for formats: {', '.join(formats_to_generate)}...")
+
+        total_tasks_generated = 0
+        for source_file in source_files:
+            tasks_for_file = _generate_tasks_for_file(source_file, output_dir, formats_to_generate, args.dummy)
+            total_tasks_generated += tasks_for_file
+
+        logger.info(f"Successfully generated {total_tasks_generated} new task file(s) in '{output_dir}'.")
+        return 0
+
+    # --- Live Mode ---
+    logger.info(f"Running in LIVE mode. Watching '{source_dir}' for new files every {args.interval_seconds}s. Press Ctrl+C to stop.")
+    total_tasks_generated = 0
+
+    while not shutdown_event:
+        try:
+            logger.debug("Live mode: Scanning for new source files...")
+            source_files = list(source_dir.rglob('*.json'))
+
+            if not source_files:
+                logger.debug("Live mode: No source files found.")
+            else:
+                logger.info(f"Live mode: Found {len(source_files)} source file(s) to process.")
+                for source_file in source_files:
+                    if shutdown_event: break
+                    tasks_for_file = _generate_tasks_for_file(source_file, output_dir, formats_to_generate, args.dummy)
+                    total_tasks_generated += tasks_for_file
+
+            if shutdown_event: break
+
+            logger.debug(f"Live mode: Scan complete. Sleeping for {args.interval_seconds}s...")
+            time.sleep(args.interval_seconds)
+
+        except Exception as e:
+            logger.error(f"An unexpected error occurred in the live loop: {e}", exc_info=True)
+            time.sleep(5) # Pause before retrying to avoid spamming errors
+
+    logger.info(f"Task generator stopped. Total tasks generated in this run: {total_tasks_generated}.")
     return 0
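
Notes (illustrative sketches, not part of the diff):

In the dummy download path above, a format list embedded in the task file
(`_ytops_download_format`) takes precedence over the policy-level `formats`
key. A minimal standalone sketch of that resolution order, assuming the same
dict shapes the worker reads (the function name and return shape here are
illustrative, not from the patch):

    def resolve_formats(info_data, download_policy):
        """Sketch of the format precedence used by the dummy download worker."""
        selection = info_data.get('_ytops_download_format')
        source = "task file"
        if not selection:
            selection = download_policy.get('formats', '')
            source = "policy"
        formats = [f.strip() for f in selection.split(',') if f.strip()]
        # The worker falls back to a single placeholder format when nothing is set.
        return (formats or ['dummy_format']), source

    # With the policy file above:
    # resolve_formats({}, {'formats': '140-dashy,299-dashy'})
    # -> (['140-dashy', '299-dashy'], 'policy')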
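
The dummy simulation's outcome sampling draws one uniform value per format and
partitions [0, 1) into three bands; the skipped check comes first, so it takes
precedence over the fatal band. A self-contained sketch of the same logic:

    import random

    def sample_dummy_outcome(skipped_rate, failure_rate):
        """One draw decides the outcome: [0, skipped) -> skipped (tolerated_error),
        [skipped, skipped + failure) -> fatal (download_error), the rest -> success."""
        rand_val = random.random()
        if rand_val < skipped_rate:
            return 'skipped'
        if rand_val < skipped_rate + failure_rate:
            return 'fatal'
        return 'success'

    # e.g. skipped_rate=0.1, failure_rate=0.2 gives roughly 10% skipped,
    # 20% fatal, and 70% successful simulated downloads.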
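
Generated task files are named `{stem}-format-{sanitized_format}.json`. The
body of `sanitize_format_for_filename` lies outside the hunk context, so the
regex below is an assumed stand-in consistent with its "replace common
problematic characters with underscores" comment, and the stem is hypothetical:

    import re

    def sanitize_format_for_filename(format_str):
        # Assumed implementation: collapse anything outside [A-Za-z0-9._-] to '_'.
        return re.sub(r'[^A-Za-z0-9._-]+', '_', format_str)

    stem = 'abc123-user1-proxy7.info'  # hypothetical source file stem
    for fmt in ['140-dashy', 'bestvideo[height<=720]']:
        print(f"{stem}-format-{sanitize_format_for_filename(fmt)}.json")
    # abc123-user1-proxy7.info-format-140-dashy.json
    # abc123-user1-proxy7.info-format-bestvideo_height_720_.json

Because the chosen format is embedded both in the filename and in the
`_ytops_download_format` field, each task file stays self-contained, matching
the comment in `_generate_tasks_for_file`.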