# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl
#
# Distributed under terms of the MIT license.

"""
DAG for processing YouTube URLs sequentially from a Redis queue using YTDLP Ops Thrift service.
"""

import json
import logging
import os
import re
import socket
import time
import traceback  # For logging stack traces in failure handler
from datetime import datetime, timedelta

import redis  # Import redis exceptions if needed
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator, Variable
from airflow.models.param import Param
from airflow.operators.bash import BashOperator  # Import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from thrift.transport.TTransport import TTransportException

# Configure logging
logger = logging.getLogger(__name__)

# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'  # Base name for queues
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TIMEOUT = 30  # Default Thrift timeout in seconds
MAX_RETRIES_REDIS_LOOKUP = 3  # Retries for fetching service details from Redis
RETRY_DELAY_REDIS_LOOKUP = 10  # Delay (seconds) for Redis lookup retries

# Matches an 11-character video ID in watch/shorts/embed/live/youtu.be URLs.
# The character class is deliberately strict: the ID ends up in a filename, so
# anything that is not a plain ID character (e.g. '/' or '.') must not match.
_VIDEO_ID_RE = re.compile(
    r"(?:youtube\.com/watch\?(?:[^#\s]*?&)?v=|youtube\.com/(?:shorts|embed|live)/|youtu\.be/)"
    r"([A-Za-z0-9_-]{11})"
)


# --- Helper Functions ---

def _to_text(value):
    """Return *value* decoded as UTF-8 when it is ``bytes``, otherwise unchanged."""
    return value.decode('utf-8') if isinstance(value, bytes) else value


def _first_set(*candidates):
    """Return the first candidate that carries a real value, or ``None``.

    ``None``, blank strings and the literal string ``'None'`` (what Jinja
    produces when it renders a null value) are all treated as "not set".
    """
    for candidate in candidates:
        if candidate is None:
            continue
        if isinstance(candidate, str) and candidate.strip() in ('', 'None'):
            continue
        return candidate
    return None


def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook.

    The connection is verified with a ``PING`` before it is returned.

    Raises:
        AirflowException: if authentication fails or the connection cannot be
            established for any other reason.
    """
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError as e:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") from e
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") from e


def _extract_video_id(url):
    """Extracts YouTube video ID from URL.

    Returns the 11-character ID, or ``None`` when *url* is empty, not a string,
    or does not contain a recognisable ID. Only ``[A-Za-z0-9_-]`` characters
    are accepted so the result is always safe to embed in a filename.
    """
    if not url or not isinstance(url, str):
        logger.debug("URL is empty or not a string, cannot extract video ID.")
        return None

    match = _VIDEO_ID_RE.search(url)
    if match is None:
        logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
        return None

    video_id = match.group(1)
    logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
    return video_id


# --- Queue Management Callables ---

def pop_url_from_queue(**context):
    """Pops a URL from the inbox queue and pushes to XCom.

    Reads ``queue_name`` / ``redis_conn_id`` from the DAG params, pops from the
    ``<queue_name>_inbox`` list and pushes the result under XCom key
    ``current_url``.

    Raises:
        AirflowSkipException: if the inbox list is empty.
        AirflowException: on any Redis error.
    """
    params = context['params']
    queue_name = params['queue_name']
    inbox_queue = f"{queue_name}_inbox"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    logger.info(f"Attempting to pop URL from inbox queue: {inbox_queue}")
    try:
        client = _get_redis_client(redis_conn_id)
        # LPOP is non-blocking, returns None if empty
        url_bytes = client.lpop(inbox_queue)  # Returns bytes if decode_responses=False on hook/client

        if url_bytes:
            url = _to_text(url_bytes)
            logger.info(f"Popped URL: {url}")
            context['task_instance'].xcom_push(key='current_url', value=url)
            return url  # Return URL for logging/potential use

        logger.info(f"Inbox queue '{inbox_queue}' is empty. Skipping downstream tasks.")
        context['task_instance'].xcom_push(key='current_url', value=None)
        # Raise AirflowSkipException to signal downstream tasks to skip
        raise AirflowSkipException(f"Inbox queue '{inbox_queue}' is empty.")
    except AirflowSkipException:
        raise  # Re-raise skip exception
    except Exception as e:
        logger.error(f"Error popping URL from Redis queue '{inbox_queue}': {e}", exc_info=True)
        raise AirflowException(f"Failed to pop URL from Redis: {e}") from e


def move_url_to_progress(**context):
    """Moves the current URL from XCom to the progress hash.

    Stores a JSON record (status, start time, run id, task instance key) under
    the URL in the ``<queue_name>_progress`` hash.

    Raises:
        AirflowSkipException: if no URL is available in XCom.
        AirflowException: if the Redis write fails.
    """
    ti = context['task_instance']
    url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')

    # This task should be skipped if pop_url_from_queue raised AirflowSkipException
    # Adding check for robustness
    if not url:
        logger.info("No URL found in XCom (or upstream skipped). Skipping move to progress.")
        raise AirflowSkipException("No URL to process.")

    params = context['params']
    queue_name = params['queue_name']
    progress_queue = f"{queue_name}_progress"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    logger.info(f"Moving URL '{url}' to progress hash: {progress_queue}")
    progress_data = {
        'status': 'processing',
        'start_time': time.time(),
        'dag_run_id': context['dag_run'].run_id,
        'task_instance_key_str': context['task_instance_key_str'],
    }

    try:
        client = _get_redis_client(redis_conn_id)
        client.hset(progress_queue, url, json.dumps(progress_data))
        logger.info(f"Moved URL '{url}' to progress hash '{progress_queue}'.")
    except Exception as e:
        logger.error(f"Error moving URL to Redis progress hash '{progress_queue}': {e}", exc_info=True)
        # If this fails, the URL is popped but not tracked as processing. Fail the task.
        raise AirflowException(f"Failed to move URL to progress hash: {e}") from e


def handle_success(**context):
    """Moves URL from progress to result hash on success.

    Pulls ``info_json_path``, ``socks_proxy`` and ``ytdlp_command`` from the
    ``get_token`` task's XCom and stores them, together with run metadata, in
    the ``<queue_name>_result`` hash. Redis errors are logged, not raised.
    """
    ti = context['task_instance']
    url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
    if not url:
        logger.warning("handle_success called but no URL found from pop_url_from_queue XCom. This shouldn't happen on success path.")
        return  # Or raise error

    params = context['params']
    queue_name = params['queue_name']
    progress_queue = f"{queue_name}_progress"
    result_queue = f"{queue_name}_result"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    # Pull results from get_token task
    info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
    socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
    ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command')  # Original command

    logger.info(f"Handling success for URL: {url}")
    logger.info(f" Info JSON Path: {info_json_path}")
    logger.info(f" SOCKS Proxy: {socks_proxy}")
    logger.info(f" YTDLP Command: {ytdlp_command[:100] if ytdlp_command else 'None'}...")  # Log truncated command

    result_data = {
        'status': 'success',
        'end_time': time.time(),
        'info_json_path': info_json_path,
        'socks_proxy': socks_proxy,
        'ytdlp_command': ytdlp_command,
        'url': url,
        'dag_run_id': context['dag_run'].run_id,
        'task_instance_key_str': context['task_instance_key_str'],  # Record which task instance succeeded
    }

    try:
        client = _get_redis_client(redis_conn_id)

        # Write the result BEFORE removing the progress entry: if the second
        # call fails the URL is still recorded somewhere instead of vanishing.
        client.hset(result_queue, url, json.dumps(result_data))
        logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")

        removed_count = client.hdel(progress_queue, url)
        if removed_count > 0:
            logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
        else:
            logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during success handling.")
    except Exception as e:
        # Even if Redis fails, the task succeeded. Log error but don't fail the task.
        logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)


def handle_failure(**context):
    """Moves URL from progress to fail hash on failure.

    Records the error message and traceback (when the context carries an
    ``exception`` object) in the ``<queue_name>_fail`` hash. Redis errors are
    logged, not raised.
    """
    ti = context['task_instance']
    url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
    if not url:
        logger.error("handle_failure called but no URL found from pop_url_from_queue XCom.")
        # Cannot move to fail queue if URL is unknown
        return

    params = context['params']
    queue_name = params['queue_name']
    progress_queue = f"{queue_name}_progress"
    fail_queue = f"{queue_name}_fail"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    # Get failure reason from the exception context.
    # NOTE(review): when this callable runs as a regular downstream task rather
    # than as an on_failure_callback, the context presumably carries no
    # 'exception' and the stored reason will be "Unknown error" — confirm.
    exception = context.get('exception')
    error_message = str(exception) if exception else "Unknown error"
    if isinstance(exception, BaseException):
        # traceback.format_exc() only works inside an active `except` block, so
        # build the text from the exception object itself.
        tb_str = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )
    else:
        tb_str = "No traceback available."

    logger.info(f"Handling failure for URL: {url}")
    logger.error(f" Failure Reason: {error_message}")  # Log the error that triggered failure
    logger.debug(f" Traceback:\n{tb_str}")  # Log traceback at debug level

    fail_data = {
        'status': 'failed',
        'end_time': time.time(),
        'error': error_message,
        'traceback': tb_str,  # Store traceback
        'url': url,
        'dag_run_id': context['dag_run'].run_id,
        'task_instance_key_str': context['task_instance_key_str'],  # Record which task instance failed
    }

    try:
        client = _get_redis_client(redis_conn_id)

        # Write the failure record BEFORE removing the progress entry so the
        # URL is never dropped from every hash by a partial failure.
        client.hset(fail_queue, url, json.dumps(fail_data))
        logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")

        removed_count = client.hdel(progress_queue, url)
        if removed_count > 0:
            logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
        else:
            logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.")
    except Exception as e:
        # Log error, but the task already failed.
        logger.error(f"Error handling failure in Redis for URL '{url}': {e}", exc_info=True)


# --- YtdlpOpsOperator ---

class YtdlpOpsOperator(BaseOperator):
    """
    Custom Airflow operator to interact with YTDLP Thrift service.

    Handles direct connections and Redis-based discovery, retrieves tokens,
    saves info.json, and manages errors. The URL to process is pulled from the
    ``pop_url_from_queue`` task's XCom (key ``current_url``).

    Pushes to XCom: ``info_json_path``, ``socks_proxy``, ``ytdlp_command``.
    """

    # 'url' is not a template field: it is pulled from XCom.
    # 'redis_enabled' is not a template field either, so a Jinja string passed
    # for it stays a literal; execute() reads the effective value from DAG params.
    template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir', 'redis_conn_id')

    @apply_defaults
    def __init__(self,
                 redis_conn_id=DEFAULT_REDIS_CONN_ID,
                 max_retries_lookup=MAX_RETRIES_REDIS_LOOKUP,
                 retry_delay_lookup=RETRY_DELAY_REDIS_LOOKUP,
                 service_ip=None,
                 service_port=None,
                 redis_enabled=False,  # Default to direct connection now
                 account_id=None,
                 info_json_dir=None,
                 timeout=DEFAULT_TIMEOUT,
                 *args, **kwargs):
        """Store configuration and validate it for the chosen connection mode.

        Raises:
            ValueError: if ``redis_enabled`` is truthy without ``account_id``,
                or falsy without both ``service_ip`` and ``service_port``.
        """
        super().__init__(*args, **kwargs)

        logger.info(f"Initializing YtdlpOpsOperator (Processor Version) with parameters: "
                    f"redis_conn_id={redis_conn_id}, max_retries_lookup={max_retries_lookup}, retry_delay_lookup={retry_delay_lookup}, "
                    f"service_ip={service_ip}, service_port={service_port}, redis_enabled={redis_enabled}, "
                    f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")

        # Validate parameters based on connection mode
        if redis_enabled:
            # If using Redis, account_id is essential for lookup
            if not account_id:
                raise ValueError("account_id is required when redis_enabled=True for service lookup.")
        else:
            # If direct connection, IP and Port are essential
            if not service_ip or not service_port:
                raise ValueError("Both service_ip and service_port must be specified when redis_enabled=False.")
            # Account ID is still needed for the API call itself, but rely on DAG param or operator config
            if not account_id:
                logger.warning("No account_id provided for direct connection mode. Ensure it's set in DAG params or operator config.")

        self.redis_conn_id = redis_conn_id
        self.max_retries_lookup = max_retries_lookup
        # Accept either a number of seconds or a timedelta.
        self.retry_delay_lookup = int(
            retry_delay_lookup.total_seconds()
            if isinstance(retry_delay_lookup, timedelta)
            else retry_delay_lookup
        )
        self.service_ip = service_ip
        self.service_port = service_port
        self.redis_enabled = redis_enabled
        self.account_id = account_id
        self.info_json_dir = info_json_dir
        self.timeout = timeout

    def execute(self, context):
        """Request a token for the queued URL and publish the results to XCom.

        Raises:
            AirflowSkipException: if no URL is available in XCom.
            AirflowException: on any connection, service or unexpected error.
        """
        logger.info("Executing YtdlpOpsOperator (Processor Version)")
        transport = None
        ti = context['task_instance']  # Get task instance for XCom access

        try:
            # --- Get URL from XCom ---
            url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
            if not url:
                # This should ideally be caught by upstream skip, but handle defensively
                logger.info("No URL found in XCom from pop_url_from_queue. Skipping execution.")
                raise AirflowSkipException("Upstream task did not provide a URL.")
            logger.info(f"Processing URL from XCom: {url}")

            logger.info("Getting task parameters and rendering templates")
            params = context['params']  # DAG run params

            # Render template fields using context.
            # NOTE(review): these explicit renders look redundant with Airflow's
            # own template_fields rendering, but they presumably give values
            # that themselves contain Jinja (e.g. the info_json_dir param
            # default) a second pass — confirm before removing.
            redis_conn_id = self.render_template(self.redis_conn_id, context)
            service_ip = self.render_template(self.service_ip, context)
            service_port_rendered = self.render_template(self.service_port, context)
            account_id = self.render_template(self.account_id, context)
            timeout_rendered = self.render_template(self.timeout, context)
            info_json_dir = self.render_template(self.info_json_dir, context)  # Rendered here for _save_info_json

            # Determine effective settings (DAG params override operator defaults)
            redis_enabled = params.get('redis_enabled', self.redis_enabled)
            account_id = params.get('account_id', account_id)
            redis_conn_id = params.get('redis_conn_id', redis_conn_id)

            logger.info(f"Effective settings: redis_enabled={redis_enabled}, account_id='{account_id}', redis_conn_id='{redis_conn_id}'")

            if redis_enabled:
                host, port = self._discover_service(redis_conn_id, account_id)
            else:
                # Direct connection: a DAG param wins only when it is actually
                # set; a null param no longer shadows the operator-level value.
                host = _first_set(params.get('service_ip'), service_ip)
                port_str = _first_set(params.get('service_port'), service_port_rendered)

                logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")

                if not host or not port_str:
                    raise ValueError("Direct connection requires service_ip and service_port (check Operator config and DAG params)")
                try:
                    port = int(port_str)
                except (ValueError, TypeError):
                    raise ValueError(f"Invalid service_port value: {port_str}")

                logger.info(f"Connecting directly to Thrift service at {host}:{port} (Redis bypassed)")

            # Validate and use timeout
            try:
                timeout = int(timeout_rendered)
                if timeout <= 0:
                    raise ValueError("Timeout must be positive")
                logger.info(f"Using timeout: {timeout} seconds")
            except (ValueError, TypeError):
                logger.warning(f"Invalid timeout value: '{timeout_rendered}'. Using default: {DEFAULT_TIMEOUT}")
                timeout = DEFAULT_TIMEOUT

            # Create Thrift connection objects
            socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET)  # Explicitly use AF_INET (IPv4)
            socket_conn.setTimeout(timeout * 1000)  # Thrift timeout is in milliseconds
            transport = TTransport.TFramedTransport(socket_conn)  # Use TFramedTransport if server expects it
            protocol = TBinaryProtocol.TBinaryProtocol(transport)
            client = YTTokenOpService.Client(protocol)

            logger.info(f"Attempting to connect to Thrift server at {host}:{port}...")
            try:
                transport.open()
                logger.info("Successfully connected to Thrift server.")

                # Test connection with ping
                try:
                    client.ping()
                    logger.info("Server ping successful.")
                except Exception as e:
                    logger.error(f"Server ping failed: {e}")
                    raise AirflowException(f"Server connection test (ping) failed: {e}") from e

                # Get token from service using the URL from XCom
                try:
                    logger.info(f"Requesting token for accountId='{account_id}', url='{url}'")
                    token_data = client.getOrRefreshToken(
                        accountId=account_id,
                        updateType=TokenUpdateMode.AUTO,
                        url=url,
                    )
                    logger.info("Successfully retrieved token data from service.")
                except PBServiceException as e:
                    # Handle specific service exceptions
                    error_code = getattr(e, 'errorCode', 'N/A')
                    error_message = getattr(e, 'message', 'N/A')
                    error_context = getattr(e, 'context', {})
                    logger.error(f"PBServiceException occurred: Code={error_code}, Message={error_message}")
                    if error_context:
                        logger.error(f" Context: {error_context}")  # Log context separately
                    # Construct a concise error message for AirflowException
                    error_msg = f"YTDLP service error (Code: {error_code}): {error_message}"
                    logger.error(f"Failing task instance due to PBServiceException: {error_msg}")
                    raise AirflowException(error_msg) from e  # Fail task on service error
                except TTransportException as e:
                    logger.error(f"Thrift transport error during getOrRefreshToken: {e}")
                    logger.error(f"Failing task instance due to TTransportException: {e}")
                    raise AirflowException(f"Transport error during API call: {e}") from e
                except Exception as e:
                    logger.error(f"Unexpected error during getOrRefreshToken: {e}")
                    logger.error(f"Failing task instance due to unexpected error during API call: {e}")
                    raise AirflowException(f"Unexpected error during API call: {e}") from e

            except TTransportException as e:
                # Handle connection errors
                logger.error(f"Thrift transport error during connection: {str(e)}")
                logger.error(f"Failing task instance due to TTransportException during connection: {e}")
                raise AirflowException(f"Transport error connecting to YTDLP service: {str(e)}") from e

            # --- Process Token Data ---
            logger.debug(f"Token data received. Attributes: {dir(token_data)}")

            info_json_path = None
            # info.json is always saved when the service returns valid JSON.
            logger.info("Proceeding to save info.json (save_info_json=True).")
            info_json = self._get_info_json(token_data)
            if info_json and self._is_valid_json(info_json):
                try:
                    # Pass rendered info_json_dir to helper
                    info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
                    if info_json_path:
                        ti.xcom_push(key='info_json_path', value=info_json_path)
                        logger.info(f"Successfully saved info.json and pushed path to XCom: {info_json_path}")
                    else:
                        ti.xcom_push(key='info_json_path', value=None)
                        logger.warning("info.json saving failed (check logs from _save_info_json).")
                except Exception as e:
                    logger.error(f"Unexpected error during info.json saving process: {e}", exc_info=True)
                    ti.xcom_push(key='info_json_path', value=None)
            elif info_json:
                logger.warning("Retrieved infoJson is not valid JSON. Skipping save.")
                ti.xcom_push(key='info_json_path', value=None)
            else:
                logger.info("No infoJson found in token data. Skipping save.")
                ti.xcom_push(key='info_json_path', value=None)

            # Extract and store SOCKS proxy (always attempted, always pushed).
            socks_proxy = None
            logger.info("Attempting to extract SOCKS proxy (get_socks_proxy=True).")
            proxy_attr = next(
                (attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)),
                None,
            )
            if proxy_attr:
                socks_proxy = getattr(token_data, proxy_attr)
                if socks_proxy:
                    logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
                    ti.xcom_push(key='socks_proxy', value=socks_proxy)
                    logger.info("Pushed 'socks_proxy' to XCom.")
                else:
                    logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
                    # Store None if attribute found but empty
                    ti.xcom_push(key='socks_proxy', value=None)
                    logger.info("Pushed None to XCom for 'socks_proxy' as extracted value was empty.")
            else:
                logger.info("No SOCKS proxy attribute found in token data.")
                # Store None if attribute not found
                ti.xcom_push(key='socks_proxy', value=None)
                logger.info("Pushed None to XCom for 'socks_proxy' as attribute was not found.")

            # Get the original command from the server
            ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
            if not ytdlp_cmd:
                logger.error("No 'ytdlpCommand' attribute found in token data.")
                raise AirflowException("Required 'ytdlpCommand' not received from service.")

            logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...")  # Log truncated

            # Push the *original* command to XCom
            ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
            logger.info("Pushed original command to XCom key 'ytdlp_command'.")

            # No explicit return needed, success is implicit if no exception raised

        except (AirflowSkipException, AirflowFailException) as e:
            logger.info(f"Task skipped or failed explicitly: {e}")
            raise  # Re-raise to let Airflow handle state
        except AirflowException as e:
            # Catch AirflowExceptions raised explicitly
            logger.error(f"Operation failed due to AirflowException: {e}", exc_info=True)
            raise  # Re-raise AirflowExceptions to ensure task failure
        except (TTransportException, PBServiceException) as e:
            # Catch specific Thrift/Service errors not already handled inside inner try
            logger.error(f"Unhandled YTDLP Service/Transport error in outer block: {e}", exc_info=True)
            logger.error(f"Failing task instance due to unhandled outer Service/Transport error: {e}")
            raise AirflowException(f"Unhandled YTDLP service error: {e}") from e
        except Exception as e:
            # General catch-all for truly unexpected errors
            logger.error(f"Caught unexpected error in YtdlpOpsOperator outer block: {e}", exc_info=True)
            logger.error(f"Failing task instance due to unexpected outer error: {e}")
            raise AirflowException(f"Unexpected error caused task failure: {e}") from e
        finally:
            if transport and transport.isOpen():
                logger.info("Closing Thrift transport.")
                transport.close()

    # --- Helper Methods ---

    def _discover_service(self, redis_conn_id, account_id):
        """Look up the Thrift service host and port for *account_id* in Redis.

        Reads the hash ``ytdlp:<account_id>`` (falling back to the legacy key
        ``<account_id>``) and returns ``(host, port)``. Retries up to
        ``self.max_retries_lookup`` times, sleeping ``self.retry_delay_lookup``
        seconds between attempts.

        Raises:
            AirflowException: if no attempt yields a usable host and port.
        """
        redis_client = _get_redis_client(redis_conn_id)
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}' for service discovery.")

        service_key = f"ytdlp:{account_id}"
        legacy_key = account_id  # For backward compatibility

        for attempt in range(self.max_retries_lookup):
            try:
                logger.info(f"Attempt {attempt + 1}/{self.max_retries_lookup}: Fetching service details from Redis for keys: '{service_key}', '{legacy_key}'")
                raw_details = redis_client.hgetall(service_key)
                if not raw_details:
                    logger.warning(f"Key '{service_key}' not found, trying legacy key '{legacy_key}'")
                    raw_details = redis_client.hgetall(legacy_key)

                if not raw_details:
                    raise ValueError(f"No service details found in Redis for keys: {service_key} or {legacy_key}")

                # The client may return bytes (decode_responses=False); decode
                # so the string comparisons below work either way.
                service_details = {_to_text(k): _to_text(v) for k, v in raw_details.items()}

                # Find IP and port (case-insensitive keys)
                ip_key = next((k for k in service_details if k.lower() == 'ip'), None)
                port_key = next((k for k in service_details if k.lower() == 'port'), None)

                if not ip_key:
                    raise ValueError(f"'ip' key not found in Redis hash for {service_key}/{legacy_key}")
                if not port_key:
                    raise ValueError(f"'port' key not found in Redis hash for {service_key}/{legacy_key}")

                host = service_details[ip_key]
                port_str = service_details[port_key]

                try:
                    port = int(port_str)
                except (ValueError, TypeError):
                    raise ValueError(f"Invalid port value '{port_str}' found in Redis for {service_key}/{legacy_key}")

                logger.info(f"Extracted from Redis - Service IP: {host}, Service Port: {port}")
                return host, port

            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed to get Redis details: {str(e)}")
                if attempt == self.max_retries_lookup - 1:
                    logger.error("Max retries reached for fetching Redis details.")
                    raise AirflowException(f"Failed to get service details from Redis after {self.max_retries_lookup} attempts: {e}") from e
                logger.info(f"Retrying in {self.retry_delay_lookup} seconds...")
                time.sleep(self.retry_delay_lookup)

        # Only reachable when max_retries_lookup <= 0 (the loop never ran).
        raise AirflowException(
            f"Failed to get service details from Redis after {self.max_retries_lookup} attempts: no lookup attempted"
        )

    def _get_info_json(self, token_data):
        """Safely extracts infoJson from token data."""
        return getattr(token_data, 'infoJson', None)

    def _is_valid_json(self, json_str):
        """Checks if a string is valid JSON."""
        if not json_str or not isinstance(json_str, str):
            return False
        try:
            json.loads(json_str)
            return True
        except json.JSONDecodeError:
            return False

    def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
        """Saves info_json to a file. Uses pre-rendered directory path.

        Writes ``info_<video_id>_<account_id>_<timestamp>.json`` into the
        directory and, best effort, a copy named ``latest.json``.

        Returns:
            The path of the timestamped file, or ``None`` if the directory
            could not be created or the file could not be written.
        """
        try:
            video_id = _extract_video_id(url)  # Use standalone helper
            save_dir = rendered_info_json_dir or "."  # Use rendered path
            logger.info(f"Target directory for info.json: {save_dir}")

            # Ensure directory exists
            try:
                os.makedirs(save_dir, exist_ok=True)
                logger.info(f"Ensured directory exists: {save_dir}")
            except OSError as e:
                logger.error(f"Could not create directory {save_dir}: {e}. Cannot save info.json.")
                return None

            # Construct filename. Path separators in account_id are replaced so
            # the file cannot land outside save_dir.
            timestamp = int(time.time())
            safe_account_id = str(account_id).replace('/', '_').replace('\\', '_')
            base_filename = f"info_{video_id or 'unknown'}_{safe_account_id}_{timestamp}.json"
            info_json_path = os.path.join(save_dir, base_filename)
            latest_json_path = os.path.join(save_dir, "latest.json")  # Path for the latest copy

            # Write to timestamped file
            try:
                logger.info(f"Writing info.json content (received from service) to {info_json_path}...")
                with open(info_json_path, 'w', encoding='utf-8') as f:
                    f.write(info_json)
                logger.info(f"Successfully saved info.json to timestamped file: {info_json_path}")
            except IOError as e:
                logger.error(f"Failed to write info.json to {info_json_path}: {e}")
                return None

            # Write to latest.json (overwrite) - best effort
            try:
                with open(latest_json_path, 'w', encoding='utf-8') as f:
                    f.write(info_json)
                logger.info(f"Updated latest.json file: {latest_json_path}")
            except IOError as e:
                logger.warning(f"Failed to update latest.json at {latest_json_path}: {e}")

            return info_json_path

        except Exception as e:
            logger.error(f"Unexpected error in _save_info_json: {e}", exc_info=True)
            return None


# =============================================================================
# DAG Definition
# =============================================================================

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # Default retries for tasks like queue management
    'retry_delay': timedelta(minutes=1),
    'start_date': days_ago(1),
}

# Define DAG
with DAG(
    dag_id='ytdlp_proc_sequential_processor',
    default_args=default_args,
    schedule_interval=None,  # Manually triggered or triggered by external sensor/event
    catchup=False,
    description='Processes YouTube URLs sequentially from a Redis queue using YTDLP Ops.',
    tags=['ytdlp', 'thrift', 'client', 'sequential', 'queue', 'processor'],
    params={
        'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues (e.g., 'video_queue' -> video_queue_inbox, video_queue_progress, etc.)."),
        'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."),
        # YtdlpOpsOperator specific params (can be overridden at task level if needed)
        'redis_enabled': Param(False, type="boolean", description="Use Redis for service discovery? If False, uses service_ip/port."),
        'service_ip': Param(None, type=["null", "string"], description="Required Service IP if redis_enabled=False."),
        'service_port': Param(None, type=["null", "integer"], description="Required Service port if redis_enabled=False."),
        'account_id': Param('default_account', type="string", description="Account ID for the API call (used for Redis lookup if redis_enabled=True)."),
        'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
        # Download specific parameters
        'download_format': Param(
            # Default to best audio-only format (e.g., m4a)
            'ba[ext=m4a]/bestaudio/best',
            type="string",
            description="yt-dlp format selection string (e.g., 'ba' for best audio, 'wv*+wa/w' for worst video+audio)."
        ),
        # NOTE(review): the Jinja expression inside this default is substituted
        # verbatim by `{{ params.output_path_template }}` in the bash command
        # and does not appear to be rendered a second time — confirm. The
        # fallback path here also differs from the one in info_json_dir below.
        'output_path_template': Param(
            "{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s",
            type="string",
            description="yt-dlp output template (e.g., '/path/to/downloads/%(title)s.%(ext)s'). Uses Airflow Variable 'DOWNLOADS_TEMP'."
        ),
        'info_json_dir': Param(
            "{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}",
            type="string",
            description="Directory to save info.json. Uses Airflow Variable 'DOWNLOADS_TEMP'."
        ),
    }
) as dag:

    # --- Task Definitions ---

    pop_url = PythonOperator(
        task_id='pop_url_from_queue',
        python_callable=pop_url_from_queue,
        # Params are implicitly passed via context
    )
    pop_url.doc_md = """
    ### Pop URL from Inbox Queue
    Pops the next available URL from the `{{ params.queue_name }}_inbox` Redis list.
    Pushes the URL to XCom key `current_url`.
    If the queue is empty, raises `AirflowSkipException` to skip downstream tasks.
    """

    move_to_progress = PythonOperator(
        task_id='move_url_to_progress',
        python_callable=move_url_to_progress,
        trigger_rule='all_success',  # Only run if pop_url succeeded (didn't skip)
    )
    move_to_progress.doc_md = """
    ### Move URL to Progress Hash
    Retrieves the `current_url` from XCom (pushed by `pop_url_from_queue`).
    Adds the URL as a key to the `{{ params.queue_name }}_progress` Redis hash with status 'processing'.
    This task is skipped if `pop_url_from_queue` was skipped.
    """

    # YtdlpOpsOperator task to get the token.
    # The operator pulls the URL from XCom internally; DAG params are passed as
    # templates so run-time overrides are honoured.
    get_token = YtdlpOpsOperator(
        task_id='get_token',
        redis_conn_id="{{ params.redis_conn_id }}",
        redis_enabled="{{ params.redis_enabled }}",
        service_ip="{{ params.service_ip }}",
        service_port="{{ params.service_port }}",
        account_id="{{ params.account_id }}",
        timeout="{{ params.timeout }}",
        info_json_dir="{{ params.info_json_dir }}",
        retries=0,  # Set operator retries to 0; failure handled by the failure handler
        trigger_rule='all_success',  # Only run if move_to_progress succeeded
    )
    get_token.doc_md = """
    ### Get Token and Info Task
    Connects to the YTDLP Thrift service for the URL pulled from XCom (`current_url`).
    Retrieves token, metadata, command, and potentially proxy. Saves `info.json`.
    Failure of this task triggers the `handle_failure` path.
    Success allows `download_video` to run.

    **Pulls from XCom:**
    - `current_url` (from `pop_url_from_queue`) - *Used internally*

    **Pushes to XCom:**
    - `info_json_path`
    - `socks_proxy`
    - `ytdlp_command`
    """

    # Task to perform the actual download using yt-dlp.
    # XCom values are rendered with `or ''` because a missing value would
    # otherwise be rendered as the literal string "None".
    download_video = BashOperator(
        task_id='download_video',
        bash_command="""
        INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') or '' }}"
        PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') or '' }}"
        FORMAT="{{ params.download_format }}"
        OUTPUT_TEMPLATE="{{ params.output_path_template }}"

        echo "Starting download..."
        echo "Info JSON Path: $INFO_JSON_PATH"
        echo "Proxy: $PROXY"
        echo "Format: $FORMAT"
        echo "Output Template: $OUTPUT_TEMPLATE"

        # Check if info.json path exists
        if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then
            echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)."
            exit 1
        fi

        # Construct command as an array so every argument stays one word
        CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH")

        # Add proxy if it exists
        if [ -n "$PROXY" ]; then
            CMD_ARRAY+=(--proxy "$PROXY")
        fi

        # Add format and output template
        CMD_ARRAY+=(-f "$FORMAT" -o "$OUTPUT_TEMPLATE")

        # Add other useful flags
        CMD_ARRAY+=(--no-progress --no-simulate --no-write-info-json --ignore-errors --verbose)

        echo "Executing command array:"
        # Use printf to safely quote and display the command array
        printf "%q " "${CMD_ARRAY[@]}"
        echo "" # Newline after command

        # Execute the command directly using the array
        "${CMD_ARRAY[@]}"

        # Check exit code
        EXIT_CODE=$?
        if [ $EXIT_CODE -ne 0 ]; then
            echo "Error: yt-dlp command failed with exit code $EXIT_CODE"
            exit $EXIT_CODE
        fi
        echo "Download command completed successfully."
        """,
        trigger_rule='all_success',  # Run only if get_token succeeded
    )
    download_video.doc_md = """
    ### Download Video/Audio Task
    Executes `yt-dlp` using the `info.json` and proxy obtained from the `get_token` task.
    Uses the `download_format` and `output_path_template` parameters from the DAG run configuration.
    Failure of this task triggers the `handle_failure` path.

    **Pulls from XCom (task_id='get_token'):**
    - `info_json_path`
    - `socks_proxy`
    """

    # Task to handle successful token retrieval AND download
    success_handler = PythonOperator(
        task_id='handle_success',
        python_callable=handle_success,
        trigger_rule='all_success',  # Run only if download_video succeeds
    )
    success_handler.doc_md = """
    ### Handle Success Task
    Runs after `download_video` succeeds.
    Retrieves `current_url` and results from `get_token` via XCom.
    Adds the URL and results to the `{{ params.queue_name }}_result` hash.
    Removes the URL from the `{{ params.queue_name }}_progress` hash.
    """

    # Task to handle failed token retrieval or download
    failure_handler = PythonOperator(
        task_id='handle_failure',
        python_callable=handle_failure,
        trigger_rule='one_failed',  # Run only if get_token or download_video fails
    )
    failure_handler.doc_md = """
    ### Handle Failure Task
    Runs after `get_token` or `download_video` fails.
    Retrieves `current_url` from XCom.
    Retrieves the error message and traceback from the context when available.
    Adds the URL and error details to the `{{ params.queue_name }}_fail` hash.
    Removes the URL from the `{{ params.queue_name }}_progress` hash.

    **Important:** This task succeeding means the failure was *handled*; the DAG run itself
    might still be marked as failed.
    """

    # --- Task Dependencies ---

    # Core processing flow
    pop_url >> move_to_progress >> get_token >> download_video

    # Success handler runs only if download_video succeeds
    download_video >> success_handler

    # Failure handler runs if either get_token or download_video fails
    [get_token, download_video] >> failure_handler