# -*- coding: utf-8 -*- """ Airflow DAG for manually adding YouTube URLs or Video IDs to a Redis queue. """ from __future__ import annotations import json import logging import re from typing import List, Optional import csv import os from datetime import datetime from airflow.exceptions import AirflowException from airflow.models.dag import DAG from airflow.models.param import Param from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.empty import EmptyOperator from airflow.operators.bash import BashOperator from airflow.providers.celery.executors.celery_executor import app as celery_app from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago from airflow.models.variable import Variable import requests # Configure logging logger = logging.getLogger(__name__) # Default settings DEFAULT_REDIS_CONN_ID = "redis_default" DEFAULT_QUEUE_NAME = "video_queue" DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR' DEFAULT_URL_LISTS_DIR = '/opt/airflow/inputfiles' # --- Helper Functions --- def _get_redis_client(redis_conn_id: str): """Gets a Redis client from an Airflow connection.""" try: redis_hook = RedisHook(redis_conn_id=redis_conn_id) return redis_hook.get_conn() except Exception as e: logger.error(f"Failed to connect to Redis using connection '{redis_conn_id}': {e}") raise AirflowException(f"Redis connection failed: {e}") def _get_predefined_url_lists(): """Returns a static list of predefined URL list files.""" # This is a static list to ensure options are always visible in the UI, # even if the files don't exist on the filesystem at parse time. # The DAG will check for the file's existence at runtime. predefined_files = [ 'urls.dh128.json', 'urls.ixbt2045.json', 'urls.news1000.json', 'urls.rt100.json', 'urls.rt250_01.txt', 'urls.rt250_02.txt', 'urls.rt250_03.txt', 'urls.rt250_04.txt', 'urls.rt250_05.txt', 'urls.rt250_06.txt', 'urls.rt250_07.txt', 'urls.rt250_08.txt', 'urls.rt250_11.txt', 'urls.rt250_12.txt', 'urls.rt250_13.txt', 'urls.rt250_14.txt', 'urls.rt250_15.txt', 'urls.rt250_16.txt', 'urls.rt250_17.txt', 'urls.rt250_18.txt', 'urls.rt3700.txt', 'urls.sky28.json', 'urls.sky3.json', 'urls.tq46.json', ] return ['None'] + sorted(predefined_files) def _get_urls_from_source(**params) -> List[str]: """ Determines the source of video inputs based on the 'input_source' param and returns a list of raw items. """ input_source = params.get("input_source", "manual") predefined_list = params.get("predefined_url_list") file_path_or_url = params.get("url_list_file_path") manual_inputs = params.get("video_inputs") # Source 1: Predefined file if input_source == 'predefined_file': if not predefined_list or predefined_list == 'None': raise AirflowException("Input source is 'predefined_file', but no file was selected from the list.") default_path = DEFAULT_URL_LISTS_DIR url_lists_dir = Variable.get('YTDLP_URL_LISTS_DIR', default_var=default_path) file_path = os.path.join(url_lists_dir, predefined_list) logger.info(f"Loading URLs from predefined file: {file_path}") if not os.path.exists(file_path): raise AirflowException(f"Selected predefined file does not exist: {file_path}") with open(file_path, 'r', encoding='utf-8') as f: if predefined_list.lower().endswith('.json'): logger.info(f"Parsing '{predefined_list}' as a JSON file.") try: data = json.load(f) if not isinstance(data, list): raise AirflowException(f"JSON file '{predefined_list}' must contain a list of strings.") return [str(item) for item in data] except json.JSONDecodeError: raise AirflowException(f"Failed to parse JSON from file: {predefined_list}") elif predefined_list.lower().endswith('.txt'): logger.info(f"Parsing '{predefined_list}' as a text file (one URL per line).") return [line.strip() for line in f if line.strip()] else: raise AirflowException(f"Unsupported file type for predefined file: '{predefined_list}'. Must be .json or .txt.") # Source 2: File path or URL elif input_source == 'file_path_or_url': if not file_path_or_url: raise AirflowException("Input source is 'file_path_or_url', but no path/URL was provided.") logger.info(f"Loading URLs from provided path/URL: {file_path_or_url}") content = "" if file_path_or_url.startswith(('http://', 'https://')): try: response = requests.get(file_path_or_url, timeout=30) response.raise_for_status() content = response.text except requests.RequestException as e: raise AirflowException(f"Failed to fetch URL list from '{file_path_or_url}': {e}") else: # Assume local file path if not os.path.exists(file_path_or_url): raise AirflowException(f"Provided file path does not exist: {file_path_or_url}") with open(file_path_or_url, 'r', encoding='utf-8') as f: content = f.read() try: data = json.loads(content) if not isinstance(data, list): raise AirflowException("JSON content from path/URL must contain a list of strings.") return [str(item) for item in data] except json.JSONDecodeError: raise AirflowException(f"Failed to parse JSON from path/URL: {file_path_or_url}") # Source 3: Manual input elif input_source == 'manual': if not manual_inputs: logger.info("Input source is 'manual', but no inputs were provided. Nothing to do.") return [] logger.info("Loading URLs from manual input.") return parse_video_inputs(manual_inputs) else: logger.warning(f"No valid input source selected or no data provided for the selected source. Nothing to do.") return [] def parse_video_inputs(input_str: str) -> List[str]: """Parses a flexible string of video inputs into a list of individual items.""" if not input_str or not isinstance(input_str, str): return [] input_str = input_str.strip() # 1. Try to parse as a JSON array if input_str.startswith("[") and input_str.endswith("]"): try: items = json.loads(input_str) if isinstance(items, list): logger.info("Successfully parsed input as a JSON array.") return [str(item).strip() for item in items] except json.JSONDecodeError: logger.warning("Input looked like a JSON array but failed to parse. Treating as a comma-separated string.") # 2. Treat as a comma-separated string items = [item.strip() for item in input_str.split(",")] # 3. Clean up quotes and extra whitespace from each item cleaned_items = [] for item in items: if item.startswith(('"', "'")) and item.endswith(('"', "'")): item = item[1:-1] if item: # Only add non-empty items cleaned_items.append(item.strip()) return cleaned_items def normalize_to_url(item: str) -> Optional[str]: """ Validates if an item is a recognizable YouTube URL or video ID, and normalizes it to a standard watch URL format. """ if not item: return None # Regex for a standard 11-character YouTube video ID video_id_pattern = r"^[a-zA-Z0-9_-]{11}$" # Check if the item itself is a video ID if re.match(video_id_pattern, item): video_id = item return f"https://www.youtube.com/watch?v={video_id}" # Comprehensive regex to extract video ID from various URL formats # Covers: watch, youtu.be, shorts, embed, /v/ url_patterns = [ r"(?:v=|\/v\/|youtu\.be\/|embed\/|shorts\/)([a-zA-Z0-9_-]{11})" ] for pattern in url_patterns: match = re.search(pattern, item) if match: video_id = match.group(1) return f"https://www.youtube.com/watch?v={video_id}" logger.warning(f"Could not recognize '{item}' as a valid YouTube URL or video ID.") return None def dump_redis_data_to_csv(redis_client, dump_dir, patterns): """Dumps data from Redis keys matching patterns to separate CSV files in a timestamped directory.""" timestamp_dir = datetime.now().strftime('%Y%m%d_%H%M%S') full_dump_path = os.path.join(dump_dir, timestamp_dir) os.makedirs(full_dump_path, exist_ok=True) logger.info(f"Created dump directory: {full_dump_path}") for pattern in patterns: if not pattern: continue # Sanitize pattern for filename sanitized_pattern = re.sub(r'[^a-zA-Z0-9_-]', '_', pattern) timestamp_file = datetime.now().strftime('%Y%m%d') dump_file_name = f'redis_dump_{sanitized_pattern}_{timestamp_file}.csv' dump_file_path = os.path.join(full_dump_path, dump_file_name) logger.info(f"Dumping keys matching '{pattern}' to {dump_file_path}") try: with open(dump_file_path, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) writer.writerow(['key', 'type', 'field_or_index', 'value']) keys_found = 0 for key_bytes in redis_client.scan_iter(pattern): key = key_bytes.decode('utf-8') keys_found += 1 key_type = redis_client.type(key).decode('utf-8') if key_type == 'hash': for field, value in redis_client.hgetall(key).items(): writer.writerow([key, key_type, field.decode('utf-8'), value.decode('utf-8')]) elif key_type == 'list': for index, value in enumerate(redis_client.lrange(key, 0, -1)): writer.writerow([key, key_type, index, value.decode('utf-8')]) elif key_type == 'set': for member in redis_client.smembers(key): writer.writerow([key, key_type, None, member.decode('utf-8')]) elif key_type == 'string': value = redis_client.get(key) if value: writer.writerow([key, key_type, None, value.decode('utf-8')]) if keys_found > 0: logger.info(f"Successfully dumped {keys_found} keys for pattern '{pattern}' to {dump_file_path}") else: logger.info(f"No keys found for pattern '{pattern}'. Empty CSV file created at {dump_file_path}") except Exception as e: logger.error(f"Failed to dump Redis data for pattern '{pattern}': {e}", exc_info=True) raise AirflowException(f"Failed to dump Redis data for pattern '{pattern}': {e}") def clear_queue_callable(**context): """Dumps Redis data to CSV and/or clears specified Redis keys based on selection.""" params = context['params'] ti = context['task_instance'] logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.") redis_conn_id = params['redis_conn_id'] queue_system = params.get('queue_system', 'v1_monolithic') queue_base_names_to_clear = [] if queue_system == 'v1_monolithic': queue_base_names_to_clear.append(params['queue_base_name']) elif queue_system.startswith('v2_'): # For v2, clear both auth and dl queues for a complete clear. queue_base_names_to_clear.extend(['queue2_auth', 'queue2_dl']) else: raise ValueError(f"Invalid queue_system: {queue_system}") logger.info(f"Operating on queue system '{queue_system}' with base names: {queue_base_names_to_clear}.") queues_to_clear_options = params.get('queues_to_clear_options', []) confirm_clear = params.get('confirm_clear', False) dump_queues = params['dump_queues'] dump_dir = context['templates_dict']['dump_dir'] dump_patterns = params['dump_patterns'].split(',') if params.get('dump_patterns') else [] if not confirm_clear: message = "Action is 'clear_queue', but 'Confirm Deletion' was not checked. Aborting to prevent accidental data loss." logger.error(message) raise AirflowException(message) # If no queues are selected, default to clearing all of them. if not queues_to_clear_options: logger.warning("No specific queues selected to clear. Defaulting to '_all'.") queues_to_clear_options = ['_all'] redis_client = _get_redis_client(redis_conn_id) if dump_queues and dump_patterns: logger.info("Dumping is enabled. Performing dump before clearing.") dump_redis_data_to_csv(redis_client, dump_dir, dump_patterns) all_suffixes = ['_inbox', '_fail', '_result', '_progress'] keys_to_delete = set() for queue_base_name in queue_base_names_to_clear: if '_all' in queues_to_clear_options: logger.info(f"'_all' option selected. Clearing all standard queues for base '{queue_base_name}'.") for suffix in all_suffixes: keys_to_delete.add(f"{queue_base_name}{suffix}") else: for suffix in queues_to_clear_options: if suffix in all_suffixes: keys_to_delete.add(f"{queue_base_name}{suffix}") if not keys_to_delete: logger.warning("No valid queue suffixes were selected. Nothing to delete.") return logger.info(f"Attempting to clear {len(keys_to_delete)} Redis key(s): {sorted(list(keys_to_delete))}") try: deleted_count = redis_client.delete(*keys_to_delete) logger.info(f"Successfully sent delete command for {len(keys_to_delete)} key(s). Redis reported {deleted_count} deleted.") except Exception as e: logger.error(f"Failed to clear Redis keys: {e}", exc_info=True) raise AirflowException(f"Failed to clear Redis keys: {e}") def list_contents_callable(**context): """Lists the contents of the specified Redis key(s) (list or hash).""" params = context['params'] ti = context['task_instance'] logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.") redis_conn_id = params['redis_conn_id'] queues_to_list_str = params.get('queue_to_list') max_items = params.get('max_items', 10) if not queues_to_list_str: raise ValueError("Parameter 'queue_to_list' cannot be empty.") queues_to_list = [q.strip() for q in queues_to_list_str.split(',') if q.strip()] if not queues_to_list: logger.info("No valid queue names provided in 'queue_to_list'. Nothing to do.") return logger.info(f"Attempting to list contents for {len(queues_to_list)} Redis key(s): {queues_to_list}") redis_client = _get_redis_client(redis_conn_id) for queue_to_list in queues_to_list: # Add a newline for better separation in logs logger.info(f"\n--- Listing contents of Redis key '{queue_to_list}' (max: {max_items}) ---") try: key_type_bytes = redis_client.type(queue_to_list) key_type = key_type_bytes.decode('utf-8') # Decode type if key_type == 'list': list_length = redis_client.llen(queue_to_list) items_to_fetch = min(max_items, list_length) contents_bytes = redis_client.lrange(queue_to_list, -items_to_fetch, -1) contents = [item.decode('utf-8') for item in contents_bytes] contents.reverse() logger.info(f"--- Contents of Redis List '{queue_to_list}' ---") logger.info(f"Total items in list: {list_length}") if contents: logger.info(f"Showing most recent {len(contents)} item(s):") for i, item in enumerate(contents): logger.info(f" [recent_{i}]: {item}") if list_length > len(contents): logger.info(f" ... ({list_length - len(contents)} older items not shown)") logger.info(f"--- End of List Contents ---") elif key_type == 'hash': hash_size = redis_client.hlen(queue_to_list) if hash_size > max_items * 2: logger.warning(f"Hash '{queue_to_list}' has {hash_size} fields, which is large. Listing might be slow or incomplete. Consider using redis-cli HSCAN.") contents_bytes = redis_client.hgetall(queue_to_list) contents = {k.decode('utf-8'): v.decode('utf-8') for k, v in contents_bytes.items()} logger.info(f"--- Contents of Redis Hash '{queue_to_list}' ---") logger.info(f"Total fields in hash: {hash_size}") if contents: logger.info(f"Showing up to {max_items} item(s):") item_count = 0 for key, value in contents.items(): if item_count >= max_items: logger.info(f" ... (stopped listing after {max_items} items of {hash_size})") break try: parsed_value = json.loads(value) pretty_value = json.dumps(parsed_value, indent=2) logger.info(f" '{key}':\n{pretty_value}") except json.JSONDecodeError: logger.info(f" '{key}': {value}") item_count += 1 logger.info(f"--- End of Hash Contents ---") elif key_type == 'none': logger.info(f"Redis key '{queue_to_list}' does not exist.") else: logger.info(f"Redis key '{queue_to_list}' is of type '{key_type}'. Listing contents for this type is not implemented.") except Exception as e: logger.error(f"Failed to list contents of Redis key '{queue_to_list}': {e}", exc_info=True) # Continue to the next key in the list instead of failing the whole task def check_status_callable(**context): """Checks the status (type and size) of all standard Redis queues for a given base name.""" params = context['params'] ti = context['task_instance'] logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.") redis_conn_id = params['redis_conn_id'] queue_system = params.get('queue_system', 'v1_monolithic') queue_base_names_to_check = [] if queue_system == 'v1_monolithic': queue_base_names_to_check.append(params.get('queue_base_name', DEFAULT_QUEUE_NAME)) elif queue_system.startswith('v2_'): # For v2, always check both auth and dl queues for a complete picture. queue_base_names_to_check.extend(['queue2_auth', 'queue2_dl']) else: raise ValueError(f"Invalid queue_system: {queue_system}") queue_suffixes = ['_inbox', '_progress', '_result', '_fail'] logger.info(f"--- Checking Status for Queue System: '{queue_system}' ---") try: redis_client = _get_redis_client(redis_conn_id) for queue_name in queue_base_names_to_check: logger.info(f"--- Base Name: '{queue_name}' ---") for suffix in queue_suffixes: queue_to_check = f"{queue_name}{suffix}" key_type = redis_client.type(queue_to_check).decode('utf-8') size = 0 if key_type == 'list': size = redis_client.llen(queue_to_check) elif key_type == 'hash': size = redis_client.hlen(queue_to_check) if key_type != 'none': logger.info(f" - Queue '{queue_to_check}': Type='{key_type.upper()}', Size={size}") else: logger.info(f" - Queue '{queue_to_check}': Does not exist.") logger.info(f"--- End of Status Check ---") except Exception as e: logger.error(f"Failed to check queue status for system '{queue_system}': {e}", exc_info=True) raise AirflowException(f"Failed to check queue status: {e}") def requeue_failed_callable(**context): """ Copies all URLs from the fail hash to the inbox list and optionally clears the fail hash. Adapts behavior for v1 and v2 queue systems. """ params = context['params'] ti = context['task_instance'] logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.") redis_conn_id = params['redis_conn_id'] clear_fail_queue = params['clear_fail_queue_after_requeue'] queue_system = params.get('queue_system', 'v1_monolithic') fail_queue_name = "" inbox_queue_name = "" if queue_system == 'v1_monolithic': queue_name = params['queue_base_name'] fail_queue_name = f"{queue_name}_fail" inbox_queue_name = f"{queue_name}_inbox" elif queue_system == 'v2_separated_auth': fail_queue_name = "queue2_auth_fail" inbox_queue_name = "queue2_auth_inbox" elif queue_system == 'v2_separated_dl': fail_queue_name = "queue2_dl_fail" # DL failures must be re-authenticated, so they go back to the auth inbox. inbox_queue_name = "queue2_auth_inbox" else: raise ValueError(f"Invalid queue_system: {queue_system}") logger.info(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}' (system: {queue_system}).") redis_client = _get_redis_client(redis_conn_id) try: # The fail queue is a hash. The keys are the URLs. failed_urls_bytes = redis_client.hkeys(fail_queue_name) if not failed_urls_bytes: logger.info(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.") return failed_urls = [url.decode('utf-8') for url in failed_urls_bytes] logger.info(f"Found {len(failed_urls)} URLs to requeue:") for url in failed_urls: logger.info(f" - {url}") # Add URLs to the inbox list if failed_urls: with redis_client.pipeline() as pipe: pipe.rpush(inbox_queue_name, *failed_urls) if clear_fail_queue: pipe.delete(fail_queue_name) pipe.execute() final_list_length = redis_client.llen(inbox_queue_name) success_message = ( f"Successfully requeued {len(failed_urls)} URLs to '{inbox_queue_name}'. " f"The list now contains {final_list_length} items." ) logger.info(success_message) if clear_fail_queue: logger.info(f"Successfully cleared fail queue '{fail_queue_name}'.") else: logger.info(f"Fail queue '{fail_queue_name}' was not cleared as per configuration.") except Exception as e: logger.error(f"Failed to requeue failed URLs: {e}", exc_info=True) raise AirflowException(f"Failed to requeue failed URLs: {e}") def purge_celery_queue_callable(**context): """ Purges messages from the specified Celery queues using the Airflow Celery app. This is more reliable than shelling out to `celery purge` as it uses the same app context and broker connection as the workers. """ params = context['params'] if not params.get('confirm_purge'): raise AirflowException("'Confirm Purge' is not checked. Aborting to prevent accidental data loss.") queues_to_purge_str = params.get('celery_queue_to_purge') if not queues_to_purge_str: raise AirflowException("No Celery queues specified to purge.") queues = [q.strip() for q in queues_to_purge_str.split(',') if q.strip()] logger.info(f"Attempting to purge {len(queues)} Celery queue(s): {queues}") logger.info(f"Using broker: {celery_app.conf.broker_url}") purged_counts = {} with celery_app.connection_for_read() as conn: with conn.channel() as channel: for queue in queues: try: message_count = channel.queue_purge(queue) purged_counts[queue] = message_count logger.info(f"Purged {message_count} messages from queue '{queue}'.") except Exception as e: # This can happen if the queue doesn't exist on the broker. # kombu might raise an operational error. logger.error(f"Failed to purge queue '{queue}': {e}", exc_info=True) purged_counts[queue] = f"ERROR: {e}" logger.info("--- Celery Purge Summary ---") for queue, result in purged_counts.items(): logger.info(f" - {queue}: {result}") logger.info("--- Purge complete. ---") def add_videos_to_queue_callable(**context): """ Parses video inputs from manual text, a predefined file, or a file path/URL, normalizes them to URLs, and adds them to a Redis queue. """ params = context["params"] ti = context['task_instance'] logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.") queue_system = params.get('queue_system', 'v1_monolithic') if queue_system.startswith('v2_'): # For v2 systems, raw URLs are always added to the auth queue. queue_name = 'queue2_auth' logger.info(f"Queue system is '{queue_system}'. Adding URLs to '{queue_name}_inbox'.") else: queue_name = params["queue_base_name"] redis_conn_id = params["redis_conn_id"] dry_run = params["dry_run"] # This function will get the list of strings from the correct source based on precedence raw_items = _get_urls_from_source(**params) if not raw_items: logger.info("No video inputs found from any source. Nothing to do.") return valid_urls = [] for item in raw_items: url = normalize_to_url(item) if url and url not in valid_urls: valid_urls.append(url) elif not url: logger.warning(f"Skipping invalid input item: '{item}'") if not valid_urls: raise AirflowException("No valid YouTube URLs or IDs were found in the provided input.") logger.info(f"Found {len(valid_urls)} valid and unique URLs to add to the queue:") for url in valid_urls: logger.info(f" - {url}") if dry_run: logger.info("Dry run is enabled. Skipping Redis operation.") print(f"\n[DRY RUN] Would have added {len(valid_urls)} URLs to the Redis list '{queue_name}_inbox'.") return # --- Add to Redis --- try: redis_client = _get_redis_client(redis_conn_id) inbox_queue = f"{queue_name}_inbox" # Use a pipeline for atomic and efficient addition with redis_client.pipeline() as pipe: for url in valid_urls: pipe.rpush(inbox_queue, url) pipe.execute() final_list_length = redis_client.llen(inbox_queue) success_message = ( f"Successfully added {len(valid_urls)} URLs to Redis list '{inbox_queue}'. " f"The list now contains {final_list_length} items." ) logger.info(success_message) except Exception as e: logger.error(f"Failed to add URLs to Redis queue '{inbox_queue}': {e}", exc_info=True) raise AirflowException(f"Failed to add URLs to Redis: {e}") # --- DAG Definition --- with DAG( dag_id="ytdlp_mgmt_queues", default_args={ "owner": "airflow", "start_date": days_ago(1), "retries": 0, "queue": "queue-mgmt", }, schedule=None, catchup=False, tags=["ytdlp", "mgmt", "master"], doc_md=""" ### YT-DLP Queue Management This DAG provides a set of tools to manage Redis queues used by the YTDLP processing pipeline. Select an `action` to perform when triggering the DAG. **Actions:** - `add_videos`: Add one or more YouTube videos to a queue. You can provide input manually, select a predefined file from the server, or provide a path/URL to a file. - `clear_queue`: Dump and/or delete a specific Redis key. - `list_contents`: View the contents of a Redis key (list or hash). - `check_status`: Check the overall status of the queues. - `requeue_failed`: Copy all URLs from the `_fail` hash to the `_inbox` list and clear the `_fail` hash. - `purge_celery_queue`: **(Destructive)** Removes all tasks from a specified Celery worker queue (e.g., `queue-dl`). This is useful for clearing out a backlog of tasks that were queued before a dispatcher was paused. """, params={ "action": Param( "list_contents", type="string", enum=["add_videos", "clear_queue", "list_contents", "check_status", "requeue_failed", "inspect_celery_cluster", "purge_celery_queue"], title="Action", description="The management action to perform.", ), "queue_system": Param( "v1_monolithic", type="string", enum=["v1_monolithic", "v2_separated_auth", "v2_separated_dl"], title="Queue System", description="Select the target queue system to manage. This choice affects which queues are targeted by actions.", ), "queue_base_name": Param( DEFAULT_QUEUE_NAME, type="string", title="Queue Base Name (v1 only)", description="Base name for queues. Only used when 'Queue System' is 'v1_monolithic'.", ), # --- Params for 'add_videos' --- "input_source": Param( "predefined_file", type="string", enum=["manual", "predefined_file", "file_path_or_url"], title="[add_videos] Video Input Source", description="Choose how to provide the video URLs. This choice determines which of the following parameters is used.", ), "video_inputs": Param( None, type=["null", "string"], title="[add_videos] 1. Manual Input", description="Used if 'Input Source' is 'manual'. Paste a single item, a comma-separated list, or a JSON array of YouTube URLs or Video IDs.", ), "predefined_url_list": Param( "None", type="string", enum=_get_predefined_url_lists(), title="[add_videos] 2. Predefined File", description=( "Used if 'Input Source' is 'predefined_file'. Select a JSON file from the server's URL list directory " f"(defined by Airflow Variable 'YTDLP_URL_LISTS_DIR', defaults to '{DEFAULT_URL_LISTS_DIR}')." ), ), "url_list_file_path": Param( None, type=["null", "string"], title="[add_videos] 3. File Path or URL", description="Used if 'Input Source' is 'file_path_or_url'. Enter a local file path (on the Airflow worker) or a remote URL to a JSON file containing a list of URLs/IDs.", ), "dry_run": Param( False, type="boolean", title="[add_videos] Dry Run", description="If True, validate inputs without adding them to the queue.", ), # --- Params for 'clear_queue' --- "queues_to_clear_options": Param( None, type=["null", "array"], title="[clear_queue] Queues to Clear", description="Select which standard queues to clear. '_all' clears all four. If left empty, it defaults to '_all'.", items={ "type": "string", "enum": ["_inbox", "_fail", "_result", "_progress", "_all"], } ), "confirm_clear": Param( False, type="boolean", title="[clear_queue] Confirm Deletion", description="Must be set to True to execute the 'clear_queue' action. This is a destructive operation.", ), "dump_queues": Param( True, type="boolean", title="[clear_queue] Dump Data", description="If True, dump data before clearing.", ), "dump_dir": Param( None, type=["null", "string"], title="[clear_queue] Dump Directory", description="Base directory to save CSV dump files. Supports Jinja. If empty, defaults to Airflow variable 'YTDLP_REDIS_DUMP_DIR' or '/opt/airflow/dumps'.", ), "dump_patterns": Param( 'ytdlp:*,video_queue_*', type="string", title="[clear_queue] Dump Patterns", description="Comma-separated list of key patterns to dump.", ), # --- Params for 'list_contents' --- "queue_to_list": Param( 'video_queue_inbox,queue2_auth_inbox,queue2_dl_result', type="string", title="[list_contents] Queues to List", description="Comma-separated list of exact Redis key names to list.", ), "max_items": Param( 10, type="integer", title="[list_contents] Max Items to List", description="Maximum number of items to show.", ), # --- Params for 'requeue_failed' --- "clear_fail_queue_after_requeue": Param( True, type="boolean", title="[requeue_failed] Clear Fail Queue", description="If True, deletes the `_fail` hash after requeueing items.", ), # --- Params for 'purge_celery_queue' --- "celery_queue_to_purge": Param( "queue-dl,queue-auth", type="string", title="[purge_celery_queue] Celery Queues to Purge", description="Comma-separated list of Celery queue names to purge from the broker. This is a destructive action.", ), "confirm_purge": Param( False, type="boolean", title="[purge_celery_queue] Confirm Purge", description="Must be set to True to execute the 'purge_celery_queue' action. This is a destructive operation that removes all tasks from the specified Celery queue(s).", ), # --- Common Params --- "redis_conn_id": Param( DEFAULT_REDIS_CONN_ID, type="string", title="Redis Connection ID", ), }, ) as dag: branch_on_action = BranchPythonOperator( task_id="branch_on_action", python_callable=lambda **context: f"action_{context['params']['action']}", ) action_add_videos = PythonOperator( task_id="action_add_videos", python_callable=add_videos_to_queue_callable, ) action_clear_queue = PythonOperator( task_id="action_clear_queue", python_callable=clear_queue_callable, templates_dict={'dump_dir': "{{ params.dump_dir or var.value.get('YTDLP_REDIS_DUMP_DIR', '/opt/airflow/dumps') }}"}, ) action_list_contents = PythonOperator( task_id="action_list_contents", python_callable=list_contents_callable, ) action_check_status = PythonOperator( task_id="action_check_status", python_callable=check_status_callable, ) action_requeue_failed = PythonOperator( task_id="action_requeue_failed", python_callable=requeue_failed_callable, ) action_inspect_celery_cluster = BashOperator( task_id="action_inspect_celery_cluster", bash_command=""" # Get the broker URL from Airflow config BROKER_URL=$(airflow config get-value celery broker_url) echo "--- Inspecting Celery Cluster (Broker: $BROKER_URL) ---" echo "" echo "--- Active Queues (shows queues with consumers) ---" celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect active_queues echo "" echo "--- Worker Stats (shows connected workers) ---" celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect stats echo "" echo "--- Active Tasks (tasks currently running) ---" celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect active echo "" echo "--- Reserved Tasks (tasks prefetched by workers) ---" celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect reserved """, ) action_purge_celery_queue = PythonOperator( task_id="action_purge_celery_queue", python_callable=purge_celery_queue_callable, ) # --- Wire up tasks --- branch_on_action >> [ action_add_videos, action_clear_queue, action_list_contents, action_check_status, action_requeue_failed, action_inspect_celery_cluster, action_purge_celery_queue, ]