From fc2d740b65fc475cc7868e35336037bfa86e224a Mon Sep 17 00:00:00 2001
From: aperez
Date: Fri, 18 Jul 2025 17:19:07 +0300
Subject: [PATCH] Update utils to reuse Redis connections

---
 dags/utils/__init__.py                  |  10 +
 dags/utils/redis_utils.py               |  32 ++
 dags/ytdlp_proc_sequential_processor.py | 707 ------------------------
 3 files changed, 42 insertions(+), 707 deletions(-)
 create mode 100644 dags/utils/__init__.py
 create mode 100644 dags/utils/redis_utils.py
 delete mode 100644 dags/ytdlp_proc_sequential_processor.py

diff --git a/dags/utils/__init__.py b/dags/utils/__init__.py
new file mode 100644
index 0000000..3dc96c9
--- /dev/null
+++ b/dags/utils/__init__.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+#
+# Copyright © 2024 rl
+#
+# Distributed under terms of the MIT license.
+
+"""
+Airflow DAG Utilities
+"""
diff --git a/dags/utils/redis_utils.py b/dags/utils/redis_utils.py
new file mode 100644
index 0000000..e4d6f39
--- /dev/null
+++ b/dags/utils/redis_utils.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+#
+# Copyright © 2024 rl
+#
+# Distributed under terms of the MIT license.
+
+"""
+Redis utility functions for Airflow DAGs.
+"""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.redis.hooks.redis import RedisHook
+import logging
+import redis
+
+logger = logging.getLogger(__name__)
+
+def _get_redis_client(redis_conn_id):
+    """Gets a Redis client connection using RedisHook."""
+    try:
+        hook = RedisHook(redis_conn_id=redis_conn_id)
+        client = hook.get_conn()
+        client.ping()
+        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
+        return client
+    except redis.exceptions.AuthenticationError:
+        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
+        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
+    except Exception as e:
+        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
+        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
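
Usage note (sketch only, not applied by this patch): the helper is meant to be
imported by the queue-processing DAGs under dags/, which previously built Redis
clients inline. A minimal consumer could look like the following; the
connection id 'redis_default' and the 'video_queue_inbox' key are illustrative,
following the DEFAULT_REDIS_CONN_ID and '<queue_name>_inbox' conventions of the
DAG removed below:

    from utils.redis_utils import _get_redis_client

    def inbox_depth(**context):
        # Reuse the shared helper instead of constructing a client by hand;
        # it pings the server on connect, so a dead or misconfigured
        # connection fails fast with AirflowException.
        client = _get_redis_client('redis_default')
        return client.llen('video_queue_inbox')
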
diff --git a/dags/ytdlp_proc_sequential_processor.py b/dags/ytdlp_proc_sequential_processor.py
deleted file mode 100644
index 08c7c6d..0000000
--- a/dags/ytdlp_proc_sequential_processor.py
+++ /dev/null
@@ -1,707 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim:fenc=utf-8
-#
-# Copyright © 2024 rl
-#
-# Distributed under terms of the MIT license.
-
-"""
-DAG for processing YouTube URLs sequentially from a Redis queue using YTDLP Ops Thrift service.
-"""
-
-from airflow import DAG
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
-from airflow.hooks.base import BaseHook
-from airflow.models import BaseOperator, Variable
-from airflow.models.param import Param
-from airflow.operators.bash import BashOperator # Import BashOperator
-from airflow.operators.python import PythonOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
-from airflow.providers.redis.hooks.redis import RedisHook
-from airflow.utils.dates import days_ago
-from airflow.utils.decorators import apply_defaults
-from datetime import datetime, timedelta
-from pangramia.yt.common.ttypes import TokenUpdateMode
-from pangramia.yt.exceptions.ttypes import PBServiceException
-from pangramia.yt.tokens_ops import YTTokenOpService
-from thrift.protocol import TBinaryProtocol
-from thrift.transport import TSocket, TTransport
-from thrift.transport.TTransport import TTransportException
-import json
-import logging
-import os
-import redis # Import redis exceptions if needed
-import socket
-import time
-import traceback # For logging stack traces in failure handler
-
-# Configure logging
-logger = logging.getLogger(__name__)
-
-# Default settings
-DEFAULT_QUEUE_NAME = 'video_queue' # Base name for queues
-DEFAULT_REDIS_CONN_ID = 'redis_default'
-DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds
-MAX_RETRIES_REDIS_LOOKUP = 3 # Retries for fetching service details from Redis
-RETRY_DELAY_REDIS_LOOKUP = 10 # Delay (seconds) for Redis lookup retries
-
-# --- Helper Functions ---
-
-from utils.redis_utils import _get_redis_client
-
-def _extract_video_id(url):
-    """Extracts YouTube video ID from URL."""
-    if not url or not isinstance(url, str):
-        logger.debug("URL is empty or not a string, cannot extract video ID.")
-        return None
-    try:
-        video_id = None
-        if 'youtube.com/watch?v=' in url:
-            video_id = url.split('v=')[1].split('&')[0]
-        elif 'youtu.be/' in url:
-            video_id = url.split('youtu.be/')[1].split('?')[0]
-
-        if video_id and len(video_id) >= 11:
-            video_id = video_id[:11] # Standard ID length
-            logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
-            return video_id
-        else:
-            logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
-            return None
-    except Exception as e:
-        logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}")
-        return None
-
-# --- Queue Management Callables ---
-
-def pop_url_from_queue(**context):
-    """Pops a URL from the inbox queue and pushes to XCom."""
-    params = context['params']
-    queue_name = params['queue_name']
-    inbox_queue = f"{queue_name}_inbox"
-    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
-    logger.info(f"Attempting to pop URL from inbox queue: {inbox_queue}")
-
-    try:
-        client = _get_redis_client(redis_conn_id)
-        # LPOP is non-blocking, returns None if empty
-        url_bytes = client.lpop(inbox_queue) # Returns bytes if decode_responses=False on hook/client
-
-        if url_bytes:
-            url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes
-            logger.info(f"Popped URL: {url}")
-            context['task_instance'].xcom_push(key='current_url', value=url)
-            return url # Return URL for logging/potential use
-        else:
-            logger.info(f"Inbox queue '{inbox_queue}' is empty. Skipping downstream tasks.")
-            context['task_instance'].xcom_push(key='current_url', value=None)
-            # Raise AirflowSkipException to signal downstream tasks to skip
-            raise AirflowSkipException(f"Inbox queue '{inbox_queue}' is empty.")
-    except AirflowSkipException:
-        raise # Re-raise skip exception
-    except Exception as e:
-        logger.error(f"Error popping URL from Redis queue '{inbox_queue}': {e}", exc_info=True)
-        raise AirflowException(f"Failed to pop URL from Redis: {e}")
-
-
-def move_url_to_progress(**context):
-    """Moves the current URL from XCom to the progress hash."""
-    ti = context['task_instance']
-    url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
-
-    # This task should be skipped if pop_url_from_queue raised AirflowSkipException
-    # Adding check for robustness
-    if not url:
-        logger.info("No URL found in XCom (or upstream skipped). Skipping move to progress.")
-        raise AirflowSkipException("No URL to process.")
-
-    params = context['params']
-    queue_name = params['queue_name']
-    progress_queue = f"{queue_name}_progress"
-    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
-    logger.info(f"Moving URL '{url}' to progress hash: {progress_queue}")
-
-    progress_data = {
-        'status': 'processing',
-        'start_time': time.time(),
-        'dag_run_id': context['dag_run'].run_id,
-        'task_instance_key_str': context['task_instance_key_str']
-    }
-
-    try:
-        client = _get_redis_client(redis_conn_id)
-        client.hset(progress_queue, url, json.dumps(progress_data))
-        logger.info(f"Moved URL '{url}' to progress hash '{progress_queue}'.")
-    except Exception as e:
-        logger.error(f"Error moving URL to Redis progress hash '{progress_queue}': {e}", exc_info=True)
-        # If this fails, the URL is popped but not tracked as processing. Fail the task.
-        raise AirflowException(f"Failed to move URL to progress hash: {e}")
-
-
-def handle_success(**context):
-    """Moves URL from progress to result hash on success."""
-    ti = context['task_instance']
-    url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
-    if not url:
-        logger.warning("handle_success called but no URL found from pop_url_from_queue XCom. This shouldn't happen on success path.")
-        return # Or raise error
-
-    params = context['params']
-    queue_name = params['queue_name']
-    progress_queue = f"{queue_name}_progress"
-    result_queue = f"{queue_name}_result"
-    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
-
-    # Pull results from get_token task
-    info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
-    socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
-    ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') # Original command
-    downloaded_file_path = ti.xcom_pull(task_ids='download_video') # Pull from download_video task
-
-    logger.info(f"Handling success for URL: {url}")
-    logger.info(f"  Info JSON Path: {info_json_path}")
-    logger.info(f"  SOCKS Proxy: {socks_proxy}")
-    logger.info(f"  YTDLP Command: {ytdlp_command[:100] if ytdlp_command else 'None'}...") # Log truncated command
-    logger.info(f"  Downloaded File Path: {downloaded_file_path}")
-
-    result_data = {
-        'status': 'success',
-        'end_time': time.time(),
-        'info_json_path': info_json_path,
-        'socks_proxy': socks_proxy,
-        'ytdlp_command': ytdlp_command,
-        'downloaded_file_path': downloaded_file_path,
-        'url': url,
-        'dag_run_id': context['dag_run'].run_id,
-        'task_instance_key_str': context['task_instance_key_str'] # Record which task instance succeeded
-    }
-
-    try:
-        client = _get_redis_client(redis_conn_id)
-        # Remove from progress hash
-        removed_count = client.hdel(progress_queue, url)
-        if removed_count > 0:
-            logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
-        else:
-            logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during success handling.")
-
-        # Add to result hash
-        client.hset(result_queue, url, json.dumps(result_data))
-        logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
-
-    except Exception as e:
-        logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
-        # Even if Redis fails, the task succeeded. Log error but don't fail the task.
-        # Consider adding retry logic for Redis operations here or marking state differently.
-
-
-def handle_failure(**context):
-    """
-    Handles failed processing. Depending on the `requeue_on_failure` parameter,
-    it either moves the URL to the fail hash or re-queues it in the inbox.
-    If `stop_on_failure` is True, this task will fail, stopping the DAG loop.
-    """
-    ti = context['task_instance']
-    url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
-    if not url:
-        logger.error("handle_failure called but no URL found from pop_url_from_queue XCom.")
-        return
-
-    params = context['params']
-    queue_name = params['queue_name']
-    progress_queue = f"{queue_name}_progress"
-    fail_queue = f"{queue_name}_fail"
-    inbox_queue = f"{queue_name}_inbox"
-    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
-    requeue_on_failure = params.get('requeue_on_failure', False)
-    stop_on_failure = params.get('stop_on_failure', True) # Default to True
-
-    exception = context.get('exception')
-    error_message = str(exception) if exception else "Unknown error"
-    tb_str = traceback.format_exc() if exception else "No traceback available."
-
-    logger.info(f"Handling failure for URL: {url}")
-    logger.error(f"  Failure Reason: {error_message}")
-    logger.debug(f"  Traceback:\n{tb_str}")
-
-    try:
-        client = _get_redis_client(redis_conn_id)
-        # Always remove from progress hash first
-        removed_count = client.hdel(progress_queue, url)
-        if removed_count > 0:
-            logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
-        else:
-            logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.")
-
-        if requeue_on_failure:
-            # Re-queue the URL for another attempt
-            client.rpush(inbox_queue, url)
-            logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
-        else:
-            # Move to the permanent fail hash
-            fail_data = {
-                'status': 'failed',
-                'end_time': time.time(),
-                'error': error_message,
-                'traceback': tb_str,
-                'url': url,
-                'dag_run_id': context['dag_run'].run_id,
-                'task_instance_key_str': context['task_instance_key_str']
-            }
-            client.hset(fail_queue, url, json.dumps(fail_data))
-            logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
-
-    except Exception as e:
-        logger.error(f"Error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
-        # This is a critical error in the failure handling logic itself.
-        raise AirflowException(f"Could not handle failure in Redis: {e}")
-
-    # After handling Redis, decide whether to fail the task to stop the loop
-    if stop_on_failure:
-        logger.error("stop_on_failure is True. Failing this task to stop the DAG loop.")
-        # Re-raise the original exception to fail the task instance.
-        # This is better than AirflowFailException because it preserves the original error.
-        if exception:
-            raise exception
-        else:
-            # If for some reason there's no exception, fail explicitly.
-            raise AirflowFailException("Failing task as per stop_on_failure=True, but original exception was not found.")
-
-
-# --- YtdlpOpsOperator ---
-
-class YtdlpOpsOperator(BaseOperator):
-    """
-    Custom Airflow operator to interact with YTDLP Thrift service. Handles direct connections
-    and Redis-based discovery, retrieves tokens, saves info.json, and manages errors.
-    Modified to pull URL from XCom for sequential processing.
-    """
-    # Removed 'url' from template_fields as it's pulled from XCom
-    template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir', 'redis_conn_id')
-
-    @apply_defaults
-    def __init__(self,
-                 # url parameter removed - will be pulled from XCom
-                 redis_conn_id=DEFAULT_REDIS_CONN_ID,
-                 max_retries_lookup=MAX_RETRIES_REDIS_LOOKUP,
-                 retry_delay_lookup=RETRY_DELAY_REDIS_LOOKUP,
-                 service_ip=None,
-                 service_port=None,
-                 redis_enabled=False, # Default to direct connection now
-                 account_id=None,
-                 # save_info_json removed, always True
-                 info_json_dir=None,
-                 # get_socks_proxy removed, always True
-                 # store_socks_proxy removed, always True
-                 # get_socks_proxy=True, # Removed
-                 # store_socks_proxy=True, # Store proxy in XCom by default # Removed
-                 timeout=DEFAULT_TIMEOUT,
-                 *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        logger.info(f"Initializing YtdlpOpsOperator (Processor Version) with parameters: "
-                    f"redis_conn_id={redis_conn_id}, max_retries_lookup={max_retries_lookup}, retry_delay_lookup={retry_delay_lookup}, "
-                    f"service_ip={service_ip}, service_port={service_port}, redis_enabled={redis_enabled}, "
-                    f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
-        # save_info_json, get_socks_proxy, store_socks_proxy removed from log
-
-        # Validate parameters based on connection mode
-        if redis_enabled:
-            # If using Redis, account_id is essential for lookup
-            if not account_id:
-                raise ValueError("account_id is required when redis_enabled=True for service lookup.")
-        else:
-            # If direct connection, IP and Port are essential
-            if not service_ip or not service_port:
-                raise ValueError("Both service_ip and service_port must be specified when redis_enabled=False.")
-            # Account ID is still needed for the API call itself, but rely on DAG param or operator config
-            if not account_id:
-                logger.warning("No account_id provided for direct connection mode. Ensure it's set in DAG params or operator config.")
-                # We won't assign 'default' here, let the value passed during instantiation be used.
-
-        # self.url is no longer needed here
-        self.redis_conn_id = redis_conn_id
-        self.max_retries_lookup = max_retries_lookup
-        self.retry_delay_lookup = int(retry_delay_lookup.total_seconds() if isinstance(retry_delay_lookup, timedelta) else retry_delay_lookup)
-        self.service_ip = service_ip
-        self.service_port = service_port
-        self.redis_enabled = redis_enabled
-        self.account_id = account_id
-        # self.save_info_json removed
-        self.info_json_dir = info_json_dir # Still needed
-        # self.get_socks_proxy removed
-        # self.store_socks_proxy removed
-        self.timeout = timeout
-
-    def execute(self, context):
-        logger.info("Executing YtdlpOpsOperator (Processor Version)")
-        transport = None
-        ti = context['task_instance'] # Get task instance for XCom access
-
-        try:
-            # --- Get URL from XCom ---
-            url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
-            if not url:
-                # This should ideally be caught by upstream skip, but handle defensively
-                logger.info("No URL found in XCom from pop_url_from_queue. Skipping execution.")
-                raise AirflowSkipException("Upstream task did not provide a URL.")
-            logger.info(f"Processing URL from XCom: {url}")
-            # --- End Get URL ---
-
-            logger.info("Getting task parameters and rendering templates")
-            params = context['params'] # DAG run params
-
-            # Render template fields using context
-            # Use render_template_as_native for better type handling if needed, else render_template
-            redis_conn_id = self.render_template(self.redis_conn_id, context)
-            service_ip = self.render_template(self.service_ip, context)
-            service_port_rendered = self.render_template(self.service_port, context)
-            account_id = self.render_template(self.account_id, context)
-            timeout_rendered = self.render_template(self.timeout, context)
-            info_json_dir = self.render_template(self.info_json_dir, context) # Rendered here for _save_info_json
-
-            # Determine effective settings (DAG params override operator defaults)
-            redis_enabled = params.get('redis_enabled', self.redis_enabled)
-            account_id = params.get('account_id', account_id) # Use DAG param if provided
-            redis_conn_id = params.get('redis_conn_id', redis_conn_id) # Use DAG param if provided
-
-            logger.info(f"Effective settings: redis_enabled={redis_enabled}, account_id='{account_id}', redis_conn_id='{redis_conn_id}'")
-
-            host = None
-            port = None
-
-            if redis_enabled:
-                # Get Redis connection using the helper for consistency
-                redis_client = _get_redis_client(redis_conn_id)
-                logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}' for service discovery.")
-
-                # Get service details from Redis with retries
-                service_key = f"ytdlp:{account_id}"
-                legacy_key = account_id # For backward compatibility
-
-                for attempt in range(self.max_retries_lookup):
-                    try:
-                        logger.info(f"Attempt {attempt + 1}/{self.max_retries_lookup}: Fetching service details from Redis for keys: '{service_key}', '{legacy_key}'")
-                        service_details = redis_client.hgetall(service_key)
-                        if not service_details:
-                            logger.warning(f"Key '{service_key}' not found, trying legacy key '{legacy_key}'")
-                            service_details = redis_client.hgetall(legacy_key)
-
-                        if not service_details:
-                            raise ValueError(f"No service details found in Redis for keys: {service_key} or {legacy_key}")
-
-                        # Find IP and port (case-insensitive keys)
-                        ip_key = next((k for k in service_details if k.lower() == 'ip'), None)
-                        port_key = next((k for k in service_details if k.lower() == 'port'), None)
-
-                        if not ip_key: raise ValueError(f"'ip' key not found in Redis hash for {service_key}/{legacy_key}")
-                        if not port_key: raise ValueError(f"'port' key not found in Redis hash for {service_key}/{legacy_key}")
-
-                        host = service_details[ip_key] # Assumes decode_responses=True in hook
-                        port_str = service_details[port_key]
-
-                        try:
-                            port = int(port_str)
-                        except (ValueError, TypeError):
-                            raise ValueError(f"Invalid port value '{port_str}' found in Redis for {service_key}/{legacy_key}")
-
-                        logger.info(f"Extracted from Redis - Service IP: {host}, Service Port: {port}")
-                        break # Success
-
-                    except Exception as e:
-                        logger.warning(f"Attempt {attempt + 1} failed to get Redis details: {str(e)}")
-                        if attempt == self.max_retries_lookup - 1:
-                            logger.error("Max retries reached for fetching Redis details.")
-                            raise AirflowException(f"Failed to get service details from Redis after {self.max_retries_lookup} attempts: {e}")
-                        logger.info(f"Retrying in {self.retry_delay_lookup} seconds...")
-                        time.sleep(self.retry_delay_lookup)
-            else:
-                # Direct connection: Use rendered/param values
-                host = params.get('service_ip', service_ip) # Use DAG param if provided
-                port_str = params.get('service_port', service_port_rendered) # Use DAG param if provided
-
-                logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
-
-                if not host or not port_str:
-                    raise ValueError("Direct connection requires service_ip and service_port (check Operator config and DAG params)")
-                try:
-                    port = int(port_str)
-                except (ValueError, TypeError):
-                    raise ValueError(f"Invalid service_port value: {port_str}")
-
-                logger.info(f"Connecting directly to Thrift service at {host}:{port} (Redis bypassed)")
-
-            # Validate and use timeout
-            try:
-                timeout = int(timeout_rendered)
-                if timeout <= 0: raise ValueError("Timeout must be positive")
-                logger.info(f"Using timeout: {timeout} seconds")
-            except (ValueError, TypeError):
-                logger.warning(f"Invalid timeout value: '{timeout_rendered}'. Using default: {DEFAULT_TIMEOUT}")
-                timeout = DEFAULT_TIMEOUT
-
-            # Create Thrift connection objects
-            # socket_conn = TSocket.TSocket(host, port) # Original
-            socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) # Explicitly use AF_INET (IPv4)
-            socket_conn.setTimeout(timeout * 1000) # Thrift timeout is in milliseconds
-            transport = TTransport.TFramedTransport(socket_conn) # Use TFramedTransport if server expects it
-            # transport = TTransport.TBufferedTransport(socket_conn) # Use TBufferedTransport if server expects it
-            protocol = TBinaryProtocol.TBinaryProtocol(transport)
-            client = YTTokenOpService.Client(protocol)
-
-            logger.info(f"Attempting to connect to Thrift server at {host}:{port}...")
-            try:
-                transport.open()
-                logger.info("Successfully connected to Thrift server.")
-
-                # Test connection with ping
-                try:
-                    client.ping()
-                    logger.info("Server ping successful.")
-                except Exception as e:
-                    logger.error(f"Server ping failed: {e}")
-                    raise AirflowException(f"Server connection test (ping) failed: {e}")
-
-                # Get token from service using the URL from XCom
-                try:
-                    logger.info(f"Requesting token for accountId='{account_id}', url='{url}'")
-                    token_data = client.getOrRefreshToken(
-                        accountId=account_id,
-                        updateType=TokenUpdateMode.AUTO,
-                        url=url # Use the url variable from XCom
-                    )
-                    logger.info("Successfully retrieved token data from service.")
-                except PBServiceException as e:
-                    # Handle specific service exceptions
-                    error_code = getattr(e, 'errorCode', 'N/A')
-                    error_message = getattr(e, 'message', 'N/A')
-                    error_context = getattr(e, 'context', {})
-                    logger.error(f"PBServiceException occurred: Code={error_code}, Message={error_message}")
-                    if error_context:
-                        logger.error(f"  Context: {error_context}") # Log context separately
-                    # Construct a concise error message for AirflowException
-                    error_msg = f"YTDLP service error (Code: {error_code}): {error_message}"
-                    # Add specific error code handling if needed...
-                    logger.error(f"Failing task instance due to PBServiceException: {error_msg}") # Add explicit log before raising
-                    raise AirflowException(error_msg) # Fail task on service error
-                except TTransportException as e:
-                    logger.error(f"Thrift transport error during getOrRefreshToken: {e}")
-                    logger.error(f"Failing task instance due to TTransportException: {e}") # Add explicit log before raising
-                    raise AirflowException(f"Transport error during API call: {e}")
-                except Exception as e:
-                    logger.error(f"Unexpected error during getOrRefreshToken: {e}")
-                    logger.error(f"Failing task instance due to unexpected error during API call: {e}") # Add explicit log before raising
-                    raise AirflowException(f"Unexpected error during API call: {e}")
-
-            except TTransportException as e:
-                # Handle connection errors
-                logger.error(f"Thrift transport error during connection: {str(e)}")
-                logger.error(f"Failing task instance due to TTransportException during connection: {e}") # Add explicit log before raising
-                raise AirflowException(f"Transport error connecting to YTDLP service: {str(e)}")
-            # Removed the overly broad except Exception block here, as inner blocks raise AirflowException
-
-            # --- Process Token Data ---
-            logger.debug(f"Token data received. Attributes: {dir(token_data)}")
-
-            info_json_path = None # Initialize
-
-            # save_info_json is now always True
-            logger.info("Proceeding to save info.json (save_info_json=True).")
-            info_json = self._get_info_json(token_data)
-            if info_json and self._is_valid_json(info_json):
-                try:
-                    # Pass rendered info_json_dir to helper
-                    info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
-                    if info_json_path:
-                        ti.xcom_push(key='info_json_path', value=info_json_path)
-                        logger.info(f"Successfully saved info.json and pushed path to XCom: {info_json_path}")
-                    else:
-                        ti.xcom_push(key='info_json_path', value=None)
-                        logger.warning("info.json saving failed (check logs from _save_info_json).")
-                except Exception as e:
-                    logger.error(f"Unexpected error during info.json saving process: {e}", exc_info=True)
-                    ti.xcom_push(key='info_json_path', value=None)
-            elif info_json:
-                logger.warning("Retrieved infoJson is not valid JSON. Skipping save.")
-                ti.xcom_push(key='info_json_path', value=None)
-            else:
-                logger.info("No infoJson found in token data. Skipping save.")
-                ti.xcom_push(key='info_json_path', value=None)
-
-
-            # Extract and potentially store SOCKS proxy
-            # get_socks_proxy and store_socks_proxy are now always True
-            socks_proxy = None
-            logger.info("Attempting to extract SOCKS proxy (get_socks_proxy=True).")
-            proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
-            if proxy_attr:
-                socks_proxy = getattr(token_data, proxy_attr)
-                if socks_proxy:
-                    logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
-                    # Always store if found (store_socks_proxy=True)
-                    ti.xcom_push(key='socks_proxy', value=socks_proxy)
-                    logger.info("Pushed 'socks_proxy' to XCom.")
-                else:
-                    logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
-                    # Store None if attribute found but empty
-                    ti.xcom_push(key='socks_proxy', value=None)
-                    logger.info("Pushed None to XCom for 'socks_proxy' as extracted value was empty.")
-            else:
-                logger.info("No SOCKS proxy attribute found in token data.")
-                # Store None if attribute not found
-                ti.xcom_push(key='socks_proxy', value=None)
-                logger.info("Pushed None to XCom for 'socks_proxy' as attribute was not found.")
-
-
-# --- Removed old logic block ---
-# # Extract and potentially store SOCKS proxy
-# socks_proxy = None
-# get_socks_proxy = params.get('get_socks_proxy', self.get_socks_proxy)
-# store_socks_proxy = params.get('store_socks_proxy', self.store_socks_proxy)
-#
-# if get_socks_proxy:
-#     proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
-#     if proxy_attr:
-#         socks_proxy = getattr(token_data, proxy_attr)
-#         if socks_proxy:
-#             logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
-#             if store_socks_proxy:
-#                 ti.xcom_push(key='socks_proxy', value=socks_proxy)
-#                 logger.info("Pushed 'socks_proxy' to XCom.")
-#         else:
-#             logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
-#             if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
-#     else:
-#         logger.info("get_socks_proxy is True, but no SOCKS proxy attribute found.")
-#         if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
-# else:
-#     logger.info("get_socks_proxy is False. Skipping proxy extraction.")
-#     if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
-# --- End Removed old logic block ---
-
-
-            # Get the original command from the server, or construct a fallback
-            ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
-            if ytdlp_cmd:
-                logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...") # Log truncated
-            else:
-                logger.warning("No 'ytdlpCommand' attribute found in token data. Constructing a fallback for logging.")
-                # Construct a representative command for logging purposes
-                if socks_proxy:
-                    ytdlp_cmd = f"yt-dlp --dump-json --proxy \"{socks_proxy}\" \"{url}\""
-                else:
-                    ytdlp_cmd = f"yt-dlp --dump-json \"{url}\""
-                logger.info(f"Constructed fallback command: {ytdlp_cmd}")
-
-            # Push the command to XCom
-            ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
-            logger.info("Pushed command to XCom key 'ytdlp_command'.")
-
-            # No explicit return needed, success is implicit if no exception raised
-
-        except (AirflowSkipException, AirflowFailException) as e:
-            logger.info(f"Task skipped or failed explicitly: {e}")
-            raise # Re-raise to let Airflow handle state
-        except AirflowException as e: # Catch AirflowExceptions raised explicitly
-            logger.error(f"Operation failed due to AirflowException: {e}", exc_info=True)
-            raise # Re-raise AirflowExceptions to ensure task failure
-        except (TTransportException, PBServiceException) as e: # Catch specific Thrift/Service errors not already handled inside inner try
-            logger.error(f"Unhandled YTDLP Service/Transport error in outer block: {e}", exc_info=True)
-            logger.error(f"Failing task instance due to unhandled outer Service/Transport error: {e}") # Add explicit log before raising
-            raise AirflowException(f"Unhandled YTDLP service error: {e}") # Wrap in AirflowException to fail task
-        except Exception as e: # General catch-all for truly unexpected errors
-            logger.error(f"Caught unexpected error in YtdlpOpsOperator outer block: {e}", exc_info=True)
-            logger.error(f"Failing task instance due to unexpected outer error: {e}") # Add explicit log before raising
-            raise AirflowException(f"Unexpected error caused task failure: {e}") # Wrap to fail task
-        finally:
-            if transport and transport.isOpen():
-                logger.info("Closing Thrift transport.")
-                transport.close()
-
-    # --- Helper Methods ---
-
-    def _get_info_json(self, token_data):
-        """Safely extracts infoJson from token data."""
-        return getattr(token_data, 'infoJson', None)
-
-    def _is_valid_json(self, json_str):
-        """Checks if a string is valid JSON."""
-        if not json_str or not isinstance(json_str, str): return False
-        try:
-            json.loads(json_str)
-            return True
-        except json.JSONDecodeError:
-            return False
-
-    def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
-        """Saves info_json to a file. Uses pre-rendered directory path."""
-        try:
-            video_id = _extract_video_id(url) # Use standalone helper
-
-            save_dir = rendered_info_json_dir or "." # Use rendered path
-            logger.info(f"Target directory for info.json: {save_dir}")
-
-            # Ensure directory exists
-            try:
-                os.makedirs(save_dir, exist_ok=True)
-                logger.info(f"Ensured directory exists: {save_dir}")
-            except OSError as e:
-                logger.error(f"Could not create directory {save_dir}: {e}. Cannot save info.json.")
-                return None
-
-            # Construct filename
-            timestamp = int(time.time())
-            base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
-            info_json_path = os.path.join(save_dir, base_filename)
-            latest_json_path = os.path.join(save_dir, "latest.json") # Path for the latest symlink/copy
-
-            # Write to timestamped file
-            try:
-                logger.info(f"Writing info.json content (received from service) to {info_json_path}...")
-                with open(info_json_path, 'w', encoding='utf-8') as f:
-                    f.write(info_json)
-                logger.info(f"Successfully saved info.json to timestamped file: {info_json_path}")
-            except IOError as e:
-                logger.error(f"Failed to write info.json to {info_json_path}: {e}")
-                return None
-
-            # Write to latest.json (overwrite) - best effort
-            try:
-                with open(latest_json_path, 'w', encoding='utf-8') as f:
-                    f.write(info_json)
-                logger.info(f"Updated latest.json file: {latest_json_path}")
-            except IOError as e:
-                logger.warning(f"Failed to update latest.json at {latest_json_path}: {e}")
-
-            return info_json_path
-
-        except Exception as e:
-            logger.error(f"Unexpected error in _save_info_json: {e}", exc_info=True)
-            return None
-
-
-# =============================================================================
-# DAG Definition
-# =============================================================================
-
-default_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1, # Default retries for tasks like queue management
-    'retry_delay': timedelta(minutes=1),
-    'start_date': days_ago(1),
-    # Add concurrency control if needed for sequential processing
-    # 'concurrency': 1, # Ensure only one task instance runs at a time per DAG run
-    # 'max_active_runs': 1, # Ensure only one DAG run is active
-}
-
-# Define DAG
-#
-# --- DAG Block Deactivated on 2025-07-16 ---
-# This DAG has been replaced by the Sensor/Worker pattern implemented in:
-# - ytdlp_sensor_redis_queue.py (polls the queue)
-# - ytdlp_worker_per_url.py (processes a single URL)
-# This code is kept for reference but is not active.
-#
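
Follow-up sketch (illustrative, not part of this patch): the Sensor/Worker
pair named in the closing comment above (ytdlp_sensor_redis_queue.py and
ytdlp_worker_per_url.py) is the natural consumer of the shared helper. A
sensor-side poke could be wired roughly as below; the task id, queue key, and
poke interval are assumptions, and only the helper import reflects this patch:

    from airflow.sensors.python import PythonSensor
    from utils.redis_utils import _get_redis_client

    def _inbox_has_urls():
        # Mirrors the deleted pop_url_from_queue check: a non-empty inbox
        # list means at least one URL is ready for a worker run.
        client = _get_redis_client('redis_default')
        return client.llen('video_queue_inbox') > 0

    wait_for_url = PythonSensor(
        task_id='wait_for_url',
        python_callable=_inbox_has_urls,
        poke_interval=30,
    )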