# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl
#
# Distributed under terms of the MIT license.

"""
DAG for processing a single YouTube URL passed via DAG run configuration.
This is the "Worker" part of a Sensor/Worker pattern.
"""

from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import BaseOperator, Variable
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from thrift.transport.TTransport import TTransportException
import json
import logging
import os
import redis
import socket
import time
import traceback

# Import utility functions
from utils.redis_utils import _get_redis_client

# Configure logging
logger = logging.getLogger(__name__)

# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_MAX_URLS = 1
DEFAULT_TIMEOUT = 30  # Default Thrift timeout in seconds


# --- Helper Functions ---

def _extract_video_id(url):
    """Extracts the YouTube video ID from a URL."""
    if not url or not isinstance(url, str):
        logger.debug("URL is empty or not a string, cannot extract video ID.")
        return None
    try:
        video_id = None
        if 'youtube.com/watch?v=' in url:
            video_id = url.split('v=')[1].split('&')[0]
        elif 'youtu.be/' in url:
            video_id = url.split('youtu.be/')[1].split('?')[0]

        if video_id and len(video_id) >= 11:
            video_id = video_id[:11]  # Standard ID length
            logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
            return video_id
        else:
            logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
            return None
    except Exception as e:
        logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}")
        return None


# --- Queue Management Callables (for success/failure reporting) ---

def handle_success(**context):
    """Records the successful result for the URL in the result hash."""
    ti = context['task_instance']
    params = context['params']
    url = params.get('url')  # Get URL from params, not XCom
    if not url:
        logger.warning("handle_success called but no URL found in DAG run parameters.")
        return

    queue_name = params['queue_name']
    result_queue = f"{queue_name}_result"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    # Pull results from previous tasks
    info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
    socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
    ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command')
    downloaded_file_path = ti.xcom_pull(task_ids='download_video')

    logger.info(f"Handling success for URL: {url}")
    logger.info(f" Downloaded File Path: {downloaded_file_path}")

    result_data = {
        'status': 'success',
        'end_time': time.time(),
        'info_json_path': info_json_path,
        'socks_proxy': socks_proxy,
        'ytdlp_command': ytdlp_command,
        'downloaded_file_path': downloaded_file_path,
        'url': url,
        'dag_run_id': context['dag_run'].run_id,
    }

    try:
        # In the worker pattern, there's no "progress" hash to remove from.
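        # Redis key layout shared with the sensor (all keys derived from queue_name):
        #   <queue_name>_inbox  - list of URLs waiting to be processed (used for re-queue on failure)
        #   <queue_name>_result - hash of success payloads, keyed by URL
        #   <queue_name>_fail   - hash of failure details, keyed by URL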
        # We just add the result to the success hash.
        client = _get_redis_client(redis_conn_id)
        client.hset(result_queue, url, json.dumps(result_data))
        logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
    except Exception as e:
        logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
        # Log the error but don't fail the task, as the main work succeeded.


def handle_failure(**context):
    """
    Handles failed processing.

    Records detailed error information to the fail hash and, if stop_on_failure
    is True, fails the task to make the DAG run failure visible.
    """
    ti = context['task_instance']
    params = context['params']
    url = params.get('url')  # Get URL from params
    if not url:
        logger.error("handle_failure called but no URL found in DAG run parameters.")
        return

    queue_name = params['queue_name']
    fail_queue = f"{queue_name}_fail"
    inbox_queue = f"{queue_name}_inbox"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    requeue_on_failure = params.get('requeue_on_failure', False)
    stop_on_failure = params.get('stop_on_failure', True)

    # --- Extract Detailed Error Information ---
    exception = context.get('exception')
    error_message = str(exception) if exception else "Unknown error"
    error_type = type(exception).__name__ if exception else "Unknown"
    # Use positional arguments: the 'etype' keyword was removed from
    # traceback.format_exception in Python 3.10.
    tb_str = (
        "".join(traceback.format_exception(type(exception), exception, exception.__traceback__))
        if exception else "No traceback available."
    )

    # Find the specific task that failed
    dag_run = context['dag_run']
    failed_task_id = "unknown"
    # Look at direct upstream tasks of the current task ('handle_failure')
    upstream_tasks = ti.get_direct_relatives(upstream=True)
    for task in upstream_tasks:
        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
        if upstream_ti and upstream_ti.state == 'failed':
            failed_task_id = task.task_id
            break

    logger.info(f"Handling failure for URL: {url}")
    logger.error(f" Failed Task: {failed_task_id}")
    logger.error(f" Failure Type: {error_type}")
    logger.error(f" Failure Reason: {error_message}")
    logger.debug(f" Traceback:\n{tb_str}")

    try:
        client = _get_redis_client(redis_conn_id)
        if requeue_on_failure:
            client.rpush(inbox_queue, url)
            logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
        else:
            fail_data = {
                'status': 'failed',
                'end_time': time.time(),
                'failed_task': failed_task_id,
                'error_type': error_type,
                'error_message': error_message,
                'traceback': tb_str,
                'url': url,
                'dag_run_id': context['dag_run'].run_id,
            }
            client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
            logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
    except Exception as e:
        logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
        # This is a critical error in the failure handling logic itself.
        raise AirflowException(f"Could not handle failure in Redis: {e}")

    # If stop_on_failure is True, fail this task to make the DAG run fail.
    # The loop is already stopped by the DAG structure, but this makes the failure visible.
    if stop_on_failure:
        logger.error("stop_on_failure is True. Failing this task to mark the DAG run as failed.")
        # Re-raise the original exception to fail the task instance.
        if exception:
            raise exception
        else:
            # If for some reason there's no exception, fail explicitly.
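            # Note: context['exception'] is normally only populated for failure
            # callbacks; when this runs as a downstream task with
            # trigger_rule='one_failed', the original exception is usually not
            # available, so an explicit failure is raised instead.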
            raise AirflowException("Failing task as per stop_on_failure=True, but original exception was not found.")


# --- YtdlpOpsOperator ---

class YtdlpOpsOperator(BaseOperator):
    """
    Custom Airflow operator to interact with the YTDLP Thrift service.
    Processes a single URL passed via DAG run configuration.
    """
    template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')

    @apply_defaults
    def __init__(self, service_ip=None, service_port=None, account_id=None,
                 info_json_dir=None, timeout=DEFAULT_TIMEOUT, *args, **kwargs):
        super().__init__(*args, **kwargs)
        logger.info(f"Initializing YtdlpOpsOperator (Worker Version) with parameters: "
                    f"service_ip={service_ip}, service_port={service_port}, "
                    f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
        if not service_ip or not service_port:
            raise ValueError("Both service_ip and service_port must be specified.")
        if not account_id:
            logger.warning("No account_id provided. Ensure it's set in DAG params or operator config.")
        self.service_ip = service_ip
        self.service_port = service_port
        self.account_id = account_id
        self.info_json_dir = info_json_dir
        self.timeout = timeout

    def execute(self, context):
        logger.info("Executing YtdlpOpsOperator (Worker Version)")
        transport = None
        ti = context['task_instance']
        try:
            params = context['params']
            url = params.get('url')
            if not url:
                raise AirflowException("DAG was triggered without a 'url' in its configuration.")
            logger.info(f"Processing URL from DAG run config: {url}")

            # template_fields are rendered by Airflow before execute(); rendering
            # again here is a harmless safeguard for plain, non-templated values.
            service_ip = self.render_template(self.service_ip, context)
            service_port_rendered = self.render_template(self.service_port, context)
            account_id = self.render_template(self.account_id, context)
            timeout_rendered = self.render_template(self.timeout, context)
            info_json_dir = self.render_template(self.info_json_dir, context)

            # DAG run params take precedence over the operator's rendered values.
            host = params.get('service_ip', service_ip)
            port_str = params.get('service_port', service_port_rendered)
            account_id = params.get('account_id', account_id)

            logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
            if not host or not port_str:
                raise ValueError("Direct connection requires service_ip and service_port")
            try:
                port = int(port_str)
            except (ValueError, TypeError):
                raise ValueError(f"Invalid service_port value: {port_str}")
            try:
                timeout = int(timeout_rendered)
                if timeout <= 0:
                    raise ValueError("Timeout must be positive")
            except (ValueError, TypeError):
                timeout = DEFAULT_TIMEOUT

            # Open a framed binary Thrift connection to the token service.
            socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET)
            socket_conn.setTimeout(timeout * 1000)
            transport = TTransport.TFramedTransport(socket_conn)
            protocol = TBinaryProtocol.TBinaryProtocol(transport)
            client = YTTokenOpService.Client(protocol)
            transport.open()
            logger.info("Successfully connected to Thrift server.")

            client.ping()
            logger.info("Server ping successful.")

            token_data = client.getOrRefreshToken(
                accountId=account_id,
                updateType=TokenUpdateMode.AUTO,
                url=url
            )
            logger.info("Successfully retrieved token data from service.")

            # Save info.json (if present and valid) and push its path to XCom.
            info_json_path = None
            info_json = self._get_info_json(token_data)
            if info_json and self._is_valid_json(info_json):
                info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
            ti.xcom_push(key='info_json_path', value=info_json_path)

            # Push the SOCKS proxy (if any) and the yt-dlp command to XCom.
            socks_proxy = None
            proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
            if proxy_attr:
                socks_proxy = getattr(token_data, proxy_attr)
            ti.xcom_push(key='socks_proxy', value=socks_proxy)

            ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
            ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
        except Exception as e:
            logger.error(f"YtdlpOpsOperator (Worker) failed: {e}", exc_info=True)
            raise AirflowException(f"Task failed: {e}")
        finally:
            if transport and transport.isOpen():
                transport.close()

    def _get_info_json(self, token_data):
        return getattr(token_data, 'infoJson', None)

    def _is_valid_json(self, json_str):
        if not json_str or not isinstance(json_str, str):
            return False
        try:
            json.loads(json_str)
            return True
        except json.JSONDecodeError:
            return False

    def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
        try:
            video_id = _extract_video_id(url)
            save_dir = rendered_info_json_dir or "."
            os.makedirs(save_dir, exist_ok=True)
            timestamp = int(time.time())
            base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
            info_json_path = os.path.join(save_dir, base_filename)
            with open(info_json_path, 'w', encoding='utf-8') as f:
                f.write(info_json)
            return info_json_path
        except Exception as e:
            logger.error(f"Failed to save info.json: {e}", exc_info=True)
            return None


# =============================================================================
# DAG Definition
# =============================================================================

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': days_ago(1),
}

with DAG(
    dag_id='ytdlp_worker_per_url',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    description='Processes a single YouTube URL passed via configuration.',
    tags=['ytdlp', 'thrift', 'client', 'worker'],
    params={
        'url': Param(None, type=["string", "null"],
                     description="The YouTube URL to process. This is set by the triggering DAG."),
        # Sensor params (passed through to re-trigger the sensor, with defaults for standalone runs)
        'queue_name': Param(DEFAULT_QUEUE_NAME, type="string",
                            description="Sensor param: Base name for Redis queues."),
        'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string",
                               description="Sensor param: Airflow Redis connection ID."),
        'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="integer",
                                  description="Sensor param: Maximum number of URLs to process in one batch."),
        # Worker-specific params
        'service_ip': Param('89.253.221.173', type="string", description="Service IP."),
        'service_port': Param(9090, type="integer", description="Service port."),
        'account_id': Param('default_account', type="string", description="Account ID for the API call."),
        'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
        'download_format': Param('ba[ext=m4a]/bestaudio/best', type="string", description="yt-dlp format selection string."),
        'output_path_template': Param("%(title)s [%(id)s].%(ext)s", type="string", description="yt-dlp output filename template."),
        'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}",
                               type="string", description="Directory to save info.json."),
        'requeue_on_failure': Param(False, type="boolean",
                                    description="If True, re-adds the URL to the inbox on failure instead of moving it to the fail hash."),
        'stop_on_failure': Param(True, type="boolean",
                                 description="If True, a worker failure will stop the entire processing loop."),
    }
) as dag:

    get_token = YtdlpOpsOperator(
        task_id='get_token',
        service_ip="{{ params.service_ip }}",
        service_port="{{ params.service_port }}",
        account_id="{{ params.account_id }}",
        timeout="{{ params.timeout }}",
        info_json_dir="{{ params.info_json_dir }}",
        retries=0,
    )

    download_video = BashOperator(
        task_id='download_video',
        bash_command="""
        INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}"
        PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}"
        FORMAT="{{ params.download_format }}"
        DOWNLOAD_DIR="{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles/video') }}"
        FILENAME_TEMPLATE="{{ params.output_path_template }}"
        FULL_OUTPUT_PATH="$DOWNLOAD_DIR/$FILENAME_TEMPLATE"

        echo "Starting download..."
        echo "Info JSON Path: $INFO_JSON_PATH"
        echo "Proxy: $PROXY"
        echo "Format: $FORMAT"
        echo "Download Directory: $DOWNLOAD_DIR"
        echo "Full Output Path: $FULL_OUTPUT_PATH"

        if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then
            echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)."
            exit 1
        fi

        # Build the yt-dlp command as an array to preserve quoting.
        CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH")
        # An absent proxy is rendered by Jinja as the literal string "None".
        if [ -n "$PROXY" ] && [ "$PROXY" != "None" ]; then
            CMD_ARRAY+=(--proxy "$PROXY")
        fi
        CMD_ARRAY+=(-f "$FORMAT" -o "$FULL_OUTPUT_PATH" --print filename)
        CMD_ARRAY+=(--no-progress --no-simulate --no-write-info-json --ignore-errors --no-playlist)

        printf "Executing:"
        printf " %q" "${CMD_ARRAY[@]}"
        echo ""

        FINAL_FILENAME=$("${CMD_ARRAY[@]}")
        EXIT_CODE=$?
        echo "yt-dlp exited with code: $EXIT_CODE"

        if [ $EXIT_CODE -ne 0 ]; then
            echo "Error: yt-dlp command failed."
            exit $EXIT_CODE
        fi
        if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then
            echo "Error: Download failed or did not produce a file."
            exit 1
        fi

        echo "SUCCESS: Final file confirmed at: $FINAL_FILENAME"
        # The last line of stdout becomes this task's XCom return value.
        echo "$FINAL_FILENAME"
        """,
        retries=3,
        retry_delay=timedelta(minutes=2),
    )

    # This task triggers the sensor DAG to check for more work as soon as this worker is done.
    trigger_sensor_for_next_batch = TriggerDagRunOperator(
        task_id='trigger_sensor_for_next_batch',
        trigger_dag_id='ytdlp_sensor_redis_queue',
        # Pass only the sensor's needed parameters back to it.
        # These values were originally passed from the sensor to this worker.
        # The values are templated and will be passed as strings to the triggered DAG.
        conf={
            "queue_name": "{{ params.queue_name }}",
            "redis_conn_id": "{{ params.redis_conn_id }}",
            "max_urls_per_run": "{{ params.max_urls_per_run }}",
        },
        # This task will only run on the success path, so it inherits the default
        # trigger_rule='all_success'.
        wait_for_completion=False,
    )
    trigger_sensor_for_next_batch.doc_md = """
    ### Trigger Sensor for Next Batch

    Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop.
    This task **only runs on the success path** after a URL has been fully processed.
    This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
    """

    # Define success and failure handling tasks
    success_task = PythonOperator(
        task_id='handle_success',
        python_callable=handle_success,
        trigger_rule='all_success',  # Run only if upstream tasks succeeded
    )

    failure_task = PythonOperator(
        task_id='handle_failure',
        python_callable=handle_failure,
        trigger_rule='one_failed',  # Run if any upstream task failed
    )

    # --- Define Task Dependencies ---

    # The main processing flow
    get_token >> download_video

    # The success path: if download_video succeeds, run success_task, then trigger the next sensor run.
    download_video >> success_task >> trigger_sensor_for_next_batch

    # The failure path: if get_token OR download_video fails, run the failure_task.
    # This is a "fan-in" dependency.
    [get_token, download_video] >> failure_task
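
# Local-run sketch (illustrative only): DAG.test() is available in Airflow 2.5+
# and executes the tasks in-process, so it requires the Redis connection and the
# Thrift token service configured above to be reachable from this machine. The
# URL below is a placeholder. In a deployed environment the equivalent is
# `airflow dags trigger ytdlp_worker_per_url --conf '{"url": "..."}'`.
if __name__ == "__main__":
    dag.test(run_conf={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"})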