import collections
import collections.abc
import json
import logging
import re
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union
from . import utils as sp_utils
from .queue_provider import QueueProvider, RedisQueueProvider
logger = logging.getLogger(__name__)
class StateManager:
"""Tracks statistics, manages rate limits, and persists state across runs."""
def __init__(self, policy_name, disable_log_writing=False, shutdown_event=None,
queue_provider: Optional[QueueProvider] = None):
self.disable_log_writing = disable_log_writing
self.state_file_path = Path(f"{policy_name}_state.json")
self.stats_file_path = Path(f"{policy_name}_stats.jsonl")
self.lock = threading.RLock()
self.start_time = time.time()
self.shutdown_event = shutdown_event or threading.Event()
self.events = []
self.state = {
'global_request_count': 0,
'rate_limit_trackers': {}, # e.g., {'per_ip': [ts1, ts2], 'profile_foo': [ts3, ts4]}
'profile_request_counts': {}, # for client rotation
'profile_last_refresh_time': {}, # for client rotation
'proxy_last_finish_time': {}, # for per-proxy sleep
'processed_files': [], # For continuous download_only mode
# For dynamic profile cooldown strategy
'profile_cooldown_counts': {},
'profile_cooldown_sleep_until': {},
'profile_pool_size': 0,
'profile_run_suffix': None,
'worker_profile_generations': {},
'last_url_index': 0,
# For batch modes
'total_batches_processed': 0,
'successful_batches': 0,
'failed_batches': 0,
'total_videos_processed': 0,
# For queue modes
'queue_stats': {
'auth': {
'total_processed': 0,
'successful': 0,
'failed': 0,
'skipped': 0
},
'download': {
'total_processed': 0,
'successful': 0,
'failed': 0,
'skipped': 0
}
}
}
self.stats_file_handle = None
self.queue_provider = queue_provider
self._load_state()
self.print_historical_summary()
self._open_stats_log()
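    # Illustrative lifecycle (a sketch, not part of this module's API surface;
    # `policy` and the profile name are assumptions supplied by the caller):
    #
    #     sm = StateManager('my_policy')
    #     try:
    #         if sm.check_and_update_rate_limit('profile_0', policy):
    #             sm.increment_request_count()
    #             sm.log_event({'type': 'fetch', 'success': True, 'profile': 'profile_0'})
    #     finally:
    #         sm.close()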
def _load_state(self):
if self.disable_log_writing:
logger.info("Log writing is disabled. State will not be loaded from disk.")
return
if not self.state_file_path.exists():
logger.info(f"State file not found at '{self.state_file_path}', starting fresh.")
return
try:
with open(self.state_file_path, 'r', encoding='utf-8') as f:
self.state = json.load(f)
# Ensure keys exist
self.state.setdefault('global_request_count', 0)
self.state.setdefault('rate_limit_trackers', {})
self.state.setdefault('profile_request_counts', {})
self.state.setdefault('profile_last_refresh_time', {})
self.state.setdefault('proxy_last_finish_time', {})
self.state.setdefault('processed_files', [])
# For dynamic profile cooldown strategy
self.state.setdefault('profile_cooldown_counts', {})
self.state.setdefault('profile_cooldown_sleep_until', {})
self.state.setdefault('profile_pool_size', 0)
self.state.setdefault('profile_run_suffix', None)
self.state.setdefault('worker_profile_generations', {})
self.state.setdefault('last_url_index', 0)
# For batch modes
self.state.setdefault('total_batches_processed', 0)
self.state.setdefault('successful_batches', 0)
self.state.setdefault('failed_batches', 0)
            self.state.setdefault('total_videos_processed', 0)
            # For queue modes (older state files may predate this key; without
            # it the queue stat updates below would raise KeyError)
            self.state.setdefault('queue_stats', {
                'auth': {'total_processed': 0, 'successful': 0, 'failed': 0, 'skipped': 0},
                'download': {'total_processed': 0, 'successful': 0, 'failed': 0, 'skipped': 0},
            })
logger.info(f"Loaded state from {self.state_file_path}")
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Could not load or parse state file {self.state_file_path}: {e}. Starting fresh.")
def _save_state(self):
if self.disable_log_writing:
return
with self.lock:
try:
with open(self.state_file_path, 'w', encoding='utf-8') as f:
json.dump(self.state, f, indent=2)
logger.info(f"Saved state to {self.state_file_path}")
except IOError as e:
logger.error(f"Could not save state to {self.state_file_path}: {e}")
def _open_stats_log(self):
if self.disable_log_writing:
return
try:
self.stats_file_handle = open(self.stats_file_path, 'a', encoding='utf-8')
except IOError as e:
logger.error(f"Could not open stats file {self.stats_file_path}: {e}")
def close(self):
"""Saves state and closes file handles."""
self._save_state()
if self.stats_file_handle:
self.stats_file_handle.close()
self.stats_file_handle = None
def mark_file_as_processed(self, file_path):
"""Adds a file path to the list of processed files in the state."""
with self.lock:
# Using a list and checking for existence is fine for moderate numbers of files.
# A set isn't JSON serializable.
processed = self.state.setdefault('processed_files', [])
file_str = str(file_path)
if file_str not in processed:
processed.append(file_str)
def get_last_url_index(self):
"""Gets the last URL index to start from."""
with self.lock:
return self.state.get('last_url_index', 0)
def get_next_url_batch(self, count, urls_list):
"""Gets the next batch of URLs to process, updating the state."""
with self.lock:
start_index = self.state.get('last_url_index', 0)
if start_index >= len(urls_list):
return [], start_index # No more URLs
end_index = start_index + count
batch = urls_list[start_index:end_index]
# Update state with the index of the *next* URL to be processed.
self.state['last_url_index'] = end_index
return batch, start_index
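    # Worked example: with last_url_index == 10 and count == 5, this returns
    # (urls_list[10:15], 10) and advances last_url_index to 15, so a resumed
    # run picks up exactly where the previous one left off.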
def update_last_url_index(self, index, force=False):
"""Updates the last processed URL index in the state.
Args:
index: The index of the *next* URL to process.
force: If True, sets the index regardless of the current value.
"""
with self.lock:
if force or index > self.state.get('last_url_index', 0):
self.state['last_url_index'] = index
def get_processed_files(self):
"""Returns a set of file paths that have been processed."""
with self.lock:
return set(self.state.get('processed_files', []))
def record_batch_result(self, success, video_count, profile_name=None):
with self.lock:
self.state['total_batches_processed'] = self.state.get('total_batches_processed', 0) + 1
self.state['total_videos_processed'] = self.state.get('total_videos_processed', 0) + video_count
if success:
self.state['successful_batches'] = self.state.get('successful_batches', 0) + 1
else:
self.state['failed_batches'] = self.state.get('failed_batches', 0) + 1
# Print live counter
total = self.state['total_batches_processed']
ok = self.state['successful_batches']
fail = self.state['failed_batches']
profile_log = f" [{profile_name}]" if profile_name else ""
logger.info(f"Batch #{total} complete.{profile_log} (Total OK: {ok}, Total Fail: {fail})")
def print_historical_summary(self):
"""Prints a summary based on the state loaded from disk, before new events."""
with self.lock:
now = time.time()
rate_trackers = self.state.get('rate_limit_trackers', {})
total_requests = self.state.get('global_request_count', 0)
if not rate_trackers and not total_requests:
logger.info("No historical data found in state file.")
return
logger.info("\n--- Summary From Previous Runs ---")
logger.info(f"Total info.json requests (all previous runs): {total_requests}")
if rate_trackers:
for key, timestamps in sorted(rate_trackers.items()):
# Time windows in seconds
windows = {
'last 10 min': 600,
'last 60 min': 3600,
'last 6 hours': 21600,
'last 24 hours': 86400
}
rates_str_parts = []
for name, seconds in windows.items():
count = sum(1 for ts in timestamps if now - ts <= seconds)
# Calculate rate in requests per minute
rate_rpm = (count / seconds) * 60 if seconds > 0 else 0
rates_str_parts.append(f"{count} req in {name} ({rate_rpm:.2f} rpm)")
logger.info(f"Tracker '{key}': " + ", ".join(rates_str_parts))
logger.info("------------------------------------")
def log_event(self, event_data):
with self.lock:
event_data['timestamp'] = datetime.now().isoformat()
self.events.append(event_data)
if self.stats_file_handle:
self.stats_file_handle.write(json.dumps(event_data) + '\n')
self.stats_file_handle.flush()
def get_request_count(self):
with self.lock:
return self.state.get('global_request_count', 0)
def increment_request_count(self):
with self.lock:
self.state['global_request_count'] = self.state.get('global_request_count', 0) + 1
def check_cumulative_error_rate(self, max_errors, per_minutes, error_type=None):
"""
Checks if a cumulative error rate has been exceeded.
If error_type is None, checks for any failure.
Returns the number of errors found if the threshold is met, otherwise 0.
"""
with self.lock:
now = time.time()
window_seconds = per_minutes * 60
if error_type:
recent_errors = [
e for e in self.events
if e.get('error_type') == error_type and (now - datetime.fromisoformat(e['timestamp']).timestamp()) <= window_seconds
]
else: # Generic failure check
recent_errors = [
e for e in self.events
# Only count failures that are not explicitly tolerated
if not e.get('success') and not e.get('is_tolerated_error') and (now - datetime.fromisoformat(e['timestamp']).timestamp()) <= window_seconds
]
if len(recent_errors) >= max_errors:
return len(recent_errors)
return 0
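    # e.g. check_cumulative_error_rate(max_errors=5, per_minutes=10) returns
    # the number of non-tolerated failures seen in the last 10 minutes once at
    # least 5 have accumulated, and 0 otherwise.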
def check_quality_degradation_rate(self, max_triggers, per_minutes):
"""
Checks if the quality degradation trigger rate has been exceeded.
Returns the number of triggers found if the threshold is met, otherwise 0.
"""
with self.lock:
now = time.time()
window_seconds = per_minutes * 60
recent_triggers = [
e for e in self.events
if e.get('quality_degradation_trigger') and (now - datetime.fromisoformat(e['timestamp']).timestamp()) <= window_seconds
]
if len(recent_triggers) >= max_triggers:
return len(recent_triggers)
return 0
def check_and_update_rate_limit(self, profile_name, policy):
"""
Checks if a request is allowed based on policy rate limits.
If allowed, updates the internal state. Returns True if allowed, False otherwise.
"""
with self.lock:
now = time.time()
gen_policy = policy.get('info_json_generation_policy', {})
rate_limits = gen_policy.get('rate_limits', {})
# Check per-IP limit
ip_limit = rate_limits.get('per_ip')
if ip_limit:
tracker_key = 'per_ip'
max_req = ip_limit.get('max_requests')
period_min = ip_limit.get('per_minutes')
if max_req and period_min:
timestamps = self.state['rate_limit_trackers'].get(tracker_key, [])
# Filter out old timestamps
timestamps = [ts for ts in timestamps if now - ts < period_min * 60]
if len(timestamps) >= max_req:
logger.warning("Per-IP rate limit reached. Skipping task.")
return False
self.state['rate_limit_trackers'][tracker_key] = timestamps
# Check per-profile limit
profile_limit = rate_limits.get('per_profile')
if profile_limit and profile_name:
tracker_key = f"profile_{profile_name}"
max_req = profile_limit.get('max_requests')
period_min = profile_limit.get('per_minutes')
if max_req and period_min:
timestamps = self.state['rate_limit_trackers'].get(tracker_key, [])
timestamps = [ts for ts in timestamps if now - ts < period_min * 60]
if len(timestamps) >= max_req:
logger.warning(f"Per-profile rate limit for '{profile_name}' reached. Skipping task.")
return False
self.state['rate_limit_trackers'][tracker_key] = timestamps
# If all checks pass, record the new request timestamp for all relevant trackers
if ip_limit and ip_limit.get('max_requests'):
self.state['rate_limit_trackers'].setdefault('per_ip', []).append(now)
if profile_limit and profile_limit.get('max_requests') and profile_name:
self.state['rate_limit_trackers'].setdefault(f"profile_{profile_name}", []).append(now)
return True
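    # Illustrative policy fragment consumed by check_and_update_rate_limit
    # (shape inferred from the lookups above; the numbers are placeholders,
    # not the canonical schema):
    #
    #     {
    #       "info_json_generation_policy": {
    #         "rate_limits": {
    #           "per_ip":      {"max_requests": 60, "per_minutes": 10},
    #           "per_profile": {"max_requests": 20, "per_minutes": 10}
    #         }
    #       }
    #     }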
def get_client_for_request(self, profile_name, gen_policy):
"""
Determines which client to use based on the client_rotation_policy.
Returns a tuple: (client_name, request_params_dict).
"""
with self.lock:
rotation_policy = gen_policy.get('client_rotation_policy')
# If no rotation policy, use the simple 'client' key.
if not rotation_policy:
client = gen_policy.get('client')
logger.info(f"Using client '{client}' for profile '{profile_name}'.")
req_params = gen_policy.get('request_params')
return client, req_params
# --- Rotation logic ---
now = time.time()
major_client = rotation_policy.get('major_client')
refresh_client = rotation_policy.get('refresh_client')
refresh_every = rotation_policy.get('refresh_every', {})
if not refresh_client or not refresh_every:
return major_client, rotation_policy.get('major_client_params')
should_refresh = False
# Check time-based refresh
refresh_minutes = refresh_every.get('minutes')
last_refresh_time = self.state['profile_last_refresh_time'].get(profile_name, 0)
if refresh_minutes and (now - last_refresh_time) > (refresh_minutes * 60):
should_refresh = True
# Check request-count-based refresh
refresh_requests = refresh_every.get('requests')
request_count = self.state['profile_request_counts'].get(profile_name, 0)
if refresh_requests and request_count >= refresh_requests:
should_refresh = True
if should_refresh:
logger.info(f"Profile '{profile_name}' is due for a refresh. Using refresh client '{refresh_client}'.")
self.state['profile_last_refresh_time'][profile_name] = now
self.state['profile_request_counts'][profile_name] = 0 # Reset counter
return refresh_client, rotation_policy.get('refresh_client_params')
else:
# Not refreshing, so increment request count for this profile
self.state['profile_request_counts'][profile_name] = request_count + 1
return major_client, rotation_policy.get('major_client_params')
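    # Illustrative client_rotation_policy fragment (keys inferred from the
    # reads above; client names and numbers are placeholders):
    #
    #     "client_rotation_policy": {
    #       "major_client": "client_a",
    #       "refresh_client": "client_b",
    #       "major_client_params": {},
    #       "refresh_client_params": {},
    #       "refresh_every": {"minutes": 30, "requests": 50}
    #     }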
def get_next_available_profile(self, policy):
"""
Finds or creates an available profile based on the dynamic cooldown policy.
Returns a profile name, or None if no profile is available.
"""
with self.lock:
now = time.time()
settings = policy.get('settings', {})
pm_policy = settings.get('profile_management')
if not pm_policy:
return None
prefix = pm_policy.get('prefix')
if not prefix:
logger.error("Profile management policy requires 'prefix'.")
return None
# Determine and persist the suffix for this run to ensure profile names are stable
run_suffix = self.state.get('profile_run_suffix')
if not run_suffix:
suffix_config = pm_policy.get('suffix')
if suffix_config == 'auto':
run_suffix = datetime.now().strftime('%Y%m%d%H%M')
else:
run_suffix = suffix_config or ''
self.state['profile_run_suffix'] = run_suffix
# Initialize pool size from policy if not already in state
if self.state.get('profile_pool_size', 0) == 0:
self.state['profile_pool_size'] = pm_policy.get('initial_pool_size', 1)
max_reqs = pm_policy.get('max_requests_per_profile')
sleep_mins = pm_policy.get('sleep_minutes_on_exhaustion')
            # Single pass: every branch below returns, so no retry loop is needed.
            # First, try to find an existing, available profile.
            for i in range(self.state['profile_pool_size']):
                profile_name = f"{prefix}_{run_suffix}_{i}" if run_suffix else f"{prefix}_{i}"
                # Skip profiles that are still sleeping.
                sleep_until = self.state['profile_cooldown_sleep_until'].get(profile_name, 0)
                if now < sleep_until:
                    continue
                # Put exhausted profiles to sleep and try the next one.
                req_count = self.state['profile_cooldown_counts'].get(profile_name, 0)
                if max_reqs and req_count >= max_reqs:
                    sleep_duration_seconds = (sleep_mins or 0) * 60
                    self.state['profile_cooldown_sleep_until'][profile_name] = now + sleep_duration_seconds
                    self.state['profile_cooldown_counts'][profile_name] = 0  # Reset count for next time
                    logger.info(f"Profile '{profile_name}' reached request limit ({req_count}/{max_reqs}). Putting to sleep for {sleep_mins} minutes.")
                    continue
                # This profile is available.
                logger.info(f"Selected available profile '{profile_name}' (request count: {req_count}/{max_reqs if max_reqs else 'unlimited'}).")
                return profile_name
            # No existing profile was available; expand the pool if allowed.
            if pm_policy.get('auto_expand_pool'):
                new_profile_index = self.state['profile_pool_size']
                self.state['profile_pool_size'] += 1
                profile_name = f"{prefix}_{run_suffix}_{new_profile_index}" if run_suffix else f"{prefix}_{new_profile_index}"
                logger.info(f"Profile pool exhausted. Expanding pool to size {self.state['profile_pool_size']}. New profile: '{profile_name}'")
                return profile_name
            # No available profiles and pool expansion is disabled.
            return None
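    # Illustrative settings fragment for the cooldown strategy (keys inferred
    # from the reads above; values are placeholders):
    #
    #     "settings": {
    #       "profile_management": {
    #         "prefix": "acct",
    #         "suffix": "auto",
    #         "initial_pool_size": 4,
    #         "max_requests_per_profile": 100,
    #         "sleep_minutes_on_exhaustion": 60,
    #         "auto_expand_pool": true
    #       }
    #     }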
def get_or_rotate_worker_profile(self, worker_id, policy):
"""
Gets the current profile for a worker, rotating to a new generation if the lifetime limit is met.
This is used by the 'per_worker_with_rotation' profile mode.
"""
with self.lock:
pm_policy = policy.get('settings', {}).get('profile_management', {})
if not pm_policy:
logger.error("Profile mode 'per_worker_with_rotation' requires 'settings.profile_management' configuration in the policy.")
return f"error_profile_{worker_id}"
prefix = pm_policy.get('prefix')
if not prefix:
logger.error("Profile management for 'per_worker_with_rotation' requires a 'prefix'.")
return f"error_profile_{worker_id}"
max_reqs = pm_policy.get('max_requests_per_profile')
generations = self.state.setdefault('worker_profile_generations', {})
# worker_id is an int, but JSON keys must be strings
worker_id_str = str(worker_id)
current_gen = generations.get(worker_id_str, 0)
profile_name = f"{prefix}_{worker_id}_{current_gen}"
if not max_reqs: # No lifetime limit defined, so never rotate.
return profile_name
req_count = self.state.get('profile_cooldown_counts', {}).get(profile_name, 0)
if req_count >= max_reqs:
logger.info(f"Profile '{profile_name}' reached lifetime request limit ({req_count}/{max_reqs}). Rotating to new generation for worker {worker_id}.")
new_gen = current_gen + 1
generations[worker_id_str] = new_gen
# The request counts for the old profile are implicitly left behind.
# The new profile will start with a count of 0.
profile_name = f"{prefix}_{worker_id}_{new_gen}"
return profile_name
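    # Example of the rotation above (hypothetical prefix "acct", worker 3):
    # generation 0 yields "acct_3_0"; once that profile reaches
    # max_requests_per_profile, the worker moves to "acct_3_1", and so on.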
def record_profile_request(self, profile_name):
"""Increments the request counter for a profile for the cooldown policy."""
with self.lock:
if not profile_name:
return
counts = self.state.setdefault('profile_cooldown_counts', {})
counts[profile_name] = counts.get(profile_name, 0) + 1
def record_proxy_usage(self, proxy_url):
"""Records a request timestamp for a given proxy URL for statistical purposes."""
if not proxy_url:
return
with self.lock:
now = time.time()
# Use a prefix to avoid collisions with profile names or other keys
tracker_key = f"proxy_{proxy_url}"
self.state['rate_limit_trackers'].setdefault(tracker_key, []).append(now)
def check_and_update_download_rate_limit(self, proxy_url, policy):
"""Checks download rate limits. Returns True if allowed, False otherwise."""
with self.lock:
now = time.time()
d_policy = policy.get('download_policy', {})
rate_limits = d_policy.get('rate_limits', {})
# Check per-IP limit
ip_limit = rate_limits.get('per_ip')
if ip_limit:
tracker_key = 'download_per_ip' # Use a distinct key
max_req = ip_limit.get('max_requests')
period_min = ip_limit.get('per_minutes')
if max_req and period_min:
timestamps = self.state['rate_limit_trackers'].get(tracker_key, [])
timestamps = [ts for ts in timestamps if now - ts < period_min * 60]
if len(timestamps) >= max_req:
logger.warning("Per-IP download rate limit reached. Skipping task.")
return False
self.state['rate_limit_trackers'][tracker_key] = timestamps
# Check per-proxy limit
proxy_limit = rate_limits.get('per_proxy')
if proxy_limit and proxy_url:
tracker_key = f"download_proxy_{proxy_url}"
max_req = proxy_limit.get('max_requests')
period_min = proxy_limit.get('per_minutes')
if max_req and period_min:
timestamps = self.state['rate_limit_trackers'].get(tracker_key, [])
timestamps = [ts for ts in timestamps if now - ts < period_min * 60]
if len(timestamps) >= max_req:
logger.warning(f"Per-proxy download rate limit for '{proxy_url}' reached. Skipping task.")
return False
self.state['rate_limit_trackers'][tracker_key] = timestamps
# If all checks pass, record the new request timestamp for all relevant trackers
if ip_limit and ip_limit.get('max_requests'):
self.state['rate_limit_trackers'].setdefault('download_per_ip', []).append(now)
if proxy_limit and proxy_limit.get('max_requests') and proxy_url:
self.state['rate_limit_trackers'].setdefault(f"download_proxy_{proxy_url}", []).append(now)
return True
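    # Illustrative download_policy fragment (shape inferred from the lookups
    # above and from wait_for_proxy_cooldown below; values are placeholders):
    #
    #     "download_policy": {
    #       "rate_limits": {
    #         "per_ip":    {"max_requests": 30, "per_minutes": 10},
    #         "per_proxy": {"max_requests": 10, "per_minutes": 10}
    #       },
    #       "sleep_per_proxy_seconds": 5
    #     }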
    def wait_for_proxy_cooldown(self, proxy_url, policy):
        """If a per-proxy sleep is defined, wait until the cooldown period has passed."""
        d_policy = policy.get('download_policy', {})
        sleep_duration = d_policy.get('sleep_per_proxy_seconds', 0)
        if not proxy_url or sleep_duration <= 0:
            return
        # Read state under the lock, but sleep outside it so other workers
        # are not blocked on the StateManager for the whole cooldown.
        with self.lock:
            last_finish = self.state.setdefault('proxy_last_finish_time', {}).get(proxy_url, 0)
        elapsed = time.time() - last_finish
        if elapsed < sleep_duration:
            time_to_sleep = sleep_duration - elapsed
            logger.info(f"Proxy '{proxy_url}' was used recently. Sleeping for {time_to_sleep:.2f}s.")
            # Interruptible sleep: poll the shutdown event in small increments.
            sleep_end_time = time.time() + time_to_sleep
            while time.time() < sleep_end_time:
                if self.shutdown_event.is_set():
                    logger.info("Shutdown requested during proxy cooldown sleep.")
                    break
                time.sleep(0.2)
def update_proxy_finish_time(self, proxy_url):
"""Updates the last finish time for a proxy."""
with self.lock:
if not proxy_url:
return
self.state.setdefault('proxy_last_finish_time', {})[proxy_url] = time.time()
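    # Typical pairing of the two proxy helpers (a sketch; `perform_download`
    # is a hypothetical caller-side function):
    #
    #     sm.wait_for_proxy_cooldown(proxy_url, policy)
    #     try:
    #         perform_download(proxy_url)
    #     finally:
    #         sm.update_proxy_finish_time(proxy_url)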
def print_summary(self, policy=None):
"""Print a summary of the test run."""
with self.lock:
# --- Cumulative Stats from State ---
now = time.time()
rate_trackers = self.state.get('rate_limit_trackers', {})
if rate_trackers:
logger.info("\n--- Cumulative Rate Summary (All Runs, updated at end of run) ---")
logger.info("This shows the total number of requests/downloads over various time windows, including previous runs.")
fetch_trackers = {k: v for k, v in rate_trackers.items() if not k.startswith('download_')}
download_trackers = {k: v for k, v in rate_trackers.items() if k.startswith('download_')}
def print_tracker_stats(trackers, tracker_type):
if not trackers:
logger.info(f"No historical {tracker_type} trackers found.")
return
logger.info(f"Historical {tracker_type} Trackers:")
for key, timestamps in sorted(trackers.items()):
windows = {
'last 10 min': 600, 'last 60 min': 3600,
'last 6 hours': 21600, 'last 24 hours': 86400
}
rates_str_parts = []
for name, seconds in windows.items():
count = sum(1 for ts in timestamps if now - ts <= seconds)
rate_rpm = (count / seconds) * 60 if seconds > 0 else 0
rates_str_parts.append(f"{count} in {name} ({rate_rpm:.2f}/min)")
# Clean up key for display
display_key = key.replace('download_', '').replace('per_ip', 'all_proxies/ips')
logger.info(f" - Tracker '{display_key}': " + ", ".join(rates_str_parts))
print_tracker_stats(fetch_trackers, "Fetch Request")
print_tracker_stats(download_trackers, "Download Attempt")
if not self.events:
logger.info("\nNo new events were recorded in this session.")
return
duration = time.time() - self.start_time
fetch_events = [e for e in self.events if e.get('type') == 'fetch']
batch_fetch_events = [e for e in self.events if e.get('type') == 'fetch_batch']
download_events = [e for e in self.events if e.get('type') not in ['fetch', 'fetch_batch']]
logger.info("\n--- Test Summary (This Run) ---")
logger.info(f"Total duration: {duration:.2f} seconds")
# Check for batch mode stats from state
if self.state.get('total_batches_processed', 0) > 0:
logger.info(f"Total batches processed (cumulative): {self.state['total_batches_processed']}")
logger.info(f" - Successful: {self.state['successful_batches']}")
logger.info(f" - Failed: {self.state['failed_batches']}")
logger.info(f"Total videos processed (cumulative): {self.state['total_videos_processed']}")
else:
logger.info(f"Total info.json requests (cumulative): {self.get_request_count()}")
if policy:
logger.info("\n--- Test Configuration ---")
settings = policy.get('settings', {})
d_policy = policy.get('download_policy', {})
if settings.get('urls_file'):
logger.info(f"URL source file: {settings['urls_file']}")
if settings.get('info_json_dir'):
logger.info(f"Info.json source dir: {settings['info_json_dir']}")
if d_policy:
logger.info(f"Download formats: {d_policy.get('formats', 'N/A')}")
if d_policy.get('downloader'):
logger.info(f"Downloader: {d_policy.get('downloader')}")
if d_policy.get('downloader_args'):
logger.info(f"Downloader args: {d_policy.get('downloader_args')}")
if d_policy.get('pause_before_download_seconds'):
logger.info(f"Pause before download: {d_policy.get('pause_before_download_seconds')}s")
if d_policy.get('sleep_between_formats'):
sleep_cfg = d_policy.get('sleep_between_formats')
logger.info(f"Sleep between formats: {sleep_cfg.get('min_seconds', 0)}-{sleep_cfg.get('max_seconds', 0)}s")
if fetch_events:
total_fetches = len(fetch_events)
successful_fetches = sum(1 for e in fetch_events if e['success'])
cancelled_fetches = sum(1 for e in fetch_events if e.get('error_type') == 'Cancelled')
failed_fetches = total_fetches - successful_fetches - cancelled_fetches
logger.info("\n--- Fetch Summary (This Run) ---")
logger.info(f"Total info.json fetch attempts: {total_fetches}")
logger.info(f" - Successful: {successful_fetches}")
logger.info(f" - Failed: {failed_fetches}")
if cancelled_fetches > 0:
logger.info(f" - Cancelled: {cancelled_fetches}")
completed_fetches = successful_fetches + failed_fetches
if completed_fetches > 0:
success_rate = (successful_fetches / completed_fetches) * 100
logger.info(f"Success rate (of completed): {success_rate:.2f}%")
elif total_fetches > 0:
logger.info("Success rate: N/A (no tasks completed)")
if duration > 1 and total_fetches > 0:
rpm = (total_fetches / duration) * 60
logger.info(f"Actual fetch rate: {rpm:.2f} requests/minute")
if failed_fetches > 0:
error_counts = collections.Counter(
e.get('error_type', 'Unknown')
for e in fetch_events if not e['success'] and e.get('error_type') != 'Cancelled'
)
logger.info("Failure breakdown:")
for error_type, count in sorted(error_counts.items()):
logger.info(f" - {error_type}: {count}")
profile_counts = collections.Counter(e.get('profile') for e in fetch_events if e.get('profile'))
if profile_counts:
logger.info("Requests per profile:")
for profile, count in sorted(profile_counts.items()):
logger.info(f" - {profile}: {count}")
proxy_counts = collections.Counter(e.get('proxy_url') for e in fetch_events if e.get('proxy_url'))
if proxy_counts:
logger.info("Requests per proxy:")
for proxy, count in sorted(proxy_counts.items()):
logger.info(f" - {proxy}: {count}")
if batch_fetch_events:
total_batches = len(batch_fetch_events)
successful_batches = sum(1 for e in batch_fetch_events if e['success'])
failed_batches = total_batches - successful_batches
total_videos_this_run = sum(e.get('video_count', 0) for e in batch_fetch_events)
logger.info("\n--- Batch Fetch Summary (This Run) ---")
logger.info(f"Total batches processed: {total_batches}")
logger.info(f"Total videos processed: {total_videos_this_run}")
logger.info(f" - Successful batches: {successful_batches}")
logger.info(f" - Failed batches: {failed_batches}")
profile_counts = collections.Counter(e.get('profile') for e in batch_fetch_events if e.get('profile'))
if profile_counts:
logger.info("Batches per profile:")
for profile, count in sorted(profile_counts.items()):
logger.info(f" - {profile}: {count}")
proxy_counts = collections.Counter(e.get('proxy_url') for e in batch_fetch_events if e.get('proxy_url'))
if proxy_counts:
logger.info("Batches per proxy:")
for proxy, count in sorted(proxy_counts.items()):
logger.info(f" - {proxy}: {count}")
if download_events:
total_attempts = len(download_events)
successes = sum(1 for e in download_events if e['success'])
cancelled = sum(1 for e in download_events if e.get('error_type') == 'Cancelled')
failures = total_attempts - successes - cancelled
# --- Profile Association for Download Events ---
download_profiles = [e.get('profile') for e in download_events]
# For download_only mode, we might need to fall back to regex extraction
# if the profile wasn't passed down (e.g., no profile grouping).
profile_regex = None
if policy:
settings = policy.get('settings', {})
if settings.get('mode') == 'download_only':
profile_regex = settings.get('profile_extraction_regex')
if profile_regex:
for i, e in enumerate(download_events):
if not download_profiles[i]: # If profile wasn't set in the event
path = Path(e.get('path', ''))
match = re.search(profile_regex, path.name)
if match and match.groups():
download_profiles[i] = match.group(1)
# Replace any remaining Nones with 'unknown_profile'
download_profiles = [p or 'unknown_profile' for p in download_profiles]
num_profiles_used = len(set(p for p in download_profiles if p != 'unknown_profile'))
logger.info("\n--- Download Summary (This Run) ---")
if policy:
workers = policy.get('execution_control', {}).get('workers', 'N/A')
logger.info(f"Workers configured: {workers}")
logger.info(f"Profiles utilized for downloads: {num_profiles_used}")
logger.info(f"Total download attempts: {total_attempts}")
logger.info(f" - Successful: {successes}")
logger.info(f" - Failed: {failures}")
if cancelled > 0:
logger.info(f" - Cancelled: {cancelled}")
completed_downloads = successes + failures
if completed_downloads > 0:
success_rate = (successes / completed_downloads) * 100
logger.info(f"Success rate (of completed): {success_rate:.2f}%")
elif total_attempts > 0:
logger.info("Success rate: N/A (no tasks completed)")
duration_hours = duration / 3600.0
if duration > 1 and total_attempts > 0:
dpm = (total_attempts / duration) * 60
logger.info(f"Actual overall download rate: {dpm:.2f} attempts/minute")
total_bytes = sum(e.get('downloaded_bytes', 0) for e in download_events if e['success'])
if total_bytes > 0:
logger.info(f"Total data downloaded: {sp_utils.format_size(total_bytes)}")
if failures > 0:
error_counts = collections.Counter(
e.get('error_type', 'Unknown')
for e in download_events if not e['success'] and e.get('error_type') != 'Cancelled'
)
logger.info("Failure breakdown:")
for error_type, count in sorted(error_counts.items()):
logger.info(f" - {error_type}: {count}")
# Add profile to each download event for easier counting
for i, e in enumerate(download_events):
e['profile'] = download_profiles[i]
profile_counts = collections.Counter(e.get('profile') for e in download_events if e.get('profile'))
if profile_counts:
logger.info("Downloads per profile:")
for profile, count in sorted(profile_counts.items()):
rate_per_hour = (count / duration_hours) if duration_hours > 0 else 0
logger.info(f" - {profile}: {count} attempts (avg this run: {rate_per_hour:.2f}/hour)")
proxy_counts = collections.Counter(e.get('proxy_url') for e in download_events if e.get('proxy_url'))
if proxy_counts:
logger.info("Downloads per proxy:")
for proxy, count in sorted(proxy_counts.items()):
rate_per_hour = (count / duration_hours) if duration_hours > 0 else 0
logger.info(f" - {proxy}: {count} attempts (avg this run: {rate_per_hour:.2f}/hour)")
logger.info("--------------------")
# --- Queue-specific methods ---
def initialize_queue_provider(self, redis_host: str, redis_port: int,
redis_password: Optional[str] = None, redis_db: int = 0,
env_prefix: Optional[str] = None):
"""Initialize the queue provider if not already set."""
if not self.queue_provider:
self.queue_provider = RedisQueueProvider(
redis_host=redis_host,
redis_port=redis_port,
redis_password=redis_password,
redis_db=redis_db,
env_prefix=env_prefix
)
logger.info(f"Initialized Redis queue provider with prefix: '{env_prefix}'")
return self.queue_provider
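    # Minimal sketch of the queue workflow (the host/port values, prefix, and
    # the 'task_id' field are assumptions, not guaranteed by QueueProvider):
    #
    #     sm.initialize_queue_provider('localhost', 6379, env_prefix='stress')
    #     task = sm.get_auth_task()
    #     if task:
    #         sm.mark_auth_in_progress(task['task_id'], 'worker-1')
    #         ...
    #         sm.report_auth_success(task['task_id'], {'status': 'ok'})
    #         sm.remove_auth_in_progress(task['task_id'])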
def get_auth_task(self) -> Optional[Dict]:
"""Get an authentication task from the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return None
task = self.queue_provider.get_task(self.queue_provider.AUTH_INBOX)
if task:
with self.lock:
self.state['queue_stats']['auth']['total_processed'] += 1
return task
def get_auth_tasks_batch(self, batch_size: int) -> List[Dict]:
"""Get a batch of authentication tasks from the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return []
tasks = self.queue_provider.get_tasks_batch(self.queue_provider.AUTH_INBOX, batch_size)
if tasks:
with self.lock:
self.state['queue_stats']['auth']['total_processed'] += len(tasks)
return tasks
def report_auth_success(self, task_id: str, result: Dict) -> bool:
"""Report a successful authentication task."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
success = self.queue_provider.report_success(self.queue_provider.AUTH_RESULT, task_id, result)
if success:
with self.lock:
self.state['queue_stats']['auth']['successful'] += 1
return success
def report_auth_failure(self, task_id: str, error: Dict) -> bool:
"""Report an authentication task failure."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
success = self.queue_provider.report_failure(self.queue_provider.AUTH_FAIL, task_id, error)
if success:
with self.lock:
self.state['queue_stats']['auth']['failed'] += 1
return success
def report_auth_skipped(self, task_id: str, reason: Dict) -> bool:
"""Report an authentication task that was skipped."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
success = self.queue_provider.report_skipped(self.queue_provider.AUTH_SKIPPED, task_id, reason)
if success:
with self.lock:
self.state['queue_stats']['auth']['skipped'] += 1
return success
def mark_auth_in_progress(self, task_id: str, worker_id: str) -> bool:
"""Mark an authentication task as in progress."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
return self.queue_provider.mark_in_progress(self.queue_provider.AUTH_PROGRESS, task_id, worker_id)
def remove_auth_in_progress(self, task_id: str) -> bool:
"""Remove an authentication task from the in-progress tracking."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
return self.queue_provider.remove_in_progress(self.queue_provider.AUTH_PROGRESS, task_id)
def add_auth_task(self, task: Dict) -> bool:
"""Add an authentication task to the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
return self.queue_provider.add_task(self.queue_provider.AUTH_INBOX, task)
def add_auth_tasks_batch(self, tasks: List[Dict]) -> int:
"""Add a batch of authentication tasks to the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return 0
return self.queue_provider.add_tasks_batch(self.queue_provider.AUTH_INBOX, tasks)
def add_download_task(self, task: Dict) -> bool:
"""Add a download task to the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
return self.queue_provider.add_task(self.queue_provider.DL_TASKS, task)
def add_download_tasks_batch(self, tasks: List[Dict]) -> int:
"""Add a batch of download tasks to the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return 0
return self.queue_provider.add_tasks_batch(self.queue_provider.DL_TASKS, tasks)
def get_download_task(self) -> Optional[Dict]:
"""Get a download task from the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return None
task = self.queue_provider.get_task(self.queue_provider.DL_TASKS)
if task:
with self.lock:
self.state['queue_stats']['download']['total_processed'] += 1
return task
def get_download_tasks_batch(self, batch_size: int) -> List[Dict]:
"""Get a batch of download tasks from the queue."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return []
tasks = self.queue_provider.get_tasks_batch(self.queue_provider.DL_TASKS, batch_size)
if tasks:
with self.lock:
self.state['queue_stats']['download']['total_processed'] += len(tasks)
return tasks
def report_download_success(self, task_id: str, result: Dict) -> bool:
"""Report a successful download task."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
success = self.queue_provider.report_success(self.queue_provider.DL_RESULT, task_id, result)
if success:
with self.lock:
self.state['queue_stats']['download']['successful'] += 1
return success
def report_download_failure(self, task_id: str, error: Dict) -> bool:
"""Report a download task failure."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
success = self.queue_provider.report_failure(self.queue_provider.DL_FAIL, task_id, error)
if success:
with self.lock:
self.state['queue_stats']['download']['failed'] += 1
return success
def report_download_skipped(self, task_id: str, reason: Dict) -> bool:
"""Report a download task that was skipped."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
success = self.queue_provider.report_skipped(self.queue_provider.DL_SKIPPED, task_id, reason)
if success:
with self.lock:
self.state['queue_stats']['download']['skipped'] += 1
return success
def mark_download_in_progress(self, task_id: str, worker_id: str) -> bool:
"""Mark a download task as in progress."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
return self.queue_provider.mark_in_progress(self.queue_provider.DL_PROGRESS, task_id, worker_id)
def remove_download_in_progress(self, task_id: str) -> bool:
"""Remove a download task from the in-progress tracking."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return False
return self.queue_provider.remove_in_progress(self.queue_provider.DL_PROGRESS, task_id)
def requeue_failed_auth_tasks(self, batch_size: int = 100) -> int:
"""Requeue failed authentication tasks."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return 0
return self.queue_provider.requeue_failed_tasks(
self.queue_provider.AUTH_FAIL,
self.queue_provider.AUTH_INBOX,
batch_size
)
def requeue_failed_download_tasks(self, batch_size: int = 100) -> int:
"""Requeue failed download tasks."""
if not self.queue_provider:
logger.error("Queue provider not initialized")
return 0
return self.queue_provider.requeue_failed_tasks(
self.queue_provider.DL_FAIL,
self.queue_provider.DL_TASKS,
batch_size
)