diff --git a/dags/ytdlp_mgmt_queue_add_urls.py b/dags/ytdlp_mgmt_queue_add_urls.py index 71d135f..3721010 100644 --- a/dags/ytdlp_mgmt_queue_add_urls.py +++ b/dags/ytdlp_mgmt_queue_add_urls.py @@ -12,7 +12,7 @@ import redis # Import redis exceptions if needed logger = logging.getLogger(__name__) # Default settings -DEFAULT_QUEUE_NAME = 'video_queue_inbox' # Default to the inbox queue +DEFAULT_QUEUE_NAME = 'video_queue' # Default base name for the queue DEFAULT_REDIS_CONN_ID = 'redis_default' # --- Helper Functions --- @@ -35,15 +35,13 @@ def _get_redis_client(redis_conn_id): # --- Python Callables for Tasks --- def add_urls_callable(**context): - """Adds URLs from comma/newline separated input to the specified Redis list.""" + """Adds URLs from comma/newline separated input to the specified Redis inbox list.""" params = context['params'] redis_conn_id = params['redis_conn_id'] - queue_name = params['queue_name'] # Should be the inbox queue, e.g., video_queue_inbox + queue_name = params['queue_name'] + inbox_queue = f"{queue_name}_inbox" urls_input = params['urls'] - if not queue_name.endswith('_inbox'): - logger.warning(f"Target queue name '{queue_name}' does not end with '_inbox'. Ensure this is the intended inbox queue.") - if not urls_input or not isinstance(urls_input, str): logger.warning("No URLs provided or 'urls' parameter is not a string. Nothing to add.") return @@ -61,14 +59,14 @@ def add_urls_callable(**context): logger.info("No valid URLs found after processing input. Nothing added.") return - logger.info(f"Attempting to add {len(urls_to_add)} unique URLs to Redis list '{queue_name}' using connection '{redis_conn_id}'.") + logger.info(f"Attempting to add {len(urls_to_add)} unique URLs to Redis list '{inbox_queue}' using connection '{redis_conn_id}'.") try: redis_client = _get_redis_client(redis_conn_id) # Use rpush to add to the end of the list (FIFO behavior with lpop) - added_count = redis_client.rpush(queue_name, *urls_to_add) - logger.info(f"Successfully added {len(urls_to_add)} URLs to list '{queue_name}'. New list length: {added_count}.") + added_count = redis_client.rpush(inbox_queue, *urls_to_add) + logger.info(f"Successfully added {len(urls_to_add)} URLs to list '{inbox_queue}'. 
New list length: {added_count}.") except Exception as e: - logger.error(f"Failed to add URLs to Redis list '{queue_name}': {e}", exc_info=True) + logger.error(f"Failed to add URLs to Redis list '{inbox_queue}': {e}", exc_info=True) raise AirflowException(f"Failed to add URLs to Redis: {e}") @@ -76,17 +74,18 @@ def add_urls_callable(**context): def check_status_callable(**context): - """Checks the type and length/size of the specified Redis key.""" + """Checks the type and length/size of the specified Redis inbox key.""" # Access DAG run parameters directly from context['params'] dag_params = context['params'] redis_conn_id = dag_params['redis_conn_id'] - # Check the status of the queue specified in the main DAG parameters - queue_to_check = dag_params['queue_name'] + # This DAG verifies the inbox queue, so we construct the name from the base name + queue_name = dag_params['queue_name'] + queue_to_check = f"{queue_name}_inbox" - if not queue_to_check: - raise ValueError("DAG parameter 'queue_name' cannot be empty.") + if not queue_name: + raise ValueError("DAG parameter 'queue_name' (base name) cannot be empty.") - logger.info(f"Attempting to check status of Redis key '{queue_to_check}' using connection '{redis_conn_id}'.") # Uses DAG param value + logger.info(f"Attempting to check status of Redis key '{queue_to_check}' using connection '{redis_conn_id}'.") try: # Use the resolved redis_conn_id to get the client redis_client = _get_redis_client(redis_conn_id) @@ -152,7 +151,7 @@ with DAG( # Common params 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), # Params for adding URLs (and checking the same queue) - 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", title="Target Queue Name", description="Redis list (inbox queue) to add URLs to and check status of."), + 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", title="Base Queue Name", description="Base name for the Redis queues (e.g., 'video_queue'). 
The DAG will add URLs to '{base_name}_inbox'."), 'urls': Param("", type="string", title="URLs to Add", description="Comma and/or newline separated list of video URLs.", multiline=True), # Updated description, keep multiline for UI # Removed clear_queue_name param # Removed check_queue_name param (will use queue_name) diff --git a/dags/ytdlp_mgmt_queue_check_status.py b/dags/ytdlp_mgmt_queue_check_status.py index 7033f70..1b27ea6 100644 --- a/dags/ytdlp_mgmt_queue_check_status.py +++ b/dags/ytdlp_mgmt_queue_check_status.py @@ -15,17 +15,18 @@ from airflow.models.param import Param from airflow.operators.python import PythonOperator from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago -from datetime import timedelta +from datetime import datetime, timedelta, timezone import logging -import redis # Import redis exceptions if needed +import json +import redis # Import redis exceptions if needed # Configure logging logger = logging.getLogger(__name__) # Default settings DEFAULT_REDIS_CONN_ID = 'redis_default' -# Default to a common inbox pattern, user should override with the specific key -DEFAULT_QUEUE_TO_CHECK = 'video_queue_inbox' +DEFAULT_QUEUE_BASE_NAME = 'video_queue' +DEFAULT_MAX_ITEMS_TO_LIST = 25 # --- Helper Function --- @@ -44,47 +45,80 @@ def _get_redis_client(redis_conn_id): logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") -# --- Python Callable for Check Status Task --- +# --- Python Callable for Check and List Task --- -def check_status_callable(**context): - """Checks the length/size of the specified Redis key (queue/hash).""" +def check_and_list_queue_callable(**context): + """Checks the type and size of a Redis key and lists its recent contents.""" params = context['params'] redis_conn_id = params['redis_conn_id'] - queue_to_check = params['queue_to_check'] # Specific queue/hash name + # queue_suffix is passed from the PythonOperator's op_kwargs, which are available in the context + queue_suffix = context['queue_suffix'] + queue_name = params.get('queue_name', DEFAULT_QUEUE_BASE_NAME) + queue_to_check = f"{queue_name}{queue_suffix}" + max_items = int(params.get('max_items_to_list', DEFAULT_MAX_ITEMS_TO_LIST)) - if not queue_to_check: - raise ValueError("Parameter 'queue_to_check' cannot be empty.") + logger.info(f"--- Checking Status and Contents of Redis Key: '{queue_to_check}' ---") + logger.info(f"Using connection '{redis_conn_id}', listing up to {max_items} items.") - logger.info(f"Attempting to check status of Redis key '{queue_to_check}' using connection '{redis_conn_id}'.") try: redis_client = _get_redis_client(redis_conn_id) - key_type = redis_client.type(queue_to_check) - key_type_str = key_type.decode('utf-8') if isinstance(key_type, bytes) else key_type # Decode if needed + key_type_bytes = redis_client.type(queue_to_check) + key_type = key_type_bytes.decode('utf-8') - length = 0 - if key_type_str == 'list': - length = redis_client.llen(queue_to_check) - logger.info(f"Redis list '{queue_to_check}' has {length} items.") - elif key_type_str == 'hash': - length = redis_client.hlen(queue_to_check) - logger.info(f"Redis hash '{queue_to_check}' has {length} fields.") - elif key_type_str == 'none': + if key_type == 'list': + list_length = redis_client.llen(queue_to_check) + logger.info(f"Redis key '{queue_to_check}' is a LIST with {list_length} items.") + if list_length > 0: + items_to_fetch = min(max_items, 
list_length) + # lrange with negative indices gets items from the end (most recent for rpush) + contents_bytes = redis_client.lrange(queue_to_check, -items_to_fetch, -1) + contents = [item.decode('utf-8') for item in contents_bytes] + contents.reverse() # Show most recent first + logger.info(f"--- Showing most recent {len(contents)} of {list_length} items ---") + for i, item in enumerate(contents): + logger.info(f" [recent_{i}]: {item}") + if list_length > len(contents): + logger.info(f" ... ({list_length - len(contents)} older items not shown)") + logger.info(f"--- End of List Contents ---") + + elif key_type == 'hash': + hash_size = redis_client.hlen(queue_to_check) + logger.info(f"Redis key '{queue_to_check}' is a HASH with {hash_size} fields.") + if hash_size > 0: + logger.info(f"--- Showing a sample of up to {max_items} fields ---") + item_count = 0 + # Using hscan_iter to safely iterate over hash fields, count is a hint + for field_bytes, value_bytes in redis_client.hscan_iter(queue_to_check, count=max_items): + if item_count >= max_items: + logger.info(f" ... (stopped listing after {max_items} items of {hash_size})") + break + field = field_bytes.decode('utf-8') + value = value_bytes.decode('utf-8') + # Try to pretty-print if value is JSON + try: + parsed_value = json.loads(value) + # Check for timestamp to show age + timestamp = parsed_value.get('end_time') or parsed_value.get('start_time') + age_str = "" + if timestamp: + age_seconds = (datetime.now(timezone.utc) - datetime.fromtimestamp(timestamp, timezone.utc)).total_seconds() + age_str = f" (age: {timedelta(seconds=age_seconds)})" + + pretty_value = json.dumps(parsed_value, indent=2) + logger.info(f" Field '{field}'{age_str}:\n{pretty_value}") + except (json.JSONDecodeError, TypeError): + logger.info(f" Field '{field}': {value}") + item_count += 1 + logger.info(f"--- End of Hash Contents ---") + + elif key_type == 'none': logger.info(f"Redis key '{queue_to_check}' does not exist.") else: - # Attempt to get size for other types if possible, e.g., set size - try: - length = redis_client.scard(queue_to_check) # Example for set - logger.info(f"Redis key '{queue_to_check}' (type: {key_type_str}) has size {length}.") - except: - logger.info(f"Redis key '{queue_to_check}' exists but is of unhandled/unsizeable type '{key_type_str}'.") + logger.info(f"Redis key '{queue_to_check}' is of type '{key_type}'. 
Listing contents for this type is not implemented.") - # Optionally push length to XCom if needed downstream - context['task_instance'].xcom_push(key='queue_key_type', value=key_type_str) - context['task_instance'].xcom_push(key='queue_size', value=length) - return {'key': queue_to_check, 'type': key_type_str, 'size': length} # Return status info except Exception as e: - logger.error(f"Failed to check status of Redis key '{queue_to_check}': {e}", exc_info=True) - raise AirflowException(f"Failed to check Redis key status: {e}") + logger.error(f"Failed to check/list contents of Redis key '{queue_to_check}': {e}", exc_info=True) + raise AirflowException(f"Failed to process Redis key: {e}") # --- DAG Definition --- default_args = { @@ -92,42 +126,68 @@ default_args = { 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(seconds=30), + 'retries': 0, # No retries for a manual check/list operation 'start_date': days_ago(1) } with DAG( - dag_id='ytdlp_mgmt_queue_check_status', + dag_id='ytdlp_mgmt_queues_check_status', default_args=default_args, - schedule_interval=None, # Manually triggered + schedule_interval=None, # Manually triggered catchup=False, - description='Manually check the type and size of a specific YTDLP Redis queue/key.', - tags=['ytdlp', 'queue', 'management', 'redis', 'manual', 'status'], + description='Manually check the status and recent items of all YTDLP Redis queues for a given base name.', + tags=['ytdlp', 'queue', 'management', 'redis', 'manual', 'status', 'list'], params={ 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), - 'queue_to_check': Param( - DEFAULT_QUEUE_TO_CHECK, + 'queue_name': Param( + DEFAULT_QUEUE_BASE_NAME, type="string", - description="Exact name of the Redis key to check (e.g., 'video_queue_inbox_account_xyz', 'video_queue_progress', 'video_queue_result', 'video_queue_fail')." + description="Base name for the Redis queues (e.g., 'video_queue')." ), + 'max_items_to_list': Param(DEFAULT_MAX_ITEMS_TO_LIST, type="integer", description="Maximum number of recent items/fields to list from each queue."), } ) as dag: - check_status_task = PythonOperator( - task_id='check_specified_queue_status', - python_callable=check_status_callable, - # Params are implicitly passed via context['params'] + check_inbox_queue = PythonOperator( + task_id='check_inbox_queue', + python_callable=check_and_list_queue_callable, + op_kwargs={'queue_suffix': '_inbox'}, ) - check_status_task.doc_md = """ - ### Check Specified Queue/Key Status Task - Checks the type and size (length for lists, number of fields for hashes) of the Redis key specified by `queue_to_check`. - Logs the result and pushes `queue_key_type` and `queue_size` to XCom. - Can check keys like: - - `_inbox` (Redis List) - - `_progress` (Redis Hash) - - `_result` (Redis Hash) - - `_fail` (Redis Hash) - - *Trigger this task manually via the UI.* + check_inbox_queue.doc_md = """ + ### Check Inbox Queue (`_inbox`) + Checks the status and lists the most recent URLs waiting to be processed. + The full queue name is `{{ params.queue_name }}_inbox`. + """ + + check_progress_queue = PythonOperator( + task_id='check_progress_queue', + python_callable=check_and_list_queue_callable, + op_kwargs={'queue_suffix': '_progress'}, + ) + check_progress_queue.doc_md = """ + ### Check Progress Queue (`_progress`) + Checks the status and lists a sample of URLs currently being processed. 
+ The full queue name is `{{ params.queue_name }}_progress`. + """ + + check_result_queue = PythonOperator( + task_id='check_result_queue', + python_callable=check_and_list_queue_callable, + op_kwargs={'queue_suffix': '_result'}, + ) + check_result_queue.doc_md = """ + ### Check Result Queue (`_result`) + Checks the status and lists a sample of successfully processed URLs. + The full queue name is `{{ params.queue_name }}_result`. + """ + + check_fail_queue = PythonOperator( + task_id='check_fail_queue', + python_callable=check_and_list_queue_callable, + op_kwargs={'queue_suffix': '_fail'}, + ) + check_fail_queue.doc_md = """ + ### Check Fail Queue (`_fail`) + Checks the status and lists a sample of failed URLs. + The full queue name is `{{ params.queue_name }}_fail`. """ diff --git a/dags/ytdlp_mgmt_queue_clear.py b/dags/ytdlp_mgmt_queue_clear.py index 50300c6..0ecc562 100644 --- a/dags/ytdlp_mgmt_queue_clear.py +++ b/dags/ytdlp_mgmt_queue_clear.py @@ -1,10 +1,5 @@ # -*- coding: utf-8 -*- # vim:fenc=utf-8 -# -# Copyright © 2024 rl -# -# Distributed under terms of the MIT license. - """ Airflow DAG for manually clearing (deleting) a specific Redis key used by YTDLP queues. """ diff --git a/dags/ytdlp_mgmt_queue_list_contents.py b/dags/ytdlp_mgmt_queue_list_contents.py index 878580d..b903308 100644 --- a/dags/ytdlp_mgmt_queue_list_contents.py +++ b/dags/ytdlp_mgmt_queue_list_contents.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) DEFAULT_REDIS_CONN_ID = 'redis_default' # Default to a common inbox pattern, user should override with the specific key DEFAULT_QUEUE_TO_LIST = 'video_queue_inbox' -DEFAULT_MAX_ITEMS = 100 # Limit number of items listed by default +DEFAULT_MAX_ITEMS = 10 # Limit number of items listed by default # --- Helper Function --- @@ -68,16 +68,20 @@ def list_contents_callable(**context): if key_type == 'list': list_length = redis_client.llen(queue_to_list) - # Get range, respecting max_items (0 to max_items-1) + # Get the last N items, which are the most recently added with rpush items_to_fetch = min(max_items, list_length) - # lrange returns list of bytes, decode each item - contents_bytes = redis_client.lrange(queue_to_list, 0, items_to_fetch - 1) + # lrange with negative indices gets items from the end of the list. + # -N to -1 gets the last N items. + contents_bytes = redis_client.lrange(queue_to_list, -items_to_fetch, -1) contents = [item.decode('utf-8') for item in contents_bytes] - logger.info(f"--- Contents of Redis List '{queue_to_list}' (showing first {len(contents)} of {list_length}) ---") + # Reverse the list so the absolute most recent item is printed first + contents.reverse() + logger.info(f"--- Contents of Redis List '{queue_to_list}' (showing most recent {len(contents)} of {list_length}) ---") for i, item in enumerate(contents): - logger.info(f" [{i}]: {item}") # item is now a string + # The index here is just for display, 0 is the most recent + logger.info(f" [recent_{i}]: {item}") if list_length > len(contents): - logger.info(f" ... ({list_length - len(contents)} more items not shown)") + logger.info(f" ... ({list_length - len(contents)} older items not shown)") logger.info(f"--- End of List Contents ---") # Optionally push contents to XCom if small enough # context['task_instance'].xcom_push(key='list_contents', value=contents) @@ -143,7 +147,7 @@ with DAG( type="string", description="Exact name of the Redis key (list/hash) to list contents for (e.g., 'video_queue_inbox_account_xyz', 'video_queue_progress', etc.)." 
), - 'max_items': Param(DEFAULT_MAX_ITEMS, type="integer", description="Maximum number of items/fields to list from the key."), + 'max_items': Param(DEFAULT_MAX_ITEMS, type="integer", description="Maximum number of items/fields to list. For lists, shows the most recent items."), } ) as dag: diff --git a/dags/ytdlp_proc_sequential_processor.py b/dags/ytdlp_proc_sequential_processor.py index 617c810..68d5aa8 100644 --- a/dags/ytdlp_proc_sequential_processor.py +++ b/dags/ytdlp_proc_sequential_processor.py @@ -16,6 +16,7 @@ from airflow.models import BaseOperator, Variable from airflow.models.param import Param from airflow.operators.bash import BashOperator # Import BashOperator from airflow.operators.python import PythonOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago from airflow.utils.decorators import apply_defaults @@ -168,11 +169,13 @@ def handle_success(**context): info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path') socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy') ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') # Original command + downloaded_file_path = ti.xcom_pull(task_ids='download_video') # Pull from download_video task logger.info(f"Handling success for URL: {url}") logger.info(f" Info JSON Path: {info_json_path}") logger.info(f" SOCKS Proxy: {socks_proxy}") logger.info(f" YTDLP Command: {ytdlp_command[:100] if ytdlp_command else 'None'}...") # Log truncated command + logger.info(f" Downloaded File Path: {downloaded_file_path}") result_data = { 'status': 'success', @@ -180,6 +183,7 @@ def handle_success(**context): 'info_json_path': info_json_path, 'socks_proxy': socks_proxy, 'ytdlp_command': ytdlp_command, + 'downloaded_file_path': downloaded_file_path, 'url': url, 'dag_run_id': context['dag_run'].run_id, 'task_instance_key_str': context['task_instance_key_str'] # Record which task instance succeeded @@ -205,56 +209,76 @@ def handle_success(**context): def handle_failure(**context): - """Moves URL from progress to fail hash on failure.""" + """ + Handles failed processing. Depending on the `requeue_on_failure` parameter, + it either moves the URL to the fail hash or re-queues it in the inbox. + If `stop_on_failure` is True, this task will fail, stopping the DAG loop. + """ ti = context['task_instance'] url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url') if not url: logger.error("handle_failure called but no URL found from pop_url_from_queue XCom.") - # Cannot move to fail queue if URL is unknown return params = context['params'] queue_name = params['queue_name'] progress_queue = f"{queue_name}_progress" fail_queue = f"{queue_name}_fail" + inbox_queue = f"{queue_name}_inbox" redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) + requeue_on_failure = params.get('requeue_on_failure', False) + stop_on_failure = params.get('stop_on_failure', True) # Default to True - # Get failure reason from the exception context exception = context.get('exception') error_message = str(exception) if exception else "Unknown error" - # Get traceback if available tb_str = traceback.format_exc() if exception else "No traceback available." 
logger.info(f"Handling failure for URL: {url}") - logger.error(f" Failure Reason: {error_message}") # Log the error that triggered failure - logger.debug(f" Traceback:\n{tb_str}") # Log traceback at debug level - - fail_data = { - 'status': 'failed', - 'end_time': time.time(), - 'error': error_message, - 'traceback': tb_str, # Store traceback - 'url': url, - 'dag_run_id': context['dag_run'].run_id, - 'task_instance_key_str': context['task_instance_key_str'] # Record which task instance failed - } + logger.error(f" Failure Reason: {error_message}") + logger.debug(f" Traceback:\n{tb_str}") try: client = _get_redis_client(redis_conn_id) - # Remove from progress hash + # Always remove from progress hash first removed_count = client.hdel(progress_queue, url) if removed_count > 0: - logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.") + logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.") else: - logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.") + logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.") - # Add to fail hash - client.hset(fail_queue, url, json.dumps(fail_data)) - logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.") + if requeue_on_failure: + # Re-queue the URL for another attempt + client.rpush(inbox_queue, url) + logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.") + else: + # Move to the permanent fail hash + fail_data = { + 'status': 'failed', + 'end_time': time.time(), + 'error': error_message, + 'traceback': tb_str, + 'url': url, + 'dag_run_id': context['dag_run'].run_id, + 'task_instance_key_str': context['task_instance_key_str'] + } + client.hset(fail_queue, url, json.dumps(fail_data)) + logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.") except Exception as e: - logger.error(f"Error handling failure in Redis for URL '{url}': {e}", exc_info=True) - # Log error, but the task already failed. + logger.error(f"Error during failure handling in Redis for URL '{url}': {e}", exc_info=True) + # This is a critical error in the failure handling logic itself. + raise AirflowException(f"Could not handle failure in Redis: {e}") + + # After handling Redis, decide whether to fail the task to stop the loop + if stop_on_failure: + logger.error("stop_on_failure is True. Failing this task to stop the DAG loop.") + # Re-raise the original exception to fail the task instance. + # This is better than AirflowFailException because it preserves the original error. + if exception: + raise exception + else: + # If for some reason there's no exception, fail explicitly. + raise AirflowFailException("Failing task as per stop_on_failure=True, but original exception was not found.") # --- YtdlpOpsOperator --- @@ -569,17 +593,22 @@ class YtdlpOpsOperator(BaseOperator): # --- End Removed old logic block --- - # Get the original command from the server + # Get the original command from the server, or construct a fallback ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None) - if not ytdlp_cmd: - logger.error("No 'ytdlpCommand' attribute found in token data.") - raise AirflowException("Required 'ytdlpCommand' not received from service.") + if ytdlp_cmd: + logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...") # Log truncated + else: + logger.warning("No 'ytdlpCommand' attribute found in token data. 
Constructing a fallback for logging.") + # Construct a representative command for logging purposes + if socks_proxy: + ytdlp_cmd = f"yt-dlp --dump-json --proxy \"{socks_proxy}\" \"{url}\"" + else: + ytdlp_cmd = f"yt-dlp --dump-json \"{url}\"" + logger.info(f"Constructed fallback command: {ytdlp_cmd}") - logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...") # Log truncated - - # Push the *original* command to XCom + # Push the command to XCom ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd) - logger.info("Pushed original command to XCom key 'ytdlp_command'.") + logger.info("Pushed command to XCom key 'ytdlp_command'.") # No explicit return needed, success is implicit if no exception raised @@ -682,229 +711,10 @@ default_args = { } # Define DAG -with DAG( - dag_id='ytdlp_proc_sequential_processor', # New DAG ID - default_args=default_args, - schedule_interval=None, # Manually triggered or triggered by external sensor/event - catchup=False, - description='Processes YouTube URLs sequentially from a Redis queue using YTDLP Ops.', - tags=['ytdlp', 'thrift', 'client', 'sequential', 'queue', 'processor'], # Updated tags - params={ - # Define DAG parameters - 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues (e.g., 'video_queue' -> video_queue_inbox, video_queue_progress, etc.)."), - 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), - # YtdlpOpsOperator specific params (can be overridden at task level if needed) - 'redis_enabled': Param(False, type="boolean", description="Use Redis for service discovery? If False, uses service_ip/port."), # Default changed to False - 'service_ip': Param(None, type=["null", "string"], description="Required Service IP if redis_enabled=False."), # Clarified requirement - 'service_port': Param(None, type=["null", "integer"], description="Required Service port if redis_enabled=False."), # Clarified requirement - 'account_id': Param('default_account', type="string", description="Account ID for the API call (used for Redis lookup if redis_enabled=True)."), # Clarified usage - 'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."), - # save_info_json removed, always True - # get_socks_proxy removed, always True - # store_socks_proxy removed, always True - # Download specific parameters - 'download_format': Param( - # Default to best audio-only format (e.g., m4a) - 'ba[ext=m4a]/bestaudio/best', - type="string", - description="yt-dlp format selection string (e.g., 'ba' for best audio, 'wv*+wa/w' for worst video+audio)." - ), - 'output_path_template': Param( - # Simplified template, removed queue_name subdir - "{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s", - type="string", - description="yt-dlp output template (e.g., '/path/to/downloads/%(title)s.%(ext)s'). Uses Airflow Variable 'DOWNLOADS_TEMP'." - ), - # Simplified info_json_dir, just uses DOWNLOADS_TEMP variable - 'info_json_dir': Param( - "{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", - type="string", - description="Directory to save info.json. Uses Airflow Variable 'DOWNLOADS_TEMP'." 
- ) - } -) as dag: - - # --- Task Definitions --- - - pop_url = PythonOperator( - task_id='pop_url_from_queue', - python_callable=pop_url_from_queue, - # Params are implicitly passed via context - ) - pop_url.doc_md = """ - ### Pop URL from Inbox Queue - Pops the next available URL from the `{{ params.queue_name }}_inbox` Redis list. - Pushes the URL to XCom key `current_url`. - If the queue is empty, raises `AirflowSkipException` to skip downstream tasks. - """ - - move_to_progress = PythonOperator( - task_id='move_url_to_progress', - python_callable=move_url_to_progress, - trigger_rule='all_success', # Only run if pop_url succeeded (didn't skip) - ) - move_to_progress.doc_md = """ - ### Move URL to Progress Hash - Retrieves the `current_url` from XCom (pushed by `pop_url_from_queue`). - Adds the URL as a key to the `{{ params.queue_name }}_progress` Redis hash with status 'processing'. - This task is skipped if `pop_url_from_queue` was skipped. - """ - - # YtdlpOpsOperator task to get the token - get_token = YtdlpOpsOperator( - task_id='get_token', - # Operator params are inherited from DAG params by default, - # but can be overridden here if needed. - # We rely on the operator pulling the URL from XCom internally. - # Pass DAG params explicitly to ensure they are used if overridden - redis_conn_id="{{ params.redis_conn_id }}", - redis_enabled="{{ params.redis_enabled }}", - service_ip="{{ params.service_ip }}", - service_port="{{ params.service_port }}", - account_id="{{ params.account_id }}", - timeout="{{ params.timeout }}", - # save_info_json removed - info_json_dir="{{ params.info_json_dir }}", # Pass the simplified path template - # get_socks_proxy removed - # store_socks_proxy removed - retries=0, # Set operator retries to 0; failure handled by branching/failure handler - trigger_rule='all_success', # Only run if move_to_progress succeeded - ) - get_token.doc_md = """ - ### Get Token and Info Task - Connects to the YTDLP Thrift service for the URL pulled from XCom (`current_url`). - Retrieves token, metadata, command, and potentially proxy. Saves `info.json`. - Failure of this task triggers the `handle_failure` path. - Success triggers the `handle_success` path. - - **Pulls from XCom:** - - `current_url` (from `pop_url_from_queue`) - *Used internally* - - **Pushes to XCom:** - - `info_json_path` - - `socks_proxy` - - `ytdlp_command` - """ - - # Task to perform the actual download using yt-dlp - # Ensure info_json_path and socks_proxy are correctly quoted within the bash command - # Use {% raw %} {% endraw %} around Jinja if needed, but direct templating should work here. - # Added --no-simulate, --no-write-info-json, --ignore-errors, --no-progress - download_video = BashOperator( - task_id='download_video', - bash_command=""" - INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}" - PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}" - FORMAT="{{ params.download_format }}" - OUTPUT_TEMPLATE="{{ params.output_path_template }}" - - echo "Starting download..." - echo "Info JSON Path: $INFO_JSON_PATH" - echo "Proxy: $PROXY" - echo "Format: $FORMAT" - echo "Output Template: $OUTPUT_TEMPLATE" - - # Check if info.json path exists - if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then - echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)." 
- exit 1 - fi - - # Construct command - CMD="yt-dlp --load-info-json \"$INFO_JSON_PATH\"" - - # Add proxy if it exists - if [ -n "$PROXY" ]; then - CMD="$CMD --proxy \"$PROXY\"" - fi - - # Add format and output template - CMD="$CMD -f \"$FORMAT\" -o \"$OUTPUT_TEMPLATE\"" - - # Add other useful flags - CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH") - - # Add proxy if it exists - if [ -n "$PROXY" ]; then - CMD_ARRAY+=(--proxy "$PROXY") - fi - - # Add format and output template - CMD_ARRAY+=(-f "$FORMAT" -o "$OUTPUT_TEMPLATE") - - # Add other useful flags - CMD_ARRAY+=(--no-progress --no-simulate --no-write-info-json --ignore-errors --verbose) - - echo "Executing command array:" - # Use printf to safely quote and display the command array - printf "%q " "${CMD_ARRAY[@]}" - echo "" # Newline after command - - # Execute the command directly using the array - "${CMD_ARRAY[@]}" - - # Check exit code - EXIT_CODE=$? - if [ $EXIT_CODE -ne 0 ]; then - echo "Error: yt-dlp command failed with exit code $EXIT_CODE" - exit $EXIT_CODE - fi - echo "Download command completed successfully." - """, - trigger_rule='all_success', # Run only if get_token succeeded - ) - download_video.doc_md = """ - ### Download Video/Audio Task - Executes `yt-dlp` using the `info.json` and proxy obtained from the `get_token` task. - Uses the `download_format` and `output_path_template` parameters from the DAG run configuration. - Failure of this task triggers the `handle_failure` path. - - **Pulls from XCom (task_id='get_token'):** - - `info_json_path` - - `socks_proxy` - """ - - - # Task to handle successful token retrieval AND download - success_handler = PythonOperator( - task_id='handle_success', - python_callable=handle_success, - trigger_rule='all_success', # Run only if get_token succeeds - ) - success_handler.doc_md = """ - ### Handle Success Task - Runs after `get_token` succeeds. - Retrieves `current_url` and results from `get_token` via XCom. - Removes the URL from the `{{ params.queue_name }}_progress` hash. - Adds the URL and results to the `{{ params.queue_name }}_result` hash. - """ - - # Task to handle failed token retrieval or download - failure_handler = PythonOperator( - task_id='handle_failure', - python_callable=handle_failure, - trigger_rule='one_failed', # Run only if get_token or download_video fails - ) - failure_handler.doc_md = """ - ### Handle Failure Task - # Runs after `get_token` (or potentially `move_url_to_progress`) fails. - # Retrieves `current_url` from XCom. - # Retrieves the error message and traceback from the context. - # Removes the URL from the `{{ params.queue_name }}_progress` hash. - # Adds the URL and error details to the `{{ params.queue_name }}_fail` hash. - # **Important:** This task succeeding means the failure was *handled*, the DAG run itself might still be marked as failed if `get_token` failed. 
- # """ - - - # --- Task Dependencies --- - # Core processing flow - pop_url >> move_to_progress >> get_token >> download_video - - # Handlers depend on the outcome of both token retrieval and download - # Success handler runs only if download_video succeeds - download_video >> success_handler # Default trigger_rule='all_success' is suitable - - # Failure handler runs if either get_token or download_video fails - [get_token, download_video] >> failure_handler # Uses trigger_rule='one_failed' defined in the task - -# Removed Jinja filters as they are no longer needed for the simplified info_json_dir +# +# --- DAG Block Deactivated on 2025-07-16 --- +# This DAG has been replaced by the Sensor/Worker pattern implemented in: +# - ytdlp_sensor_redis_queue.py (polls the queue) +# - ytdlp_worker_per_url.py (processes a single URL) +# This code is kept for reference but is not active. +# diff --git a/dags/ytdlp_sensor_redis_queue.py b/dags/ytdlp_sensor_redis_queue.py new file mode 100644 index 0000000..b592b12 --- /dev/null +++ b/dags/ytdlp_sensor_redis_queue.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2024 rl +# +# Distributed under terms of the MIT license. + +""" +DAG to sense a Redis queue for new URLs and trigger the ytdlp_worker_per_url DAG. +This is the "Sensor" part of a Sensor/Worker pattern. +""" + +from airflow import DAG +from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.operators.python import PythonOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.redis.hooks.redis import RedisHook +from airflow.models.param import Param +from airflow.utils.dates import days_ago +from datetime import timedelta +import logging +import redis + +# Configure logging +logger = logging.getLogger(__name__) + +# Default settings +DEFAULT_QUEUE_NAME = 'video_queue' +DEFAULT_REDIS_CONN_ID = 'redis_default' +DEFAULT_TIMEOUT = 30 +DEFAULT_MAX_URLS = '1' # Default number of URLs to process per run + +# --- Helper Functions --- + +def _get_redis_client(redis_conn_id): + """Gets a Redis client connection using RedisHook.""" + try: + hook = RedisHook(redis_conn_id=redis_conn_id) + client = hook.get_conn() + client.ping() + logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") + return client + except redis.exceptions.AuthenticationError: + logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.") + raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") + except Exception as e: + logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") + raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") + +# --- Task Callables --- + +def log_trigger_info_callable(**context): + """Logs information about how the DAG run was triggered.""" + dag_run = context['dag_run'] + trigger_type = dag_run.run_type + logger.info(f"Sensor DAG triggered. 
Run ID: {dag_run.run_id}, Type: {trigger_type}") + + if trigger_type == 'manual': + logger.info("Trigger source: Manual execution from Airflow UI or CLI.") + elif trigger_type == 'dag_run': + # In Airflow 2.2+ we can get the triggering run object + try: + triggering_dag_run = dag_run.get_triggering_dagrun() + if triggering_dag_run: + triggering_dag_id = triggering_dag_run.dag_id + triggering_run_id = triggering_dag_run.run_id + logger.info(f"Trigger source: DAG Run from '{triggering_dag_id}' (Run ID: {triggering_run_id}).") + # Check if it's a worker by looking at the conf keys + conf = dag_run.conf or {} + if all(k in conf for k in ['queue_name', 'redis_conn_id', 'max_urls_per_run']): + logger.info("This appears to be a standard trigger from a worker DAG continuing the loop.") + else: + logger.warning(f"Triggered by another DAG but conf does not match worker pattern. Conf: {conf}") + else: + logger.warning("Trigger type is 'dag_run' but could not retrieve triggering DAG run details.") + except Exception as e: + logger.error(f"Could not get triggering DAG run details: {e}") + else: + logger.info(f"Trigger source: {trigger_type}") + + +def check_queue_for_urls_batch(**context): + """ + Pops a batch of URLs from the inbox queue. + Returns a list of configuration dictionaries for the TriggerDagRunOperator. + If the queue is empty, it raises AirflowSkipException. + """ + params = context['params'] + queue_name = params['queue_name'] + inbox_queue = f"{queue_name}_inbox" + redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) + max_urls_raw = params.get('max_urls_per_run', DEFAULT_MAX_URLS) + try: + max_urls = int(max_urls_raw) + except (ValueError, TypeError): + logger.warning(f"Invalid value for max_urls_per_run: '{max_urls_raw}'. Using default: {DEFAULT_MAX_URLS}") + max_urls = DEFAULT_MAX_URLS + + urls_to_process = [] + try: + client = _get_redis_client(redis_conn_id) + current_queue_size = client.llen(inbox_queue) + logger.info(f"Queue '{inbox_queue}' has {current_queue_size} URLs. Attempting to pop up to {max_urls}.") + + for _ in range(max_urls): + url_bytes = client.lpop(inbox_queue) + if url_bytes: + url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes + logger.info(f" - Popped URL: {url}") + urls_to_process.append(url) + else: + # Queue is empty, stop trying to pop + break + + if urls_to_process: + logger.info(f"Found {len(urls_to_process)} URLs in queue. Generating trigger configurations.") + # Create a list of 'conf' objects for the trigger operator to expand + trigger_configs = [] + for url in urls_to_process: + # The worker DAG will use its own default params for its operations. + # We only need to provide the URL for processing, and the sensor's own + # params so the worker can trigger the sensor again to continue the loop. + worker_conf = { + 'url': url, + 'queue_name': queue_name, + 'redis_conn_id': redis_conn_id, + 'max_urls_per_run': int(max_urls), + 'stop_on_failure': params.get('stop_on_failure', True) + } + trigger_configs.append(worker_conf) + return trigger_configs + else: + logger.info(f"Queue '{inbox_queue}' is empty. 
Skipping trigger.") + raise AirflowSkipException(f"Redis queue '{inbox_queue}' is empty.") + except AirflowSkipException: + raise + except Exception as e: + logger.error(f"Error popping URLs from Redis queue '{inbox_queue}': {e}", exc_info=True) + raise AirflowException(f"Failed to pop URLs from Redis: {e}") + +# ============================================================================= +# DAG Definition +# ============================================================================= + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 0, # The sensor itself should not retry on failure, it will run again on schedule + 'start_date': days_ago(1), +} + +with DAG( + dag_id='ytdlp_sensor_redis_queue', + default_args=default_args, + schedule_interval=None, # This DAG is now only triggered (manually or by a worker) + max_active_runs=1, # Prevent multiple sensors from running at once + catchup=False, + description='Polls Redis queue for a batch of URLs and triggers parallel worker DAGs.', + tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'], + params={ + 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."), + 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), + 'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="string", description="Maximum number of URLs to process in one batch."), + 'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."), + } +) as dag: + + log_trigger_info_task = PythonOperator( + task_id='log_trigger_info', + python_callable=log_trigger_info_callable, + ) + log_trigger_info_task.doc_md = """ + ### Log Trigger Information + Logs details about how this DAG run was initiated (e.g., manually or by a worker DAG). + This provides visibility into the processing loop. + """ + + poll_redis_task = PythonOperator( + task_id='check_queue_for_urls_batch', + python_callable=check_queue_for_urls_batch, + ) + poll_redis_task.doc_md = """ + ### Poll Redis Queue for Batch + Checks the Redis inbox queue for a batch of new URLs (up to `max_urls_per_run`). + - **On Success (URLs found):** Returns a list of configuration objects for the trigger task. + - **On Skip (Queue empty):** Skips this task and the trigger task. The DAG run succeeds. + """ + + # This operator will be dynamically expanded based on the output of poll_redis_task + trigger_worker_dags = TriggerDagRunOperator.partial( + task_id='trigger_worker_dags', + trigger_dag_id='ytdlp_worker_per_url', + wait_for_completion=False, # Fire and forget + doc_md=""" +### Trigger Worker DAGs (Dynamically Mapped) +Triggers one `ytdlp_worker_per_url` DAG run for each URL found by the polling task. +Each triggered DAG receives its own specific configuration (including the URL). +This task is skipped if the polling task finds no URLs. +""" + ).expand( + conf=poll_redis_task.output + ) + + log_trigger_info_task >> poll_redis_task >> trigger_worker_dags diff --git a/dags/ytdlp_worker_per_url.py b/dags/ytdlp_worker_per_url.py new file mode 100644 index 0000000..1cbdffc --- /dev/null +++ b/dags/ytdlp_worker_per_url.py @@ -0,0 +1,479 @@ +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2024 rl +# +# Distributed under terms of the MIT license. + +""" +DAG for processing a single YouTube URL passed via DAG run configuration. +This is the "Worker" part of a Sensor/Worker pattern. 
+""" + +from airflow import DAG +from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.models import BaseOperator, Variable +from airflow.models.param import Param +from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.redis.hooks.redis import RedisHook +from airflow.utils.dates import days_ago +from airflow.utils.decorators import apply_defaults +from datetime import datetime, timedelta +from pangramia.yt.common.ttypes import TokenUpdateMode +from pangramia.yt.exceptions.ttypes import PBServiceException +from pangramia.yt.tokens_ops import YTTokenOpService +from thrift.protocol import TBinaryProtocol +from thrift.transport import TSocket, TTransport +from thrift.transport.TTransport import TTransportException +import json +import logging +import os +import redis +import socket +import time + +# Configure logging +logger = logging.getLogger(__name__) + +# Default settings +DEFAULT_QUEUE_NAME = 'video_queue' +DEFAULT_REDIS_CONN_ID = 'redis_default' +DEFAULT_MAX_URLS = 1 +DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds + +# --- Helper Functions --- + +def _extract_video_id(url): + """Extracts YouTube video ID from URL.""" + if not url or not isinstance(url, str): + logger.debug("URL is empty or not a string, cannot extract video ID.") + return None + try: + video_id = None + if 'youtube.com/watch?v=' in url: + video_id = url.split('v=')[1].split('&')[0] + elif 'youtu.be/' in url: + video_id = url.split('youtu.be/')[1].split('?')[0] + + if video_id and len(video_id) >= 11: + video_id = video_id[:11] # Standard ID length + logger.debug(f"Extracted video ID '{video_id}' from URL: {url}") + return video_id + else: + logger.debug(f"Could not extract a standard video ID pattern from URL: {url}") + return None + except Exception as e: + logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}") + return None + +# --- Queue Management Callables (for success/failure reporting) --- + +def handle_success(**context): + """Moves URL from progress to result hash on success.""" + ti = context['task_instance'] + params = context['params'] + url = params.get('url') # Get URL from params, not XCom + if not url: + logger.warning("handle_success called but no URL found in DAG run parameters.") + return + + queue_name = params['queue_name'] + result_queue = f"{queue_name}_result" + redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) + + # Pull results from previous tasks + info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path') + socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy') + ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') + downloaded_file_path = ti.xcom_pull(task_ids='download_video') + + logger.info(f"Handling success for URL: {url}") + logger.info(f" Downloaded File Path: {downloaded_file_path}") + + result_data = { + 'status': 'success', + 'end_time': time.time(), + 'info_json_path': info_json_path, + 'socks_proxy': socks_proxy, + 'ytdlp_command': ytdlp_command, + 'downloaded_file_path': downloaded_file_path, + 'url': url, + 'dag_run_id': context['dag_run'].run_id, + } + + try: + # In the worker pattern, there's no "progress" hash to remove from. + # We just add the result to the success hash. 
+ client = YtdlpOpsOperator._get_redis_client(redis_conn_id) + client.hset(result_queue, url, json.dumps(result_data)) + logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.") + except Exception as e: + logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True) + # Log error but don't fail the task, as the main work succeeded. + +def handle_failure(**context): + """ + Handles failed processing. Moves the URL to the fail hash and, if stop_on_failure + is True, fails the task to make the DAG run failure visible. + """ + ti = context['task_instance'] + params = context['params'] + url = params.get('url') # Get URL from params + if not url: + logger.error("handle_failure called but no URL found in DAG run parameters.") + return + + queue_name = params['queue_name'] + fail_queue = f"{queue_name}_fail" + inbox_queue = f"{queue_name}_inbox" + redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) + requeue_on_failure = params.get('requeue_on_failure', False) + stop_on_failure = params.get('stop_on_failure', True) + + exception = context.get('exception') + error_message = str(exception) if exception else "Unknown error" + + logger.info(f"Handling failure for URL: {url}") + logger.error(f" Failure Reason: {error_message}") + + try: + client = YtdlpOpsOperator._get_redis_client(redis_conn_id) + if requeue_on_failure: + client.rpush(inbox_queue, url) + logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.") + else: + fail_data = { + 'status': 'failed', + 'end_time': time.time(), + 'error': error_message, + 'url': url, + 'dag_run_id': context['dag_run'].run_id, + } + client.hset(fail_queue, url, json.dumps(fail_data)) + logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.") + except Exception as e: + logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True) + # This is a critical error in the failure handling logic itself. + raise AirflowException(f"Could not handle failure in Redis: {e}") + + # If stop_on_failure is True, we should fail this task to make the DAG run fail. + # The loop is already stopped by the DAG structure, but this makes the failure visible. + if stop_on_failure: + logger.error("stop_on_failure is True. Failing this task to mark the DAG run as failed.") + # Re-raise the original exception to fail the task instance. + if exception: + raise exception + else: + # If for some reason there's no exception, fail explicitly. + raise AirflowException("Failing task as per stop_on_failure=True, but original exception was not found.") + +# --- YtdlpOpsOperator --- + +class YtdlpOpsOperator(BaseOperator): + """ + Custom Airflow operator to interact with YTDLP Thrift service. + Processes a single URL passed via DAG run configuration. + """ + template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir') + + @staticmethod + def _get_redis_client(redis_conn_id): + """Gets a Redis client connection using RedisHook.""" + try: + hook = RedisHook(redis_conn_id=redis_conn_id) + client = hook.get_conn() + client.ping() + logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") + return client + except redis.exceptions.AuthenticationError: + logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. 
Check password.") + raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") + except Exception as e: + logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") + raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") + + @apply_defaults + def __init__(self, + service_ip=None, + service_port=None, + account_id=None, + info_json_dir=None, + timeout=DEFAULT_TIMEOUT, + *args, **kwargs): + super().__init__(*args, **kwargs) + + logger.info(f"Initializing YtdlpOpsOperator (Worker Version) with parameters: " + f"service_ip={service_ip}, service_port={service_port}, " + f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}") + + if not service_ip or not service_port: + raise ValueError("Both service_ip and service_port must be specified.") + if not account_id: + logger.warning("No account_id provided. Ensure it's set in DAG params or operator config.") + + self.service_ip = service_ip + self.service_port = service_port + self.account_id = account_id + self.info_json_dir = info_json_dir + self.timeout = timeout + + def execute(self, context): + logger.info("Executing YtdlpOpsOperator (Worker Version)") + transport = None + ti = context['task_instance'] + + try: + params = context['params'] + url = params.get('url') + if not url: + raise AirflowException("DAG was triggered without a 'url' in its configuration.") + logger.info(f"Processing URL from DAG run config: {url}") + + service_ip = self.render_template(self.service_ip, context) + service_port_rendered = self.render_template(self.service_port, context) + account_id = self.render_template(self.account_id, context) + timeout_rendered = self.render_template(self.timeout, context) + info_json_dir = self.render_template(self.info_json_dir, context) + + host = params.get('service_ip', service_ip) + port_str = params.get('service_port', service_port_rendered) + account_id = params.get('account_id', account_id) + + logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}") + + if not host or not port_str: + raise ValueError("Direct connection requires service_ip and service_port") + try: + port = int(port_str) + except (ValueError, TypeError): + raise ValueError(f"Invalid service_port value: {port_str}") + + try: + timeout = int(timeout_rendered) + if timeout <= 0: raise ValueError("Timeout must be positive") + except (ValueError, TypeError): + timeout = DEFAULT_TIMEOUT + + socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) + socket_conn.setTimeout(timeout * 1000) + transport = TTransport.TFramedTransport(socket_conn) + protocol = TBinaryProtocol.TBinaryProtocol(transport) + client = YTTokenOpService.Client(protocol) + + transport.open() + logger.info("Successfully connected to Thrift server.") + client.ping() + logger.info("Server ping successful.") + + token_data = client.getOrRefreshToken( + accountId=account_id, + updateType=TokenUpdateMode.AUTO, + url=url + ) + logger.info("Successfully retrieved token data from service.") + + info_json_path = None + info_json = self._get_info_json(token_data) + if info_json and self._is_valid_json(info_json): + info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir) + if info_json_path: + ti.xcom_push(key='info_json_path', value=info_json_path) + else: + ti.xcom_push(key='info_json_path', value=None) + else: + ti.xcom_push(key='info_json_path', value=None) + + socks_proxy = None + proxy_attr = next((attr for attr in ['socks5Proxy', 
'socksProxy', 'socks'] if hasattr(token_data, attr)), None) + if proxy_attr: + socks_proxy = getattr(token_data, proxy_attr) + ti.xcom_push(key='socks_proxy', value=socks_proxy) + + ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None) + ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd) + + except Exception as e: + logger.error(f"YtdlpOpsOperator (Worker) failed: {e}", exc_info=True) + raise AirflowException(f"Task failed: {e}") + finally: + if transport and transport.isOpen(): + transport.close() + + def _get_info_json(self, token_data): + return getattr(token_data, 'infoJson', None) + + def _is_valid_json(self, json_str): + if not json_str or not isinstance(json_str, str): return False + try: + json.loads(json_str) + return True + except json.JSONDecodeError: + return False + + def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir): + try: + video_id = _extract_video_id(url) + save_dir = rendered_info_json_dir or "." + os.makedirs(save_dir, exist_ok=True) + timestamp = int(time.time()) + base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json" + info_json_path = os.path.join(save_dir, base_filename) + with open(info_json_path, 'w', encoding='utf-8') as f: + f.write(info_json) + return info_json_path + except Exception as e: + logger.error(f"Failed to save info.json: {e}", exc_info=True) + return None + +# ============================================================================= +# DAG Definition +# ============================================================================= + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=1), + 'start_date': days_ago(1), +} + +with DAG( + dag_id='ytdlp_worker_per_url', + default_args=default_args, + schedule_interval=None, + catchup=False, + description='Processes a single YouTube URL passed via configuration.', + tags=['ytdlp', 'thrift', 'client', 'worker'], + params={ + 'url': Param(None, type=["string", "null"], description="The YouTube URL to process. 
This is set by the triggering DAG."), + # Sensor params (passed through to re-trigger the sensor, with defaults for standalone runs) + 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Sensor param: Base name for Redis queues."), + 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Sensor param: Airflow Redis connection ID."), + 'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="integer", description="Sensor param: Maximum number of URLs to process in one batch."), + # Worker-specific params + 'service_ip': Param('89.253.221.173', type="string", description="Service IP."), + 'service_port': Param(9090, type="integer", description="Service port."), + 'account_id': Param('default_account', type="string", description="Account ID for the API call."), + 'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."), + 'download_format': Param('ba[ext=m4a]/bestaudio/best', type="string", description="yt-dlp format selection string."), + 'output_path_template': Param("%(title)s [%(id)s].%(ext)s", type="string", description="yt-dlp output filename template."), + 'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json."), + 'requeue_on_failure': Param(False, type="boolean", description="If True, re-adds the URL to the inbox on failure instead of moving to the fail hash."), + 'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."), + } +) as dag: + + get_token = YtdlpOpsOperator( + task_id='get_token', + service_ip="{{ params.service_ip }}", + service_port="{{ params.service_port }}", + account_id="{{ params.account_id }}", + timeout="{{ params.timeout }}", + info_json_dir="{{ params.info_json_dir }}", + retries=0, + ) + + download_video = BashOperator( + task_id='download_video', + bash_command=""" + INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}" + PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}" + FORMAT="{{ params.download_format }}" + DOWNLOAD_DIR="{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles/video') }}" + FILENAME_TEMPLATE="{{ params.output_path_template }}" + FULL_OUTPUT_PATH="$DOWNLOAD_DIR/$FILENAME_TEMPLATE" + + echo "Starting download..." + echo "Info JSON Path: $INFO_JSON_PATH" + echo "Proxy: $PROXY" + echo "Format: $FORMAT" + echo "Download Directory: $DOWNLOAD_DIR" + echo "Full Output Path: $FULL_OUTPUT_PATH" + + if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then + echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)." + exit 1 + fi + + CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH") + if [ -n "$PROXY" ]; then + CMD_ARRAY+=(--proxy "$PROXY") + fi + CMD_ARRAY+=(-f "$FORMAT" -o "$FULL_OUTPUT_PATH" --print filename) + CMD_ARRAY+=(--no-progress --no-simulate --no-write-info-json --ignore-errors --no-playlist) + + printf "Executing: %q " "${CMD_ARRAY[@]}" + echo "" + + FINAL_FILENAME=$("${CMD_ARRAY[@]}") + EXIT_CODE=$? + + echo "yt-dlp exited with code: $EXIT_CODE" + + if [ $EXIT_CODE -ne 0 ]; then + echo "Error: yt-dlp command failed." + exit $EXIT_CODE + fi + if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then + echo "Error: Download failed or did not produce a file." 
+ exit 1 + fi + echo "SUCCESS: Final file confirmed at: $FINAL_FILENAME" + echo "$FINAL_FILENAME" + """, + retries=3, + retry_delay=timedelta(minutes=2), + ) + + # This task triggers the sensor DAG to check for more work as soon as this worker is done. + trigger_sensor_for_next_batch = TriggerDagRunOperator( + task_id='trigger_sensor_for_next_batch', + trigger_dag_id='ytdlp_sensor_redis_queue', + # Pass only the sensor's needed parameters back to it. + # These values were originally passed from the sensor to this worker. + # The values are templated and will be passed as strings to the triggered DAG. + conf={ + "queue_name": "{{ params.queue_name }}", + "redis_conn_id": "{{ params.redis_conn_id }}", + "max_urls_per_run": "{{ params.max_urls_per_run }}", + }, + # This task will only run on the success path, so it inherits the default + # trigger_rule='all_success'. + wait_for_completion=False, + ) + trigger_sensor_for_next_batch.doc_md = """ + ### Trigger Sensor for Next Batch + Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop. + This task runs after the main processing tasks are complete (either success or failure), + ensuring that the system immediately checks for more URLs to process. + """ + + # Define success and failure handling tasks + success_task = PythonOperator( + task_id='handle_success', + python_callable=handle_success, + trigger_rule='all_success', # Run only if upstream tasks succeeded + ) + + failure_task = PythonOperator( + task_id='handle_failure', + python_callable=handle_failure, + trigger_rule='one_failed', # Run if any upstream task failed + ) + + # --- Define Task Dependencies --- + + # The main processing flow + get_token >> download_video + + # Branch after download: one path for success, one for failure + download_video >> success_task + download_video >> failure_task + + # The trigger to continue the loop ONLY runs on the success path. + # A failure will be recorded in Redis by `handle_failure` and then the loop will stop. + success_task >> trigger_sensor_for_next_batch diff --git a/docker-compose-ytdlp-ops.yaml b/docker-compose-ytdlp-ops.yaml index ff7a153..8ef3e54 100644 --- a/docker-compose-ytdlp-ops.yaml +++ b/docker-compose-ytdlp-ops.yaml @@ -7,9 +7,8 @@ services: dockerfile: Dockerfile ports: # Optionally expose the camoufox port to the host for debugging - # - "12345:12345" - - "12345" # Expose port within the docker network, pass in Dockerfile - - "5900:5900" # Expose VNC port to the host + - "12345:12345" + - "5900:5900" # Expose VNC port to the host, still not working networks: - airflow_prod_proxynet command: [ @@ -25,7 +24,7 @@ services: # Add healthcheck if desired ytdlp-ops: - image: pangramia/ytdlp-ops-server:latest # Don't comment + image: pangramia/ytdlp-ops-server:latest # Don't comment out or remove, build is performed externally depends_on: - camoufox # Ensure camoufox starts first ports: @@ -43,7 +42,7 @@ services: - "--port" - "9090" - "--clients" - # Add 'web' client since we now have camoufox + # Add 'web' client since we now have camoufox, test firstly - "web,ios,android,mweb" - "--proxy" - "socks5://sslocal-rust-1082:1082"
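
For reference, the DAGs touched by this change share a single Redis key layout derived from a base queue name (default 'video_queue'): a '<base>_inbox' list of URLs waiting to be processed, plus '<base>_progress', '<base>_result' and '<base>_fail' hashes keyed by URL. The minimal redis-py sketch below is illustrative only and is not part of the change; the direct redis.Redis(...) connection, host/port values, and the example URL are stand-ins for the RedisHook-managed Airflow connection and real queue traffic.

    import json
    import time

    import redis

    BASE = "video_queue"            # params['queue_name'] in the DAGs above
    INBOX = f"{BASE}_inbox"         # LIST: URLs to process (rpush to add, lpop to consume)
    PROGRESS = f"{BASE}_progress"   # HASH: url -> in-flight metadata (sequential processor)
    RESULT = f"{BASE}_result"       # HASH: url -> success details
    FAIL = f"{BASE}_fail"           # HASH: url -> failure details

    # Stand-in for RedisHook(redis_conn_id='redis_default').get_conn()
    client = redis.Redis(host="localhost", port=6379)

    # Producer (ytdlp_mgmt_queue_add_urls): append URLs to the inbox; FIFO with lpop.
    client.rpush(INBOX, "https://www.youtube.com/watch?v=EXAMPLE0001")

    # Sensor (ytdlp_sensor_redis_queue): pop up to max_urls_per_run URLs per run.
    url_bytes = client.lpop(INBOX)
    if url_bytes is not None:
        url = url_bytes.decode("utf-8")
        # Worker (ytdlp_worker_per_url): record the outcome keyed by URL.
        result = {"status": "success", "end_time": time.time(), "url": url}
        client.hset(RESULT, url, json.dumps(result))
        # On failure the worker either re-queues the URL (client.rpush(INBOX, url))
        # when requeue_on_failure=True, or stores the error details:
        # client.hset(FAIL, url, json.dumps({"status": "failed", "error": "..."}))

In the sensor/worker pattern introduced here, the sensor pops a batch from the inbox and triggers one ytdlp_worker_per_url run per URL via a dynamically mapped TriggerDagRunOperator; each successful worker re-triggers ytdlp_sensor_redis_queue, so processing continues until the inbox is empty. On failure the worker records the URL in the fail hash (or re-queues it when requeue_on_failure is set) and the loop stops, with stop_on_failure controlling whether the worker run itself is marked failed.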