import collections
import collections.abc
import json
import logging
import re
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union

from . import utils as sp_utils
from .queue_provider import QueueProvider, RedisQueueProvider

logger = logging.getLogger(__name__)


class StateManager:
    """Tracks statistics, manages rate limits, and persists state across runs."""

    def __init__(self, policy_name, disable_log_writing=False, shutdown_event=None,
                 queue_provider: Optional[QueueProvider] = None):
        self.disable_log_writing = disable_log_writing
        self.state_file_path = Path(f"{policy_name}_state.json")
        self.stats_file_path = Path(f"{policy_name}_stats.jsonl")
        self.lock = threading.RLock()
        self.start_time = time.time()
        self.shutdown_event = shutdown_event or threading.Event()
        self.events = []
        self.state = {
            'global_request_count': 0,
            'rate_limit_trackers': {},  # e.g., {'per_ip': [ts1, ts2], 'profile_foo': [ts3, ts4]}
            'profile_request_counts': {},  # for client rotation
            'profile_last_refresh_time': {},  # for client rotation
            'proxy_last_finish_time': {},  # for per-proxy sleep
            'processed_files': [],  # For continuous download_only mode
            # For dynamic profile cooldown strategy
            'profile_cooldown_counts': {},
            'profile_cooldown_sleep_until': {},
            'profile_pool_size': 0,
            'profile_run_suffix': None,
            'worker_profile_generations': {},
            'last_url_index': 0,
            # For batch modes
            'total_batches_processed': 0,
            'successful_batches': 0,
            'failed_batches': 0,
            'total_videos_processed': 0,
            # For queue modes
            'queue_stats': {
                'auth': {'total_processed': 0, 'successful': 0, 'failed': 0, 'skipped': 0},
                'download': {'total_processed': 0, 'successful': 0, 'failed': 0, 'skipped': 0}
            }
        }
        self.stats_file_handle = None
        self.queue_provider = queue_provider
        self._load_state()
        self.print_historical_summary()
        self._open_stats_log()

    def _load_state(self):
        if self.disable_log_writing:
            logger.info("Log writing is disabled. State will not be loaded from disk.")
            return
        if not self.state_file_path.exists():
            logger.info(f"State file not found at '{self.state_file_path}', starting fresh.")
            return
        try:
            with open(self.state_file_path, 'r', encoding='utf-8') as f:
                self.state = json.load(f)
            # Ensure keys exist
            self.state.setdefault('global_request_count', 0)
            self.state.setdefault('rate_limit_trackers', {})
            self.state.setdefault('profile_request_counts', {})
            self.state.setdefault('profile_last_refresh_time', {})
            self.state.setdefault('proxy_last_finish_time', {})
            self.state.setdefault('processed_files', [])
            # For dynamic profile cooldown strategy
            self.state.setdefault('profile_cooldown_counts', {})
            self.state.setdefault('profile_cooldown_sleep_until', {})
            self.state.setdefault('profile_pool_size', 0)
            self.state.setdefault('profile_run_suffix', None)
            self.state.setdefault('worker_profile_generations', {})
            self.state.setdefault('last_url_index', 0)
            # For batch modes
            self.state.setdefault('total_batches_processed', 0)
            self.state.setdefault('successful_batches', 0)
            self.state.setdefault('failed_batches', 0)
            self.state.setdefault('total_videos_processed', 0)
            # Older state files may predate queue tracking; mirror the in-memory default.
            self.state.setdefault('queue_stats', {
                'auth': {'total_processed': 0, 'successful': 0, 'failed': 0, 'skipped': 0},
                'download': {'total_processed': 0, 'successful': 0, 'failed': 0, 'skipped': 0}
            })
            logger.info(f"Loaded state from {self.state_file_path}")
        except (IOError, json.JSONDecodeError) as e:
            logger.error(f"Could not load or parse state file {self.state_file_path}: {e}. Starting fresh.")
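    # Illustrative only: roughly what the persisted <policy_name>_state.json looks like
    # after a short run (values hypothetical, most keys elided):
    #
    #     {
    #       "global_request_count": 42,
    #       "rate_limit_trackers": {"per_ip": [1700000000.0, 1700000030.5]},
    #       "profile_cooldown_counts": {"batch_202401010000_0": 17},
    #       "last_url_index": 120,
    #       ...
    #     }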
    def _save_state(self):
        if self.disable_log_writing:
            return
        with self.lock:
            try:
                with open(self.state_file_path, 'w', encoding='utf-8') as f:
                    json.dump(self.state, f, indent=2)
                logger.info(f"Saved state to {self.state_file_path}")
            except IOError as e:
                logger.error(f"Could not save state to {self.state_file_path}: {e}")

    def _open_stats_log(self):
        if self.disable_log_writing:
            return
        try:
            self.stats_file_handle = open(self.stats_file_path, 'a', encoding='utf-8')
        except IOError as e:
            logger.error(f"Could not open stats file {self.stats_file_path}: {e}")

    def close(self):
        """Saves state and closes file handles."""
        self._save_state()
        if self.stats_file_handle:
            self.stats_file_handle.close()
            self.stats_file_handle = None

    def mark_file_as_processed(self, file_path):
        """Adds a file path to the list of processed files in the state."""
        with self.lock:
            # Using a list and checking for existence is fine for moderate numbers of files.
            # A set isn't JSON serializable.
            processed = self.state.setdefault('processed_files', [])
            file_str = str(file_path)
            if file_str not in processed:
                processed.append(file_str)

    def get_last_url_index(self):
        """Gets the last URL index to start from."""
        with self.lock:
            return self.state.get('last_url_index', 0)

    def get_next_url_batch(self, count, urls_list):
        """Gets the next batch of URLs to process, updating the state."""
        with self.lock:
            start_index = self.state.get('last_url_index', 0)
            if start_index >= len(urls_list):
                return [], start_index  # No more URLs
            end_index = start_index + count
            batch = urls_list[start_index:end_index]
            # Update state with the index of the *next* URL to be processed.
            self.state['last_url_index'] = end_index
            return batch, start_index

    def update_last_url_index(self, index, force=False):
        """Updates the last processed URL index in the state.

        Args:
            index: The index of the *next* URL to process.
            force: If True, sets the index regardless of the current value.
        """
        with self.lock:
            if force or index > self.state.get('last_url_index', 0):
                self.state['last_url_index'] = index
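    # Illustrative only: a caller-side sketch of how the URL batching state is intended to
    # be consumed, given a StateManager instance `sm` (all names hypothetical):
    #
    #     batch, start = sm.get_next_url_batch(50, urls)   # advances last_url_index past this batch
    #     if batch_failed:
    #         sm.update_last_url_index(start, force=True)  # rewind so the batch is retried next run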
""" with self.lock: if force or index > self.state.get('last_url_index', 0): self.state['last_url_index'] = index def get_processed_files(self): """Returns a set of file paths that have been processed.""" with self.lock: return set(self.state.get('processed_files', [])) def record_batch_result(self, success, video_count, profile_name=None): with self.lock: self.state['total_batches_processed'] = self.state.get('total_batches_processed', 0) + 1 self.state['total_videos_processed'] = self.state.get('total_videos_processed', 0) + video_count if success: self.state['successful_batches'] = self.state.get('successful_batches', 0) + 1 else: self.state['failed_batches'] = self.state.get('failed_batches', 0) + 1 # Print live counter total = self.state['total_batches_processed'] ok = self.state['successful_batches'] fail = self.state['failed_batches'] profile_log = f" [{profile_name}]" if profile_name else "" logger.info(f"Batch #{total} complete.{profile_log} (Total OK: {ok}, Total Fail: {fail})") def print_historical_summary(self): """Prints a summary based on the state loaded from disk, before new events.""" with self.lock: now = time.time() rate_trackers = self.state.get('rate_limit_trackers', {}) total_requests = self.state.get('global_request_count', 0) if not rate_trackers and not total_requests: logger.info("No historical data found in state file.") return logger.info("\n--- Summary From Previous Runs ---") logger.info(f"Total info.json requests (all previous runs): {total_requests}") if rate_trackers: for key, timestamps in sorted(rate_trackers.items()): # Time windows in seconds windows = { 'last 10 min': 600, 'last 60 min': 3600, 'last 6 hours': 21600, 'last 24 hours': 86400 } rates_str_parts = [] for name, seconds in windows.items(): count = sum(1 for ts in timestamps if now - ts <= seconds) # Calculate rate in requests per minute rate_rpm = (count / seconds) * 60 if seconds > 0 else 0 rates_str_parts.append(f"{count} req in {name} ({rate_rpm:.2f} rpm)") logger.info(f"Tracker '{key}': " + ", ".join(rates_str_parts)) logger.info("------------------------------------") def log_event(self, event_data): with self.lock: event_data['timestamp'] = datetime.now().isoformat() self.events.append(event_data) if self.stats_file_handle: self.stats_file_handle.write(json.dumps(event_data) + '\n') self.stats_file_handle.flush() def get_request_count(self): with self.lock: return self.state.get('global_request_count', 0) def increment_request_count(self): with self.lock: self.state['global_request_count'] = self.state.get('global_request_count', 0) + 1 def check_cumulative_error_rate(self, max_errors, per_minutes, error_type=None): """ Checks if a cumulative error rate has been exceeded. If error_type is None, checks for any failure. Returns the number of errors found if the threshold is met, otherwise 0. 
""" with self.lock: now = time.time() window_seconds = per_minutes * 60 if error_type: recent_errors = [ e for e in self.events if e.get('error_type') == error_type and (now - datetime.fromisoformat(e['timestamp']).timestamp()) <= window_seconds ] else: # Generic failure check recent_errors = [ e for e in self.events # Only count failures that are not explicitly tolerated if not e.get('success') and not e.get('is_tolerated_error') and (now - datetime.fromisoformat(e['timestamp']).timestamp()) <= window_seconds ] if len(recent_errors) >= max_errors: return len(recent_errors) return 0 def check_quality_degradation_rate(self, max_triggers, per_minutes): """ Checks if the quality degradation trigger rate has been exceeded. Returns the number of triggers found if the threshold is met, otherwise 0. """ with self.lock: now = time.time() window_seconds = per_minutes * 60 recent_triggers = [ e for e in self.events if e.get('quality_degradation_trigger') and (now - datetime.fromisoformat(e['timestamp']).timestamp()) <= window_seconds ] if len(recent_triggers) >= max_triggers: return len(recent_triggers) return 0 def check_and_update_rate_limit(self, profile_name, policy): """ Checks if a request is allowed based on policy rate limits. If allowed, updates the internal state. Returns True if allowed, False otherwise. """ with self.lock: now = time.time() gen_policy = policy.get('info_json_generation_policy', {}) rate_limits = gen_policy.get('rate_limits', {}) # Check per-IP limit ip_limit = rate_limits.get('per_ip') if ip_limit: tracker_key = 'per_ip' max_req = ip_limit.get('max_requests') period_min = ip_limit.get('per_minutes') if max_req and period_min: timestamps = self.state['rate_limit_trackers'].get(tracker_key, []) # Filter out old timestamps timestamps = [ts for ts in timestamps if now - ts < period_min * 60] if len(timestamps) >= max_req: logger.warning("Per-IP rate limit reached. Skipping task.") return False self.state['rate_limit_trackers'][tracker_key] = timestamps # Check per-profile limit profile_limit = rate_limits.get('per_profile') if profile_limit and profile_name: tracker_key = f"profile_{profile_name}" max_req = profile_limit.get('max_requests') period_min = profile_limit.get('per_minutes') if max_req and period_min: timestamps = self.state['rate_limit_trackers'].get(tracker_key, []) timestamps = [ts for ts in timestamps if now - ts < period_min * 60] if len(timestamps) >= max_req: logger.warning(f"Per-profile rate limit for '{profile_name}' reached. Skipping task.") return False self.state['rate_limit_trackers'][tracker_key] = timestamps # If all checks pass, record the new request timestamp for all relevant trackers if ip_limit and ip_limit.get('max_requests'): self.state['rate_limit_trackers'].setdefault('per_ip', []).append(now) if profile_limit and profile_limit.get('max_requests') and profile_name: self.state['rate_limit_trackers'].setdefault(f"profile_{profile_name}", []).append(now) return True def get_client_for_request(self, profile_name, gen_policy): """ Determines which client to use based on the client_rotation_policy. Returns a tuple: (client_name, request_params_dict). """ with self.lock: rotation_policy = gen_policy.get('client_rotation_policy') # If no rotation policy, use the simple 'client' key. 
    def get_client_for_request(self, profile_name, gen_policy):
        """
        Determines which client to use based on the client_rotation_policy.
        Returns a tuple: (client_name, request_params_dict).
        """
        with self.lock:
            rotation_policy = gen_policy.get('client_rotation_policy')
            # If no rotation policy, use the simple 'client' key.
            if not rotation_policy:
                client = gen_policy.get('client')
                logger.info(f"Using client '{client}' for profile '{profile_name}'.")
                req_params = gen_policy.get('request_params')
                return client, req_params

            # --- Rotation logic ---
            now = time.time()
            major_client = rotation_policy.get('major_client')
            refresh_client = rotation_policy.get('refresh_client')
            refresh_every = rotation_policy.get('refresh_every', {})

            if not refresh_client or not refresh_every:
                return major_client, rotation_policy.get('major_client_params')

            should_refresh = False

            # Check time-based refresh
            refresh_minutes = refresh_every.get('minutes')
            last_refresh_time = self.state['profile_last_refresh_time'].get(profile_name, 0)
            if refresh_minutes and (now - last_refresh_time) > (refresh_minutes * 60):
                should_refresh = True

            # Check request-count-based refresh
            refresh_requests = refresh_every.get('requests')
            request_count = self.state['profile_request_counts'].get(profile_name, 0)
            if refresh_requests and request_count >= refresh_requests:
                should_refresh = True

            if should_refresh:
                logger.info(f"Profile '{profile_name}' is due for a refresh. Using refresh client '{refresh_client}'.")
                self.state['profile_last_refresh_time'][profile_name] = now
                self.state['profile_request_counts'][profile_name] = 0  # Reset counter
                return refresh_client, rotation_policy.get('refresh_client_params')
            else:
                # Not refreshing, so increment request count for this profile
                self.state['profile_request_counts'][profile_name] = request_count + 1
                return major_client, rotation_policy.get('major_client_params')
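    # Illustrative only: the gen_policy shapes that get_client_for_request() understands.
    # Client names and parameter values are hypothetical.
    #
    #     # Simple form: always the same client.
    #     gen_policy = {"client": "web", "request_params": {}}
    #
    #     # Rotating form: use major_client normally, switch to refresh_client once the
    #     # per-profile time or request budget is hit (whichever comes first).
    #     gen_policy = {
    #         "client_rotation_policy": {
    #             "major_client": "web", "major_client_params": {},
    #             "refresh_client": "android", "refresh_client_params": {},
    #             "refresh_every": {"minutes": 30, "requests": 50},
    #         }
    #     }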
    def get_next_available_profile(self, policy):
        """
        Finds or creates an available profile based on the dynamic cooldown policy.
        Returns a profile name, or None if no profile is available.
        """
        with self.lock:
            now = time.time()
            settings = policy.get('settings', {})
            pm_policy = settings.get('profile_management')
            if not pm_policy:
                return None

            prefix = pm_policy.get('prefix')
            if not prefix:
                logger.error("Profile management policy requires 'prefix'.")
                return None

            # Determine and persist the suffix for this run to ensure profile names are stable
            run_suffix = self.state.get('profile_run_suffix')
            if not run_suffix:
                suffix_config = pm_policy.get('suffix')
                if suffix_config == 'auto':
                    run_suffix = datetime.now().strftime('%Y%m%d%H%M')
                else:
                    run_suffix = suffix_config or ''
                self.state['profile_run_suffix'] = run_suffix

            # Initialize pool size from policy if not already in state
            if self.state.get('profile_pool_size', 0) == 0:
                self.state['profile_pool_size'] = pm_policy.get('initial_pool_size', 1)

            max_reqs = pm_policy.get('max_requests_per_profile')
            sleep_mins = pm_policy.get('sleep_minutes_on_exhaustion')

            # Loop until a profile is found or we decide we can't find one
            while True:
                # Try to find an existing, available profile
                for i in range(self.state['profile_pool_size']):
                    profile_name = f"{prefix}_{run_suffix}_{i}" if run_suffix else f"{prefix}_{i}"

                    # Check if sleeping
                    sleep_until = self.state['profile_cooldown_sleep_until'].get(profile_name, 0)
                    if now < sleep_until:
                        continue  # Still sleeping

                    # Check if it needs to be put to sleep
                    req_count = self.state['profile_cooldown_counts'].get(profile_name, 0)
                    if max_reqs and req_count >= max_reqs:
                        sleep_duration_seconds = (sleep_mins or 0) * 60
                        self.state['profile_cooldown_sleep_until'][profile_name] = now + sleep_duration_seconds
                        self.state['profile_cooldown_counts'][profile_name] = 0  # Reset count for next time
                        logger.info(f"Profile '{profile_name}' reached request limit ({req_count}/{max_reqs}). Putting to sleep for {sleep_mins} minutes.")
                        continue  # Now sleeping, try next profile

                    # This profile is available
                    logger.info(f"Selected available profile '{profile_name}' (request count: {req_count}/{max_reqs if max_reqs else 'unlimited'}).")
                    return profile_name

                # If we get here, no existing profile was available
                if pm_policy.get('auto_expand_pool'):
                    new_profile_index = self.state['profile_pool_size']
                    self.state['profile_pool_size'] += 1
                    profile_name = f"{prefix}_{run_suffix}_{new_profile_index}" if run_suffix else f"{prefix}_{new_profile_index}"
                    logger.info(f"Profile pool exhausted. Expanding pool to size {self.state['profile_pool_size']}. New profile: '{profile_name}'")
                    return profile_name
                else:
                    # No available profiles and pool expansion is disabled
                    return None

    def get_or_rotate_worker_profile(self, worker_id, policy):
        """
        Gets the current profile for a worker, rotating to a new generation if the
        lifetime limit is met. This is used by the 'per_worker_with_rotation' profile mode.
        """
        with self.lock:
            pm_policy = policy.get('settings', {}).get('profile_management', {})
            if not pm_policy:
                logger.error("Profile mode 'per_worker_with_rotation' requires 'settings.profile_management' configuration in the policy.")
                return f"error_profile_{worker_id}"

            prefix = pm_policy.get('prefix')
            if not prefix:
                logger.error("Profile management for 'per_worker_with_rotation' requires a 'prefix'.")
                return f"error_profile_{worker_id}"

            max_reqs = pm_policy.get('max_requests_per_profile')
            generations = self.state.setdefault('worker_profile_generations', {})
            # worker_id is an int, but JSON keys must be strings
            worker_id_str = str(worker_id)
            current_gen = generations.get(worker_id_str, 0)
            profile_name = f"{prefix}_{worker_id}_{current_gen}"

            if not max_reqs:
                # No lifetime limit defined, so never rotate.
                return profile_name

            req_count = self.state.get('profile_cooldown_counts', {}).get(profile_name, 0)
            if req_count >= max_reqs:
                logger.info(f"Profile '{profile_name}' reached lifetime request limit ({req_count}/{max_reqs}). Rotating to new generation for worker {worker_id}.")
                new_gen = current_gen + 1
                generations[worker_id_str] = new_gen
                # The request counts for the old profile are implicitly left behind.
                # The new profile will start with a count of 0.
                profile_name = f"{prefix}_{worker_id}_{new_gen}"
            return profile_name

    def record_profile_request(self, profile_name):
        """Increments the request counter for a profile for the cooldown policy."""
        with self.lock:
            if not profile_name:
                return
            counts = self.state.setdefault('profile_cooldown_counts', {})
            counts[profile_name] = counts.get(profile_name, 0) + 1

    def record_proxy_usage(self, proxy_url):
        """Records a request timestamp for a given proxy URL for statistical purposes."""
        if not proxy_url:
            return
        with self.lock:
            now = time.time()
            # Use a prefix to avoid collisions with profile names or other keys
            tracker_key = f"proxy_{proxy_url}"
            self.state['rate_limit_trackers'].setdefault(tracker_key, []).append(now)
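    # Illustrative only: the settings.profile_management block consumed by
    # get_next_available_profile() / get_or_rotate_worker_profile() above.
    # Values are hypothetical; key names match what the methods read.
    #
    #     policy = {
    #         "settings": {
    #             "profile_management": {
    #                 "prefix": "batch",
    #                 "suffix": "auto",              # or a fixed string, or omitted
    #                 "initial_pool_size": 4,
    #                 "auto_expand_pool": True,
    #                 "max_requests_per_profile": 100,
    #                 "sleep_minutes_on_exhaustion": 60,
    #             }
    #         }
    #     }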
    def check_and_update_download_rate_limit(self, proxy_url, policy):
        """Checks download rate limits. Returns True if allowed, False otherwise."""
        with self.lock:
            now = time.time()
            d_policy = policy.get('download_policy', {})
            rate_limits = d_policy.get('rate_limits', {})

            # Check per-IP limit
            ip_limit = rate_limits.get('per_ip')
            if ip_limit:
                tracker_key = 'download_per_ip'  # Use a distinct key
                max_req = ip_limit.get('max_requests')
                period_min = ip_limit.get('per_minutes')
                if max_req and period_min:
                    timestamps = self.state['rate_limit_trackers'].get(tracker_key, [])
                    timestamps = [ts for ts in timestamps if now - ts < period_min * 60]
                    if len(timestamps) >= max_req:
                        logger.warning("Per-IP download rate limit reached. Skipping task.")
                        return False
                    self.state['rate_limit_trackers'][tracker_key] = timestamps

            # Check per-proxy limit
            proxy_limit = rate_limits.get('per_proxy')
            if proxy_limit and proxy_url:
                tracker_key = f"download_proxy_{proxy_url}"
                max_req = proxy_limit.get('max_requests')
                period_min = proxy_limit.get('per_minutes')
                if max_req and period_min:
                    timestamps = self.state['rate_limit_trackers'].get(tracker_key, [])
                    timestamps = [ts for ts in timestamps if now - ts < period_min * 60]
                    if len(timestamps) >= max_req:
                        logger.warning(f"Per-proxy download rate limit for '{proxy_url}' reached. Skipping task.")
                        return False
                    self.state['rate_limit_trackers'][tracker_key] = timestamps

            # If all checks pass, record the new request timestamp for all relevant trackers
            if ip_limit and ip_limit.get('max_requests'):
                self.state['rate_limit_trackers'].setdefault('download_per_ip', []).append(now)
            if proxy_limit and proxy_limit.get('max_requests') and proxy_url:
                self.state['rate_limit_trackers'].setdefault(f"download_proxy_{proxy_url}", []).append(now)
            return True
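    # Illustrative only: the download_policy fragment read by
    # check_and_update_download_rate_limit() above and wait_for_proxy_cooldown() below.
    # Values are hypothetical; key names match what the methods read.
    #
    #     policy = {
    #         "download_policy": {
    #             "rate_limits": {
    #                 "per_ip": {"max_requests": 120, "per_minutes": 10},
    #                 "per_proxy": {"max_requests": 30, "per_minutes": 10},
    #             },
    #             "sleep_per_proxy_seconds": 15,
    #         }
    #     }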
    def wait_for_proxy_cooldown(self, proxy_url, policy):
        """If a per-proxy sleep is defined, wait until the cooldown period has passed."""
        with self.lock:
            d_policy = policy.get('download_policy', {})
            sleep_duration = d_policy.get('sleep_per_proxy_seconds', 0)
            if not proxy_url or sleep_duration <= 0:
                return
            last_finish = self.state.setdefault('proxy_last_finish_time', {}).get(proxy_url, 0)
            elapsed = time.time() - last_finish
            if elapsed < sleep_duration:
                time_to_sleep = sleep_duration - elapsed
                logger.info(f"Proxy '{proxy_url}' was used recently. Sleeping for {time_to_sleep:.2f}s.")
                # Interruptible sleep. Note: this runs while holding self.lock, so other
                # StateManager calls block until the cooldown ends or shutdown is requested.
                sleep_end_time = time.time() + time_to_sleep
                while time.time() < sleep_end_time:
                    if self.shutdown_event.is_set():
                        logger.info("Shutdown requested during proxy cooldown sleep.")
                        break
                    time.sleep(0.2)

    def update_proxy_finish_time(self, proxy_url):
        """Updates the last finish time for a proxy."""
        with self.lock:
            if not proxy_url:
                return
            self.state.setdefault('proxy_last_finish_time', {})[proxy_url] = time.time()
    def print_summary(self, policy=None):
        """Print a summary of the test run."""
        with self.lock:
            # --- Cumulative Stats from State ---
            now = time.time()
            rate_trackers = self.state.get('rate_limit_trackers', {})
            if rate_trackers:
                logger.info("\n--- Cumulative Rate Summary (All Runs, updated at end of run) ---")
                logger.info("This shows the total number of requests/downloads over various time windows, including previous runs.")
                fetch_trackers = {k: v for k, v in rate_trackers.items() if not k.startswith('download_')}
                download_trackers = {k: v for k, v in rate_trackers.items() if k.startswith('download_')}

                def print_tracker_stats(trackers, tracker_type):
                    if not trackers:
                        logger.info(f"No historical {tracker_type} trackers found.")
                        return
                    logger.info(f"Historical {tracker_type} Trackers:")
                    for key, timestamps in sorted(trackers.items()):
                        windows = {
                            'last 10 min': 600,
                            'last 60 min': 3600,
                            'last 6 hours': 21600,
                            'last 24 hours': 86400
                        }
                        rates_str_parts = []
                        for name, seconds in windows.items():
                            count = sum(1 for ts in timestamps if now - ts <= seconds)
                            rate_rpm = (count / seconds) * 60 if seconds > 0 else 0
                            rates_str_parts.append(f"{count} in {name} ({rate_rpm:.2f}/min)")
                        # Clean up key for display
                        display_key = key.replace('download_', '').replace('per_ip', 'all_proxies/ips')
                        logger.info(f" - Tracker '{display_key}': " + ", ".join(rates_str_parts))

                print_tracker_stats(fetch_trackers, "Fetch Request")
                print_tracker_stats(download_trackers, "Download Attempt")

            if not self.events:
                logger.info("\nNo new events were recorded in this session.")
                return

            duration = time.time() - self.start_time
            fetch_events = [e for e in self.events if e.get('type') == 'fetch']
            batch_fetch_events = [e for e in self.events if e.get('type') == 'fetch_batch']
            download_events = [e for e in self.events if e.get('type') not in ['fetch', 'fetch_batch']]

            logger.info("\n--- Test Summary (This Run) ---")
            logger.info(f"Total duration: {duration:.2f} seconds")

            # Check for batch mode stats from state
            if self.state.get('total_batches_processed', 0) > 0:
                logger.info(f"Total batches processed (cumulative): {self.state['total_batches_processed']}")
                logger.info(f" - Successful: {self.state['successful_batches']}")
                logger.info(f" - Failed: {self.state['failed_batches']}")
                logger.info(f"Total videos processed (cumulative): {self.state['total_videos_processed']}")
            else:
                logger.info(f"Total info.json requests (cumulative): {self.get_request_count()}")

            if policy:
                logger.info("\n--- Test Configuration ---")
                settings = policy.get('settings', {})
                d_policy = policy.get('download_policy', {})
                if settings.get('urls_file'):
                    logger.info(f"URL source file: {settings['urls_file']}")
                if settings.get('info_json_dir'):
                    logger.info(f"Info.json source dir: {settings['info_json_dir']}")
                if d_policy:
                    logger.info(f"Download formats: {d_policy.get('formats', 'N/A')}")
                    if d_policy.get('downloader'):
                        logger.info(f"Downloader: {d_policy.get('downloader')}")
                    if d_policy.get('downloader_args'):
                        logger.info(f"Downloader args: {d_policy.get('downloader_args')}")
                    if d_policy.get('pause_before_download_seconds'):
                        logger.info(f"Pause before download: {d_policy.get('pause_before_download_seconds')}s")
                    if d_policy.get('sleep_between_formats'):
                        sleep_cfg = d_policy.get('sleep_between_formats')
                        logger.info(f"Sleep between formats: {sleep_cfg.get('min_seconds', 0)}-{sleep_cfg.get('max_seconds', 0)}s")

            if fetch_events:
                total_fetches = len(fetch_events)
                successful_fetches = sum(1 for e in fetch_events if e['success'])
                cancelled_fetches = sum(1 for e in fetch_events if e.get('error_type') == 'Cancelled')
                failed_fetches = total_fetches - successful_fetches - cancelled_fetches

                logger.info("\n--- Fetch Summary (This Run) ---")
                logger.info(f"Total info.json fetch attempts: {total_fetches}")
                logger.info(f" - Successful: {successful_fetches}")
                logger.info(f" - Failed: {failed_fetches}")
                if cancelled_fetches > 0:
                    logger.info(f" - Cancelled: {cancelled_fetches}")

                completed_fetches = successful_fetches + failed_fetches
                if completed_fetches > 0:
                    success_rate = (successful_fetches / completed_fetches) * 100
                    logger.info(f"Success rate (of completed): {success_rate:.2f}%")
                elif total_fetches > 0:
                    logger.info("Success rate: N/A (no tasks completed)")

                if duration > 1 and total_fetches > 0:
                    rpm = (total_fetches / duration) * 60
                    logger.info(f"Actual fetch rate: {rpm:.2f} requests/minute")

                if failed_fetches > 0:
                    error_counts = collections.Counter(
                        e.get('error_type', 'Unknown') for e in fetch_events
                        if not e['success'] and e.get('error_type') != 'Cancelled'
                    )
                    logger.info("Failure breakdown:")
                    for error_type, count in sorted(error_counts.items()):
                        logger.info(f" - {error_type}: {count}")

                profile_counts = collections.Counter(e.get('profile') for e in fetch_events if e.get('profile'))
                if profile_counts:
                    logger.info("Requests per profile:")
                    for profile, count in sorted(profile_counts.items()):
                        logger.info(f" - {profile}: {count}")

                proxy_counts = collections.Counter(e.get('proxy_url') for e in fetch_events if e.get('proxy_url'))
                if proxy_counts:
                    logger.info("Requests per proxy:")
                    for proxy, count in sorted(proxy_counts.items()):
                        logger.info(f" - {proxy}: {count}")

            if batch_fetch_events:
                total_batches = len(batch_fetch_events)
                successful_batches = sum(1 for e in batch_fetch_events if e['success'])
                failed_batches = total_batches - successful_batches
                total_videos_this_run = sum(e.get('video_count', 0) for e in batch_fetch_events)

                logger.info("\n--- Batch Fetch Summary (This Run) ---")
                logger.info(f"Total batches processed: {total_batches}")
                logger.info(f"Total videos processed: {total_videos_this_run}")
                logger.info(f" - Successful batches: {successful_batches}")
                logger.info(f" - Failed batches: {failed_batches}")

                profile_counts = collections.Counter(e.get('profile') for e in batch_fetch_events if e.get('profile'))
                if profile_counts:
                    logger.info("Batches per profile:")
                    for profile, count in sorted(profile_counts.items()):
                        logger.info(f" - {profile}: {count}")

                proxy_counts = collections.Counter(e.get('proxy_url') for e in batch_fetch_events if e.get('proxy_url'))
                if proxy_counts:
                    logger.info("Batches per proxy:")
                    for proxy, count in sorted(proxy_counts.items()):
                        logger.info(f" - {proxy}: {count}")
            if download_events:
                total_attempts = len(download_events)
                successes = sum(1 for e in download_events if e['success'])
                cancelled = sum(1 for e in download_events if e.get('error_type') == 'Cancelled')
                failures = total_attempts - successes - cancelled

                # --- Profile Association for Download Events ---
                download_profiles = [e.get('profile') for e in download_events]
                # For download_only mode, we might need to fall back to regex extraction
                # if the profile wasn't passed down (e.g., no profile grouping).
                profile_regex = None
                if policy:
                    settings = policy.get('settings', {})
                    if settings.get('mode') == 'download_only':
                        profile_regex = settings.get('profile_extraction_regex')
                if profile_regex:
                    for i, e in enumerate(download_events):
                        if not download_profiles[i]:  # If profile wasn't set in the event
                            path = Path(e.get('path', ''))
                            match = re.search(profile_regex, path.name)
                            if match and match.groups():
                                download_profiles[i] = match.group(1)
                # Replace any remaining Nones with 'unknown_profile'
                download_profiles = [p or 'unknown_profile' for p in download_profiles]
                num_profiles_used = len(set(p for p in download_profiles if p != 'unknown_profile'))

                logger.info("\n--- Download Summary (This Run) ---")
                if policy:
                    workers = policy.get('execution_control', {}).get('workers', 'N/A')
                    logger.info(f"Workers configured: {workers}")
                logger.info(f"Profiles utilized for downloads: {num_profiles_used}")
                logger.info(f"Total download attempts: {total_attempts}")
                logger.info(f" - Successful: {successes}")
                logger.info(f" - Failed: {failures}")
                if cancelled > 0:
                    logger.info(f" - Cancelled: {cancelled}")

                completed_downloads = successes + failures
                if completed_downloads > 0:
                    success_rate = (successes / completed_downloads) * 100
                    logger.info(f"Success rate (of completed): {success_rate:.2f}%")
                elif total_attempts > 0:
                    logger.info("Success rate: N/A (no tasks completed)")

                duration_hours = duration / 3600.0
                if duration > 1 and total_attempts > 0:
                    dpm = (total_attempts / duration) * 60
                    logger.info(f"Actual overall download rate: {dpm:.2f} attempts/minute")

                total_bytes = sum(e.get('downloaded_bytes', 0) for e in download_events if e['success'])
                if total_bytes > 0:
                    logger.info(f"Total data downloaded: {sp_utils.format_size(total_bytes)}")

                if failures > 0:
                    error_counts = collections.Counter(
                        e.get('error_type', 'Unknown') for e in download_events
                        if not e['success'] and e.get('error_type') != 'Cancelled'
                    )
                    logger.info("Failure breakdown:")
                    for error_type, count in sorted(error_counts.items()):
                        logger.info(f" - {error_type}: {count}")

                # Add profile to each download event for easier counting
                for i, e in enumerate(download_events):
                    e['profile'] = download_profiles[i]

                profile_counts = collections.Counter(e.get('profile') for e in download_events if e.get('profile'))
                if profile_counts:
                    logger.info("Downloads per profile:")
                    for profile, count in sorted(profile_counts.items()):
                        rate_per_hour = (count / duration_hours) if duration_hours > 0 else 0
                        logger.info(f" - {profile}: {count} attempts (avg this run: {rate_per_hour:.2f}/hour)")

                proxy_counts = collections.Counter(e.get('proxy_url') for e in download_events if e.get('proxy_url'))
                if proxy_counts:
                    logger.info("Downloads per proxy:")
                    for proxy, count in sorted(proxy_counts.items()):
                        rate_per_hour = (count / duration_hours) if duration_hours > 0 else 0
                        logger.info(f" - {proxy}: {count} attempts (avg this run: {rate_per_hour:.2f}/hour)")

            logger.info("--------------------")

    # --- Queue-specific methods ---

    def initialize_queue_provider(self, redis_host: str, redis_port: int, redis_password: Optional[str] = None,
                                  redis_db: int = 0, env_prefix: Optional[str] = None):
        """Initialize the queue provider if not already set."""
        if not self.queue_provider:
            self.queue_provider = RedisQueueProvider(
                redis_host=redis_host,
                redis_port=redis_port,
                redis_password=redis_password,
                redis_db=redis_db,
                env_prefix=env_prefix
            )
            logger.info(f"Initialized Redis queue provider with prefix: '{env_prefix}'")
        return self.queue_provider
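    # Illustrative only: a caller-side sketch of the queue workflow built on the methods
    # below. The host/port and the task payload are hypothetical; the queue names
    # (AUTH_INBOX, DL_TASKS, ...) are constants defined on the queue provider.
    #
    #     sm.initialize_queue_provider("localhost", 6379, env_prefix="myrun")
    #     task = sm.get_auth_task()
    #     if task:
    #         sm.mark_auth_in_progress(task["id"], worker_id="worker-1")
    #         ...
    #         sm.report_auth_success(task["id"], {"info_json_path": "..."})
    #         sm.remove_auth_in_progress(task["id"])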
    def get_auth_task(self) -> Optional[Dict]:
        """Get an authentication task from the queue."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return None
        task = self.queue_provider.get_task(self.queue_provider.AUTH_INBOX)
        if task:
            with self.lock:
                self.state['queue_stats']['auth']['total_processed'] += 1
        return task

    def get_auth_tasks_batch(self, batch_size: int) -> List[Dict]:
        """Get a batch of authentication tasks from the queue."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return []
        tasks = self.queue_provider.get_tasks_batch(self.queue_provider.AUTH_INBOX, batch_size)
        if tasks:
            with self.lock:
                self.state['queue_stats']['auth']['total_processed'] += len(tasks)
        return tasks

    def report_auth_success(self, task_id: str, result: Dict) -> bool:
        """Report a successful authentication task."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        success = self.queue_provider.report_success(self.queue_provider.AUTH_RESULT, task_id, result)
        if success:
            with self.lock:
                self.state['queue_stats']['auth']['successful'] += 1
        return success

    def report_auth_failure(self, task_id: str, error: Dict) -> bool:
        """Report an authentication task failure."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        success = self.queue_provider.report_failure(self.queue_provider.AUTH_FAIL, task_id, error)
        if success:
            with self.lock:
                self.state['queue_stats']['auth']['failed'] += 1
        return success

    def report_auth_skipped(self, task_id: str, reason: Dict) -> bool:
        """Report an authentication task that was skipped."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        success = self.queue_provider.report_skipped(self.queue_provider.AUTH_SKIPPED, task_id, reason)
        if success:
            with self.lock:
                self.state['queue_stats']['auth']['skipped'] += 1
        return success

    def mark_auth_in_progress(self, task_id: str, worker_id: str) -> bool:
        """Mark an authentication task as in progress."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        return self.queue_provider.mark_in_progress(self.queue_provider.AUTH_PROGRESS, task_id, worker_id)

    def remove_auth_in_progress(self, task_id: str) -> bool:
        """Remove an authentication task from the in-progress tracking."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        return self.queue_provider.remove_in_progress(self.queue_provider.AUTH_PROGRESS, task_id)

    def add_download_task(self, task: Dict) -> bool:
        """Add a download task to the queue."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        return self.queue_provider.add_task(self.queue_provider.DL_TASKS, task)

    def add_download_tasks_batch(self, tasks: List[Dict]) -> int:
        """Add a batch of download tasks to the queue."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return 0
        return self.queue_provider.add_tasks_batch(self.queue_provider.DL_TASKS, tasks)

    def get_download_task(self) -> Optional[Dict]:
        """Get a download task from the queue."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return None
        task = self.queue_provider.get_task(self.queue_provider.DL_TASKS)
        if task:
            with self.lock:
                self.state['queue_stats']['download']['total_processed'] += 1
        return task
    def get_download_tasks_batch(self, batch_size: int) -> List[Dict]:
        """Get a batch of download tasks from the queue."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return []
        tasks = self.queue_provider.get_tasks_batch(self.queue_provider.DL_TASKS, batch_size)
        if tasks:
            with self.lock:
                self.state['queue_stats']['download']['total_processed'] += len(tasks)
        return tasks

    def report_download_success(self, task_id: str, result: Dict) -> bool:
        """Report a successful download task."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        success = self.queue_provider.report_success(self.queue_provider.DL_RESULT, task_id, result)
        if success:
            with self.lock:
                self.state['queue_stats']['download']['successful'] += 1
        return success

    def report_download_failure(self, task_id: str, error: Dict) -> bool:
        """Report a download task failure."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        success = self.queue_provider.report_failure(self.queue_provider.DL_FAIL, task_id, error)
        if success:
            with self.lock:
                self.state['queue_stats']['download']['failed'] += 1
        return success

    def report_download_skipped(self, task_id: str, reason: Dict) -> bool:
        """Report a download task that was skipped."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        success = self.queue_provider.report_skipped(self.queue_provider.DL_SKIPPED, task_id, reason)
        if success:
            with self.lock:
                self.state['queue_stats']['download']['skipped'] += 1
        return success

    def mark_download_in_progress(self, task_id: str, worker_id: str) -> bool:
        """Mark a download task as in progress."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        return self.queue_provider.mark_in_progress(self.queue_provider.DL_PROGRESS, task_id, worker_id)

    def remove_download_in_progress(self, task_id: str) -> bool:
        """Remove a download task from the in-progress tracking."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return False
        return self.queue_provider.remove_in_progress(self.queue_provider.DL_PROGRESS, task_id)

    def requeue_failed_auth_tasks(self, batch_size: int = 100) -> int:
        """Requeue failed authentication tasks."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return 0
        return self.queue_provider.requeue_failed_tasks(
            self.queue_provider.AUTH_FAIL,
            self.queue_provider.AUTH_INBOX,
            batch_size
        )

    def requeue_failed_download_tasks(self, batch_size: int = 100) -> int:
        """Requeue failed download tasks."""
        if not self.queue_provider:
            logger.error("Queue provider not initialized")
            return 0
        return self.queue_provider.requeue_failed_tasks(
            self.queue_provider.DL_FAIL,
            self.queue_provider.DL_TASKS,
            batch_size
        )
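
# Minimal caller-side sketch, illustrative only (it would live in a separate script that
# imports this package). The policy name and policy fragment are hypothetical; only keys
# that the methods above actually read are used.
#
#     sm = StateManager("example_policy")
#     policy = {
#         "info_json_generation_policy": {
#             "rate_limits": {"per_ip": {"max_requests": 60, "per_minutes": 10}}
#         }
#     }
#     if sm.check_and_update_rate_limit(profile_name=None, policy=policy):
#         sm.increment_request_count()
#         sm.log_event({"type": "fetch", "success": True})
#     sm.print_summary(policy)
#     sm.close()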