diff --git a/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml b/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml index f0dac53..aaaa0d6 100644 --- a/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml +++ b/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml @@ -66,8 +66,10 @@ direct_docker_cli_policy: # Template for generating User-Agent strings for new profiles. # The '{major_version}' will be replaced by a version string. user_agent_template: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{major_version}.0.0.0 Safari/537.36" - # Range of Chrome major versions to use for the template. A range suitable for TV devices. - user_agent_version_range: [110, 120] + # Range of Chrome major versions to use for the template. + # See CHROME_MAJOR_VERSION_RANGE in yt-dlp's random_user_agent(): + # https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/utils/networking.py + user_agent_version_range: [137, 143] batch_size: 25 diff --git a/ytops_client-source/policies/12_queue_auth_simulation.yaml b/ytops_client-source/policies/12_queue_auth_simulation.yaml index 6d48b60..50f2a8f 100644 --- a/ytops_client-source/policies/12_queue_auth_simulation.yaml +++ b/ytops_client-source/policies/12_queue_auth_simulation.yaml @@ -61,8 +61,10 @@ direct_docker_cli_policy: # Template for generating User-Agent strings for new profiles. # The '{major_version}' will be replaced by a version string. user_agent_template: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{major_version}.0.0.0 Safari/537.36" - # Range of Chrome major versions to use for the template. A range suitable for TV devices. - user_agent_version_range: [110, 120] + # Range of Chrome major versions to use for the template. 
+ # See CHROME_MAJOR_VERSION_RANGE in yt-dlp's random_user_agent(): + # https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/utils/networking.py + user_agent_version_range: [137, 143] # A base config file can be used, with overrides applied from the policy. # The orchestrator will inject 'proxy', 'batch-file', and 'output' keys into the overrides. diff --git a/ytops_client-source/ytops_client/stress_policy/workers.py b/ytops_client-source/ytops_client/stress_policy/workers.py index e4ea2cc..1c74b20 100644 --- a/ytops_client-source/ytops_client/stress_policy/workers.py +++ b/ytops_client-source/ytops_client/stress_policy/workers.py @@ -943,9 +943,13 @@ def _post_process_and_move_info_json(file_path, profile_name, proxy_url, policy, added_count = state_manager.add_download_tasks_batch([download_task]) if added_count > 0: logger.info(f"[Worker {worker_id}] [{profile_name}] Added {added_count} download task(s) to queue for {video_id}") + else: + logger.error(f"[Worker {worker_id}] [{profile_name}] Failed to add download task to queue for {video_id}") + return False except Exception as e: logger.error(f"[Worker {worker_id}] [{profile_name}] Failed to create download task for {video_id}: {e}", exc_info=True) + return False return True except (IOError, json.JSONDecodeError, OSError) as e: @@ -1033,6 +1037,7 @@ def run_direct_batch_worker(worker_id, policy, state_manager, args, profile_mana exec_control = policy.get('execution_control', {}) gen_policy = policy.get('info_json_generation_policy', {}) direct_policy = policy.get('direct_batch_cli_policy', {}) + queue_policy = policy.get('queue_policy') profile_prefix = gen_policy.get('profile_prefix') if not profile_prefix: @@ -1110,9 +1115,10 @@ def run_direct_batch_worker(worker_id, policy, state_manager, args, profile_mana url_batch_len = len(url_batch) batch_started = True - # Preemptively increment the counter to avoid race conditions with download workers. 
- profile_manager_instance.increment_pending_downloads(profile_name, url_batch_len) - logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {url_batch_len} for the upcoming batch.") + # Preemptively increment the counter if we expect to create download tasks. + if queue_policy and queue_policy.get('formats_to_download'): + profile_manager_instance.increment_pending_downloads(profile_name, url_batch_len) + logger.info(f"[Worker {worker_id}] [{profile_name}] Preemptively incremented pending downloads by {url_batch_len} for the upcoming batch.") end_idx = start_idx + len(url_batch) logger.info(f"[Worker {worker_id}] [{profile_name}] Processing batch of {len(url_batch)} URLs (lines {start_idx + 1}-{end_idx} from source).") @@ -1564,8 +1570,8 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man task_dir_name = os.path.basename(temp_task_dir_host) task_dir_container = os.path.join(container_mount_path, task_dir_name) - # Set XDG_CONFIG_HOME for yt-dlp to find the config automatically - environment['XDG_CONFIG_HOME'] = task_dir_container + # The config file path is passed explicitly to yt-dlp via --config-locations, + # so setting XDG_CONFIG_HOME in the environment is redundant. # Write batch file temp_batch_file_host = os.path.join(temp_task_dir_host, 'batch.txt') @@ -1701,8 +1707,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man 'mode': 'rw' } - # The command tells yt-dlp where to find the config file we created. - # We still set XDG_CONFIG_HOME for any other config it might look for. + # The command tells yt-dlp exactly where to find the config file we created. 
command = ['yt-dlp', '--config-locations', os.path.join(task_dir_container, 'yt-dlp/config')] logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}") @@ -1749,11 +1754,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man # Success is the highest priority check if '[info] Writing video metadata as JSON to:' in line: - with activity_lock: - live_success_count += 1 - logger.info(f"[Worker {worker_id}] [{profile_name}] Live success #{live_success_count} detected from log.") - profile_manager_instance.record_activity(profile_name, 'success') - + post_processed_successfully = False # --- Immediate post-processing --- try: path_match = re.search(r"Writing video metadata as JSON to: '?([^']+)'?$", line) @@ -1774,7 +1775,7 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man time.sleep(0.1) if os.path.exists(host_file_path): - _post_process_and_move_info_json( + post_processed_successfully = _post_process_and_move_info_json( Path(host_file_path), profile_name, proxy_url, policy, worker_id, state_manager, profile_manager_instance=profile_manager_instance ) @@ -1782,6 +1783,16 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man logger.warning(f"File from log not found on host for immediate processing: {host_file_path}") except Exception as e: logger.error(f"Error during immediate post-processing from log line: {e}") + + with activity_lock: + if post_processed_successfully: + live_success_count += 1 + logger.info(f"[Worker {worker_id}] [{profile_name}] Live success #{live_success_count} detected and post-processed.") + profile_manager_instance.record_activity(profile_name, 'success') + else: + live_failure_count += 1 + logger.error(f"[Worker {worker_id}] [{profile_name}] Post-processing failed for a successful fetch. Recording as failure.") + profile_manager_instance.record_activity(profile_name, 'failure') # --- End immediate post-processing --- return False @@ -1947,11 +1958,12 @@ def run_direct_docker_worker(worker_id, policy, state_manager, args, profile_man finally: if locked_profile and batch_started: # --- Reconcile pending downloads counter --- - # This is in the finally block to guarantee it runs even if post-processing fails. - adjustment = live_success_count - url_batch_len - if adjustment != 0: - logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {live_success_count}/{url_batch_len} files. Adjusting by {adjustment}.") - profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment) + if queue_policy and queue_policy.get('formats_to_download'): + # This is in the finally block to guarantee it runs even if post-processing fails. + adjustment = live_success_count - url_batch_len + if adjustment != 0: + logger.warning(f"[Worker {worker_id}] [{profile_name}] Reconciling pending downloads. Batch created {live_success_count}/{url_batch_len} files. Adjusting by {adjustment}.") + profile_manager_instance.increment_pending_downloads(locked_profile['name'], adjustment) if locked_profile: last_used_profile_name = locked_profile['name'] @@ -2199,7 +2211,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr config_dir_name = os.path.basename(temp_config_dir_host) config_dir_container = os.path.join(container_mount_path, config_dir_name) - environment = {'XDG_CONFIG_HOME': config_dir_container} + environment = {} base_config_content = "" base_config_file = direct_policy.get('ytdlp_config_file') @@ -2243,8 +2255,7 @@ def run_direct_docker_download_worker(worker_id, policy, state_manager, args, pr os.path.abspath(host_mount_path): {'bind': container_mount_path, 'mode': 'ro'}, os.path.abspath(host_download_path): {'bind': container_download_path, 'mode': 'rw'} } - # The command tells yt-dlp where to find the config file we created. - # We still set XDG_CONFIG_HOME for any other config it might look for. + # The command tells yt-dlp exactly where to find the config file we created. command = ['yt-dlp', '--config-locations', os.path.join(config_dir_container, 'yt-dlp/config')] logger.info(f"[Worker {worker_id}] [{profile_name}] Running docker command: {' '.join(shlex.quote(s) for s in command)}")