yt-dlp-dags/airflow/dags/ytdlp_mgmt_queues.py

# -*- coding: utf-8 -*-
"""
Airflow DAG providing management actions for the Redis queues used by the yt-dlp
processing pipeline: adding YouTube URLs/video IDs, clearing and dumping queues,
listing contents, checking status, requeueing failures, purging Celery queues,
and clearing DAG run history.
"""
from __future__ import annotations
import json
import logging
import re
from typing import List, Optional
import csv
import os
from datetime import datetime
from airflow.exceptions import AirflowException
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from airflow.models.taskinstance import TaskInstance
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.celery.executors.celery_executor import app as celery_app
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.models.variable import Variable
from airflow.utils.session import create_session
import requests
# Configure logging
logger = logging.getLogger(__name__)
# Default settings
DEFAULT_REDIS_CONN_ID = "redis_default"
DEFAULT_QUEUE_NAME = "video_queue"
DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR'
DEFAULT_URL_LISTS_DIR = '/opt/airflow/inputfiles'
# --- Helper Functions ---
def _get_redis_client(redis_conn_id: str):
"""Gets a Redis client from an Airflow connection."""
try:
redis_hook = RedisHook(redis_conn_id=redis_conn_id)
return redis_hook.get_conn()
except Exception as e:
logger.error(f"Failed to connect to Redis using connection '{redis_conn_id}': {e}")
raise AirflowException(f"Redis connection failed: {e}")
def _get_predefined_url_lists():
"""Returns a static list of predefined URL list files."""
# This is a static list to ensure options are always visible in the UI,
# even if the files don't exist on the filesystem at parse time.
# The DAG will check for the file's existence at runtime.
predefined_files = [
'urls.dh128.json',
'urls.ixbt2045.json',
'urls.news1000.json',
'urls.rt100.json',
'urls.rt250_01.txt',
'urls.rt250_02.txt',
'urls.rt250_03.txt',
'urls.rt250_04.txt',
'urls.rt250_05.txt',
'urls.rt250_06.txt',
'urls.rt250_07.txt',
'urls.rt250_08.txt',
'urls.rt250_11.txt',
'urls.rt250_12.txt',
'urls.rt250_13.txt',
'urls.rt250_14.txt',
'urls.rt250_15.txt',
'urls.rt250_16.txt',
'urls.rt250_17.txt',
'urls.rt250_18.txt',
'urls.rt3700.txt',
'urls.sky28.json',
'urls.sky3.json',
'urls.tq46.json',
]
return ['None'] + sorted(predefined_files)
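# Expected formats for the predefined files (parsed in _get_urls_from_source below):
#   *.json -- a JSON array of URLs/IDs, e.g. ["https://youtu.be/<id>", "<11-char video id>"]
#   *.txt  -- plain text, one URL/ID per line; blank lines are ignored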
def _get_urls_from_source(**params) -> List[str]:
"""
Determines the source of video inputs based on the 'input_source' param and returns a list of raw items.
"""
input_source = params.get("input_source", "manual")
predefined_list = params.get("predefined_url_list")
file_path_or_url = params.get("url_list_file_path")
manual_inputs = params.get("video_inputs")
# Source 1: Predefined file
if input_source == 'predefined_file':
if not predefined_list or predefined_list == 'None':
raise AirflowException("Input source is 'predefined_file', but no file was selected from the list.")
default_path = DEFAULT_URL_LISTS_DIR
url_lists_dir = Variable.get('YTDLP_URL_LISTS_DIR', default_var=default_path)
file_path = os.path.join(url_lists_dir, predefined_list)
logger.info(f"Loading URLs from predefined file: {file_path}")
if not os.path.exists(file_path):
raise AirflowException(f"Selected predefined file does not exist: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
if predefined_list.lower().endswith('.json'):
logger.info(f"Parsing '{predefined_list}' as a JSON file.")
try:
data = json.load(f)
if not isinstance(data, list):
raise AirflowException(f"JSON file '{predefined_list}' must contain a list of strings.")
return [str(item) for item in data]
except json.JSONDecodeError:
raise AirflowException(f"Failed to parse JSON from file: {predefined_list}")
elif predefined_list.lower().endswith('.txt'):
logger.info(f"Parsing '{predefined_list}' as a text file (one URL per line).")
return [line.strip() for line in f if line.strip()]
else:
raise AirflowException(f"Unsupported file type for predefined file: '{predefined_list}'. Must be .json or .txt.")
# Source 2: File path or URL
elif input_source == 'file_path_or_url':
if not file_path_or_url:
raise AirflowException("Input source is 'file_path_or_url', but no path/URL was provided.")
logger.info(f"Loading URLs from provided path/URL: {file_path_or_url}")
content = ""
if file_path_or_url.startswith(('http://', 'https://')):
try:
response = requests.get(file_path_or_url, timeout=30)
response.raise_for_status()
content = response.text
except requests.RequestException as e:
raise AirflowException(f"Failed to fetch URL list from '{file_path_or_url}': {e}")
else: # Assume local file path
if not os.path.exists(file_path_or_url):
raise AirflowException(f"Provided file path does not exist: {file_path_or_url}")
with open(file_path_or_url, 'r', encoding='utf-8') as f:
content = f.read()
try:
data = json.loads(content)
if not isinstance(data, list):
raise AirflowException("JSON content from path/URL must contain a list of strings.")
return [str(item) for item in data]
except json.JSONDecodeError:
raise AirflowException(f"Failed to parse JSON from path/URL: {file_path_or_url}")
# Source 3: Manual input
elif input_source == 'manual':
if not manual_inputs:
logger.info("Input source is 'manual', but no inputs were provided. Nothing to do.")
return []
logger.info("Loading URLs from manual input.")
return parse_video_inputs(manual_inputs)
else:
logger.warning(f"No valid input source selected or no data provided for the selected source. Nothing to do.")
return []
def parse_video_inputs(input_str: str) -> List[str]:
"""Parses a flexible string of video inputs into a list of individual items."""
if not input_str or not isinstance(input_str, str):
return []
input_str = input_str.strip()
# 1. Try to parse as a JSON array
if input_str.startswith("[") and input_str.endswith("]"):
try:
items = json.loads(input_str)
if isinstance(items, list):
logger.info("Successfully parsed input as a JSON array.")
return [str(item).strip() for item in items]
except json.JSONDecodeError:
logger.warning("Input looked like a JSON array but failed to parse. Treating as a comma-separated string.")
# 2. Treat as a comma-separated string
items = [item.strip() for item in input_str.split(",")]
# 3. Clean up quotes and extra whitespace from each item
cleaned_items = []
for item in items:
if item.startswith(('"', "'")) and item.endswith(('"', "'")):
item = item[1:-1]
if item: # Only add non-empty items
cleaned_items.append(item.strip())
return cleaned_items
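# Example behaviour (illustrative inputs and expected results):
#   parse_video_inputs('["https://youtu.be/abc12345xyz", "abc12345xyz"]')
#       -> ['https://youtu.be/abc12345xyz', 'abc12345xyz']        (JSON array)
#   parse_video_inputs("'abc12345xyz', def67890uvw ,, ")
#       -> ['abc12345xyz', 'def67890uvw']                         (comma-separated; quotes and empty items stripped)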
def normalize_to_url(item: str) -> Optional[str]:
"""
Validates if an item is a recognizable YouTube URL or video ID,
and normalizes it to a standard watch URL format.
"""
if not item:
return None
# Regex for a standard 11-character YouTube video ID
video_id_pattern = r"^[a-zA-Z0-9_-]{11}$"
# Check if the item itself is a video ID
if re.match(video_id_pattern, item):
video_id = item
return f"https://www.youtube.com/watch?v={video_id}"
# Comprehensive regex to extract video ID from various URL formats
# Covers: watch, youtu.be, shorts, embed, /v/
url_patterns = [
r"(?:v=|\/v\/|youtu\.be\/|embed\/|shorts\/)([a-zA-Z0-9_-]{11})"
]
for pattern in url_patterns:
match = re.search(pattern, item)
if match:
video_id = match.group(1)
return f"https://www.youtube.com/watch?v={video_id}"
logger.warning(f"Could not recognize '{item}' as a valid YouTube URL or video ID.")
return None
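# Example normalizations (illustrative; "abc12345xyz" stands in for a real 11-character video ID):
#   "abc12345xyz"                                -> "https://www.youtube.com/watch?v=abc12345xyz"
#   "https://youtu.be/abc12345xyz"               -> "https://www.youtube.com/watch?v=abc12345xyz"
#   "https://www.youtube.com/shorts/abc12345xyz" -> "https://www.youtube.com/watch?v=abc12345xyz"
#   "https://example.com/page"                   -> None (logged as a warning)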
def dump_redis_data_to_csv(redis_client, dump_dir, patterns):
"""Dumps data from Redis keys matching patterns to separate CSV files in a timestamped directory."""
timestamp_dir = datetime.now().strftime('%Y%m%d_%H%M%S')
full_dump_path = os.path.join(dump_dir, timestamp_dir)
os.makedirs(full_dump_path, exist_ok=True)
logger.info(f"Created dump directory: {full_dump_path}")
for pattern in patterns:
if not pattern: continue
# Sanitize pattern for filename
sanitized_pattern = re.sub(r'[^a-zA-Z0-9_-]', '_', pattern)
timestamp_file = datetime.now().strftime('%Y%m%d')
dump_file_name = f'redis_dump_{sanitized_pattern}_{timestamp_file}.csv'
dump_file_path = os.path.join(full_dump_path, dump_file_name)
logger.info(f"Dumping keys matching '{pattern}' to {dump_file_path}")
try:
with open(dump_file_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['key', 'type', 'field_or_index', 'value'])
keys_found = 0
for key_bytes in redis_client.scan_iter(pattern):
key = key_bytes.decode('utf-8')
keys_found += 1
key_type = redis_client.type(key).decode('utf-8')
if key_type == 'hash':
for field, value in redis_client.hgetall(key).items():
writer.writerow([key, key_type, field.decode('utf-8'), value.decode('utf-8')])
elif key_type == 'list':
for index, value in enumerate(redis_client.lrange(key, 0, -1)):
writer.writerow([key, key_type, index, value.decode('utf-8')])
elif key_type == 'set':
for member in redis_client.smembers(key):
writer.writerow([key, key_type, None, member.decode('utf-8')])
elif key_type == 'string':
value = redis_client.get(key)
if value:
writer.writerow([key, key_type, None, value.decode('utf-8')])
if keys_found > 0:
logger.info(f"Successfully dumped {keys_found} keys for pattern '{pattern}' to {dump_file_path}")
else:
logger.info(f"No keys found for pattern '{pattern}'. Empty CSV file created at {dump_file_path}")
except Exception as e:
logger.error(f"Failed to dump Redis data for pattern '{pattern}': {e}", exc_info=True)
raise AirflowException(f"Failed to dump Redis data for pattern '{pattern}': {e}")
def clear_queue_callable(**context):
"""
Dumps Redis data to CSV and/or clears specified Redis keys based on selection.
The `_skipped` queue is for videos that are unavailable due to external reasons (e.g., private, removed).
"""
params = context['params']
ti = context['task_instance']
logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
redis_conn_id = params['redis_conn_id']
queue_system = params.get('queue_system', 'v1_monolithic')
queue_base_names_to_clear = []
if queue_system == 'v1_monolithic':
queue_base_names_to_clear.append(params['queue_base_name'])
elif queue_system.startswith('v2_'):
# For v2, clear both auth and dl queues for a complete clear.
queue_base_names_to_clear.extend(['queue2_auth', 'queue2_dl'])
else:
raise ValueError(f"Invalid queue_system: {queue_system}")
logger.info(f"Operating on queue system '{queue_system}' with base names: {queue_base_names_to_clear}.")
queues_to_clear_options = params.get('queues_to_clear_options', [])
confirm_clear = params.get('confirm_clear', False)
dump_queues = params['dump_queues']
dump_dir = context['templates_dict']['dump_dir']
dump_patterns = params['dump_patterns'].split(',') if params.get('dump_patterns') else []
if not confirm_clear:
message = "Action is 'clear_queue', but 'Confirm Deletion' was not checked. Aborting to prevent accidental data loss."
logger.error(message)
raise AirflowException(message)
# If no queues are selected, default to clearing all of them.
if not queues_to_clear_options:
logger.warning("No specific queues selected to clear. Defaulting to '_all'.")
queues_to_clear_options = ['_all']
redis_client = _get_redis_client(redis_conn_id)
if dump_queues and dump_patterns:
logger.info("Dumping is enabled. Performing dump before clearing.")
dump_redis_data_to_csv(redis_client, dump_dir, dump_patterns)
all_suffixes = ['_inbox', '_fail', '_result', '_progress', '_skipped']
keys_to_delete = set()
for queue_base_name in queue_base_names_to_clear:
if '_all' in queues_to_clear_options:
logger.info(f"'_all' option selected. Clearing all standard queues for base '{queue_base_name}'.")
for suffix in all_suffixes:
keys_to_delete.add(f"{queue_base_name}{suffix}")
else:
for suffix in queues_to_clear_options:
if suffix in all_suffixes:
keys_to_delete.add(f"{queue_base_name}{suffix}")
if not keys_to_delete:
logger.warning("No valid queue suffixes were selected. Nothing to delete.")
return
logger.info(f"Attempting to clear {len(keys_to_delete)} Redis key(s): {sorted(list(keys_to_delete))}")
try:
deleted_count = redis_client.delete(*keys_to_delete)
logger.info(f"Successfully sent delete command for {len(keys_to_delete)} key(s). Redis reported {deleted_count} deleted.")
except Exception as e:
logger.error(f"Failed to clear Redis keys: {e}", exc_info=True)
raise AirflowException(f"Failed to clear Redis keys: {e}")
def list_contents_callable(**context):
"""Lists the contents of the specified Redis key(s) (list or hash)."""
params = context['params']
ti = context['task_instance']
logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
redis_conn_id = params['redis_conn_id']
queues_to_list_str = params.get('queue_to_list')
max_items = params.get('max_items', 10)
if not queues_to_list_str:
raise ValueError("Parameter 'queue_to_list' cannot be empty.")
queues_to_list = [q.strip() for q in queues_to_list_str.split(',') if q.strip()]
if not queues_to_list:
logger.info("No valid queue names provided in 'queue_to_list'. Nothing to do.")
return
logger.info(f"Attempting to list contents for {len(queues_to_list)} Redis key(s): {queues_to_list}")
redis_client = _get_redis_client(redis_conn_id)
for queue_to_list in queues_to_list:
# Add a newline for better separation in logs
logger.info(f"\n--- Listing contents of Redis key '{queue_to_list}' (max: {max_items}) ---")
try:
key_type_bytes = redis_client.type(queue_to_list)
key_type = key_type_bytes.decode('utf-8') # Decode type
if key_type == 'list':
list_length = redis_client.llen(queue_to_list)
items_to_fetch = min(max_items, list_length)
contents_bytes = redis_client.lrange(queue_to_list, -items_to_fetch, -1)
contents = [item.decode('utf-8') for item in contents_bytes]
contents.reverse()
logger.info(f"--- Contents of Redis List '{queue_to_list}' ---")
logger.info(f"Total items in list: {list_length}")
if contents:
logger.info(f"Showing most recent {len(contents)} item(s):")
for i, item in enumerate(contents):
logger.info(f" [recent_{i}]: {item}")
if list_length > len(contents):
logger.info(f" ... ({list_length - len(contents)} older items not shown)")
logger.info(f"--- End of List Contents ---")
elif key_type == 'hash':
hash_size = redis_client.hlen(queue_to_list)
if hash_size > max_items * 2:
logger.warning(f"Hash '{queue_to_list}' has {hash_size} fields, which is large. Listing might be slow or incomplete. Consider using redis-cli HSCAN.")
contents_bytes = redis_client.hgetall(queue_to_list)
contents = {k.decode('utf-8'): v.decode('utf-8') for k, v in contents_bytes.items()}
logger.info(f"--- Contents of Redis Hash '{queue_to_list}' ---")
logger.info(f"Total fields in hash: {hash_size}")
if contents:
logger.info(f"Showing up to {max_items} item(s):")
item_count = 0
for key, value in contents.items():
if item_count >= max_items:
logger.info(f" ... (stopped listing after {max_items} items of {hash_size})")
break
try:
parsed_value = json.loads(value)
pretty_value = json.dumps(parsed_value, indent=2)
logger.info(f" '{key}':\n{pretty_value}")
except json.JSONDecodeError:
logger.info(f" '{key}': {value}")
item_count += 1
logger.info(f"--- End of Hash Contents ---")
elif key_type == 'none':
logger.info(f"Redis key '{queue_to_list}' does not exist.")
else:
logger.info(f"Redis key '{queue_to_list}' is of type '{key_type}'. Listing contents for this type is not implemented.")
except Exception as e:
logger.error(f"Failed to list contents of Redis key '{queue_to_list}': {e}", exc_info=True)
# Continue to the next key in the list instead of failing the whole task
def check_status_callable(**context):
"""
Checks the status (type and size) of all standard Redis queues for a given base name.
The `_skipped` queue is for videos that are unavailable due to external reasons (e.g., private, removed).
"""
params = context['params']
ti = context['task_instance']
logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
redis_conn_id = params['redis_conn_id']
queue_system = params.get('queue_system', 'v1_monolithic')
queue_base_names_to_check = []
if queue_system == 'v1_monolithic':
queue_base_names_to_check.append(params.get('queue_base_name', DEFAULT_QUEUE_NAME))
elif queue_system.startswith('v2_'):
# For v2, always check both auth and dl queues for a complete picture.
queue_base_names_to_check.extend(['queue2_auth', 'queue2_dl'])
else:
raise ValueError(f"Invalid queue_system: {queue_system}")
queue_suffixes = ['_inbox', '_progress', '_result', '_fail', '_skipped']
logger.info(f"--- Checking Status for Queue System: '{queue_system}' ---")
try:
redis_client = _get_redis_client(redis_conn_id)
for queue_name in queue_base_names_to_check:
logger.info(f"--- Base Name: '{queue_name}' ---")
for suffix in queue_suffixes:
queue_to_check = f"{queue_name}{suffix}"
key_type = redis_client.type(queue_to_check).decode('utf-8')
size = 0
if key_type == 'list':
size = redis_client.llen(queue_to_check)
elif key_type == 'hash':
size = redis_client.hlen(queue_to_check)
if key_type != 'none':
logger.info(f" - Queue '{queue_to_check}': Type='{key_type.upper()}', Size={size}")
else:
logger.info(f" - Queue '{queue_to_check}': Does not exist.")
logger.info(f"--- End of Status Check ---")
except Exception as e:
logger.error(f"Failed to check queue status for system '{queue_system}': {e}", exc_info=True)
raise AirflowException(f"Failed to check queue status: {e}")
def requeue_failed_callable(**context):
"""
Copies all URLs from the fail hash to the inbox list and optionally clears the fail hash.
Adapts behavior for v1 and v2 queue systems.
"""
params = context['params']
ti = context['task_instance']
logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
redis_conn_id = params['redis_conn_id']
clear_fail_queue = params['clear_fail_queue_after_requeue']
queue_system = params.get('queue_system', 'v1_monolithic')
fail_queue_name = ""
inbox_queue_name = ""
if queue_system == 'v1_monolithic':
queue_name = params['queue_base_name']
fail_queue_name = f"{queue_name}_fail"
inbox_queue_name = f"{queue_name}_inbox"
elif queue_system == 'v2_separated_auth':
fail_queue_name = "queue2_auth_fail"
inbox_queue_name = "queue2_auth_inbox"
elif queue_system == 'v2_separated_dl':
fail_queue_name = "queue2_dl_fail"
# DL failures must be re-authenticated, so they go back to the auth inbox.
inbox_queue_name = "queue2_auth_inbox"
else:
raise ValueError(f"Invalid queue_system: {queue_system}")
logger.info(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}' (system: {queue_system}).")
redis_client = _get_redis_client(redis_conn_id)
try:
# The fail queue is a hash. The keys are the URLs.
failed_urls_bytes = redis_client.hkeys(fail_queue_name)
if not failed_urls_bytes:
logger.info(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.")
return
failed_urls = [url.decode('utf-8') for url in failed_urls_bytes]
logger.info(f"Found {len(failed_urls)} URLs to requeue:")
for url in failed_urls:
logger.info(f" - {url}")
# Add URLs to the inbox list
if failed_urls:
with redis_client.pipeline() as pipe:
pipe.rpush(inbox_queue_name, *failed_urls)
if clear_fail_queue:
pipe.delete(fail_queue_name)
pipe.execute()
final_list_length = redis_client.llen(inbox_queue_name)
success_message = (
f"Successfully requeued {len(failed_urls)} URLs to '{inbox_queue_name}'. "
f"The list now contains {final_list_length} items."
)
logger.info(success_message)
if clear_fail_queue:
logger.info(f"Successfully cleared fail queue '{fail_queue_name}'.")
else:
logger.info(f"Fail queue '{fail_queue_name}' was not cleared as per configuration.")
except Exception as e:
logger.error(f"Failed to requeue failed URLs: {e}", exc_info=True)
raise AirflowException(f"Failed to requeue failed URLs: {e}")
def purge_celery_queue_callable(**context):
"""
Purges messages from the specified Celery queues using the Airflow Celery app.
This is more reliable than shelling out to `celery purge` as it uses the same
app context and broker connection as the workers.
"""
params = context['params']
if not params.get('confirm_purge'):
raise AirflowException("'Confirm Purge' is not checked. Aborting to prevent accidental data loss.")
queues_to_purge_str = params.get('celery_queue_to_purge')
if not queues_to_purge_str:
raise AirflowException("No Celery queues specified to purge.")
queues = [q.strip() for q in queues_to_purge_str.split(',') if q.strip()]
logger.info(f"Attempting to purge {len(queues)} Celery queue(s): {queues}")
logger.info(f"Using broker: {celery_app.conf.broker_url}")
purged_counts = {}
with celery_app.connection_for_read() as conn:
with conn.channel() as channel:
for queue in queues:
try:
message_count = channel.queue_purge(queue)
purged_counts[queue] = message_count
logger.info(f"Purged {message_count} messages from queue '{queue}'.")
except Exception as e:
# This can happen if the queue doesn't exist on the broker.
# kombu might raise an operational error.
logger.error(f"Failed to purge queue '{queue}': {e}", exc_info=True)
purged_counts[queue] = f"ERROR: {e}"
logger.info("--- Celery Purge Summary ---")
for queue, result in purged_counts.items():
logger.info(f" - {queue}: {result}")
logger.info("--- Purge complete. ---")
def clear_dag_runs_callable(**context):
"""
Deletes DAG run history and associated task instances from the database.
"""
params = context['params']
dag_id = params.get("dag_id_to_manage")
clear_scope = params.get("clear_scope")
log_target = f"DAG '{dag_id}'" if dag_id != "ALL_DAGS" else "ALL DAGS (except ytdlp_mgmt_queues)"
logger.info(f"Attempting to delete DagRuns for {log_target} with scope '{clear_scope}'.")
with create_session() as session:
dag_run_query = session.query(DagRun)
if dag_id == "ALL_DAGS":
dag_run_query = dag_run_query.filter(DagRun.dag_id != 'ytdlp_mgmt_queues')
else:
dag_run_query = dag_run_query.filter(DagRun.dag_id == dag_id)
if clear_scope == "last_run":
if dag_id == "ALL_DAGS":
raise AirflowException("Cannot clear 'last_run' for ALL_DAGS. Please select a specific DAG.")
last_run = dag_run_query.order_by(DagRun.execution_date.desc()).first()
if not last_run:
logger.info(f"No runs found for DAG '{dag_id}'. Nothing to delete.")
print(f"\nNo runs found for DAG '{dag_id}'.\n")
return
logger.warning(f"Deleting last DagRun for DAG '{dag_id}' (run_id: {last_run.run_id}, execution_date: {last_run.execution_date}). This will also delete its task instances.")
session.delete(last_run)
deleted_count = 1
else: # all_runs
logger.warning(f"Deleting ALL DagRuns and associated TaskInstances for {log_target}. This will remove all history from the UI.")
ti_query = session.query(TaskInstance)
if dag_id == "ALL_DAGS":
ti_query = ti_query.filter(TaskInstance.dag_id != 'ytdlp_mgmt_queues')
else:
ti_query = ti_query.filter(TaskInstance.dag_id == dag_id)
ti_deleted_count = ti_query.delete(synchronize_session=False)
logger.info(f"Deleted {ti_deleted_count} TaskInstance records for {log_target}.")
deleted_count = dag_run_query.delete(synchronize_session=False)
# The session is committed automatically by the `with create_session()` context manager.
logger.info(f"Successfully deleted {deleted_count} DagRun(s) for {log_target}.")
print(f"\nSuccessfully deleted {deleted_count} DagRun(s) for {log_target}.\n")
def add_videos_to_queue_callable(**context):
"""
Parses video inputs from manual text, a predefined file, or a file path/URL,
normalizes them to URLs, and adds them to a Redis queue.
"""
params = context["params"]
ti = context['task_instance']
logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
queue_system = params.get('queue_system', 'v1_monolithic')
if queue_system.startswith('v2_'):
# For v2 systems, raw URLs are always added to the auth queue.
queue_name = 'queue2_auth'
logger.info(f"Queue system is '{queue_system}'. Adding URLs to '{queue_name}_inbox'.")
else:
queue_name = params["queue_base_name"]
redis_conn_id = params["redis_conn_id"]
dry_run = params["dry_run"]
# This function will get the list of strings from the correct source based on precedence
raw_items = _get_urls_from_source(**params)
if not raw_items:
logger.info("No video inputs found from any source. Nothing to do.")
return
valid_urls = []
for item in raw_items:
url = normalize_to_url(item)
if url and url not in valid_urls:
valid_urls.append(url)
elif not url:
logger.warning(f"Skipping invalid input item: '{item}'")
if not valid_urls:
raise AirflowException("No valid YouTube URLs or IDs were found in the provided input.")
logger.info(f"Found {len(valid_urls)} valid and unique URLs to add to the queue:")
for url in valid_urls:
logger.info(f" - {url}")
if dry_run:
logger.info("Dry run is enabled. Skipping Redis operation.")
print(f"\n[DRY RUN] Would have added {len(valid_urls)} URLs to the Redis list '{queue_name}_inbox'.")
return
# --- Add to Redis ---
# Define the key name before the try block so the except handler below can
# reference it even if connecting to Redis fails.
inbox_queue = f"{queue_name}_inbox"
try:
redis_client = _get_redis_client(redis_conn_id)
# Use a pipeline for atomic and efficient addition
with redis_client.pipeline() as pipe:
for url in valid_urls:
pipe.rpush(inbox_queue, url)
pipe.execute()
final_list_length = redis_client.llen(inbox_queue)
success_message = (
f"Successfully added {len(valid_urls)} URLs to Redis list '{inbox_queue}'. "
f"The list now contains {final_list_length} items."
)
logger.info(success_message)
except Exception as e:
logger.error(f"Failed to add URLs to Redis queue '{inbox_queue}': {e}", exc_info=True)
raise AirflowException(f"Failed to add URLs to Redis: {e}")
# --- DAG Definition ---
with DAG(
dag_id="ytdlp_mgmt_queues",
default_args={
"owner": "airflow",
"start_date": days_ago(1),
"retries": 0,
"queue": "queue-mgmt",
},
schedule=None,
catchup=False,
tags=["ytdlp", "mgmt", "master"],
doc_md="""
### YT-DLP Queue Management
This DAG provides a set of tools to manage Redis queues used by the YTDLP processing pipeline.
Select an `action` to perform when triggering the DAG.
**Actions:**
- `add_videos`: Add one or more YouTube videos to a queue. You can provide input manually, select a predefined file from the server, or provide a path/URL to a file.
- `clear_queue`: **(Destructive)** Optionally dump and then delete the standard Redis queue keys for the selected queue system.
- `list_contents`: View the contents of one or more Redis keys (list or hash).
- `check_status`: Report the type and size of every standard queue for the selected queue system.
- `requeue_failed`: Copy all URLs from the `_fail` hash back to the `_inbox` list and optionally clear the `_fail` hash (for the v2 DL queue, failures are requeued to the auth inbox).
- `inspect_celery_cluster`: Show Celery worker stats, active queues, and active/reserved tasks via `celery inspect`.
- `purge_celery_queue`: **(Destructive)** Removes all tasks from the specified Celery worker queues (e.g., `queue-dl`). This is useful for clearing out a backlog of tasks that were queued before a dispatcher was paused.
- `clear_dag_runs`: **(Destructive)** Deletes DAG run history and associated task instances from the database, removing them from the UI.
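
**Example trigger (CLI)** -- an illustrative invocation; adjust the `--conf` values to your environment ("abc12345xyz" stands in for a real video ID):

```bash
airflow dags trigger ytdlp_mgmt_queues \
  --conf '{"action": "add_videos", "input_source": "manual", "video_inputs": "abc12345xyz, https://youtu.be/abc12345xyz", "dry_run": true}'
```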
""",
params={
"action": Param(
"list_contents",
type="string",
enum=["add_videos", "clear_queue", "list_contents", "check_status", "requeue_failed", "inspect_celery_cluster", "purge_celery_queue", "clear_dag_runs"],
title="Action",
description="The management action to perform.",
),
"queue_system": Param(
"v1_monolithic",
type="string",
enum=["v1_monolithic", "v2_separated_auth", "v2_separated_dl"],
title="Queue System",
description="Select the target queue system to manage. This choice affects which queues are targeted by actions.",
),
"queue_base_name": Param(
DEFAULT_QUEUE_NAME,
type="string",
title="Queue Base Name (v1 only)",
description="Base name for queues. Only used when 'Queue System' is 'v1_monolithic'.",
),
# --- Params for 'add_videos' ---
"input_source": Param(
"predefined_file",
type="string",
enum=["manual", "predefined_file", "file_path_or_url"],
title="[add_videos] Video Input Source",
description="Choose how to provide the video URLs. This choice determines which of the following parameters is used.",
),
"video_inputs": Param(
None,
type=["null", "string"],
title="[add_videos] 1. Manual Input",
description="Used if 'Input Source' is 'manual'. Paste a single item, a comma-separated list, or a JSON array of YouTube URLs or Video IDs.",
),
"predefined_url_list": Param(
"None",
type="string",
enum=_get_predefined_url_lists(),
title="[add_videos] 2. Predefined File",
description=(
"Used if 'Input Source' is 'predefined_file'. Select a JSON file from the server's URL list directory "
f"(defined by Airflow Variable 'YTDLP_URL_LISTS_DIR', defaults to '{DEFAULT_URL_LISTS_DIR}')."
),
),
"url_list_file_path": Param(
None,
type=["null", "string"],
title="[add_videos] 3. File Path or URL",
description="Used if 'Input Source' is 'file_path_or_url'. Enter a local file path (on the Airflow worker) or a remote URL to a JSON file containing a list of URLs/IDs.",
),
"dry_run": Param(
False,
type="boolean",
title="[add_videos] Dry Run",
description="If True, validate inputs without adding them to the queue.",
),
# --- Params for 'clear_queue' ---
"queues_to_clear_options": Param(
None,
type=["null", "array"],
title="[clear_queue] Queues to Clear",
description="Select which standard queues to clear. '_all' clears all four. If left empty, it defaults to '_all'.",
items={
"type": "string",
"enum": ["_inbox", "_fail", "_result", "_progress", "_skipped", "_all"],
}
),
"confirm_clear": Param(
False,
type="boolean",
title="[clear_queue] Confirm Deletion",
description="Must be set to True to execute the 'clear_queue' action. This is a destructive operation.",
),
"dump_queues": Param(
True,
type="boolean",
title="[clear_queue] Dump Data",
description="If True, dump data before clearing.",
),
"dump_dir": Param(
None,
type=["null", "string"],
title="[clear_queue] Dump Directory",
description="Base directory to save CSV dump files. Supports Jinja. If empty, defaults to Airflow variable 'YTDLP_REDIS_DUMP_DIR' or '/opt/airflow/dumps'.",
),
"dump_patterns": Param(
'ytdlp:*,video_queue_*',
type="string",
title="[clear_queue] Dump Patterns",
description="Comma-separated list of key patterns to dump.",
),
# --- Params for 'list_contents' ---
"queue_to_list": Param(
'video_queue_inbox,queue2_auth_inbox,queue2_dl_inbox,queue2_dl_result',
type="string",
title="[list_contents] Queues to List",
description="Comma-separated list of exact Redis key names to list.",
),
"max_items": Param(
10,
type="integer",
title="[list_contents] Max Items to List",
description="Maximum number of items to show.",
),
# --- Params for 'requeue_failed' ---
"clear_fail_queue_after_requeue": Param(
True,
type="boolean",
title="[requeue_failed] Clear Fail Queue",
description="If True, deletes the `_fail` hash after requeueing items.",
),
# --- Params for 'purge_celery_queue' ---
"celery_queue_to_purge": Param(
"queue-dl,queue-auth",
type="string",
title="[purge_celery_queue] Celery Queues to Purge",
description="Comma-separated list of Celery queue names to purge from the broker. This is a destructive action.",
),
"confirm_purge": Param(
False,
type="boolean",
title="[purge_celery_queue] Confirm Purge",
description="Must be set to True to execute the 'purge_celery_queue' action. This is a destructive operation that removes all tasks from the specified Celery queue(s).",
),
# --- Params for 'clear_dag_runs' ---
"dag_id_to_manage": Param(
"ALL_DAGS",
type="string",
enum=["ALL_DAGS", "ytdlp_ops_v01_orchestrator", "ytdlp_ops_v01_dispatcher", "ytdlp_ops_v01_worker_per_url", "ytdlp_ops_v02_orchestrator_auth", "ytdlp_ops_v02_dispatcher_auth", "ytdlp_ops_v02_worker_per_url_auth", "ytdlp_ops_v02_orchestrator_dl", "ytdlp_ops_v02_dispatcher_dl", "ytdlp_ops_v02_worker_per_url_dl"],
title="[clear_dag_runs] DAG ID",
description="The DAG ID to perform the action on. Select 'ALL_DAGS' to clear history for all DAGs.",
),
"clear_scope": Param(
"all_runs",
type="string",
enum=["last_run", "all_runs"],
title="[clear_dag_runs] Clear Scope",
description="For 'clear_dag_runs' action, specifies the scope of runs to clear.",
),
# --- Common Params ---
"redis_conn_id": Param(
DEFAULT_REDIS_CONN_ID,
type="string",
title="Redis Connection ID",
),
},
) as dag:
branch_on_action = BranchPythonOperator(
task_id="branch_on_action",
python_callable=lambda **context: f"action_{context['params']['action']}",
)
action_add_videos = PythonOperator(
task_id="action_add_videos",
python_callable=add_videos_to_queue_callable,
)
action_clear_queue = PythonOperator(
task_id="action_clear_queue",
python_callable=clear_queue_callable,
templates_dict={'dump_dir': "{{ params.dump_dir or var.value.get('YTDLP_REDIS_DUMP_DIR', '/opt/airflow/dumps') }}"},
)
action_list_contents = PythonOperator(
task_id="action_list_contents",
python_callable=list_contents_callable,
)
action_check_status = PythonOperator(
task_id="action_check_status",
python_callable=check_status_callable,
)
action_requeue_failed = PythonOperator(
task_id="action_requeue_failed",
python_callable=requeue_failed_callable,
)
action_inspect_celery_cluster = BashOperator(
task_id="action_inspect_celery_cluster",
bash_command="""
# Get the broker URL from Airflow config
BROKER_URL=$(airflow config get-value celery broker_url)
echo "--- Inspecting Celery Cluster (Broker: $BROKER_URL) ---"
echo ""
echo "--- Active Queues (shows queues with consumers) ---"
celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect active_queues
echo ""
echo "--- Worker Stats (shows connected workers) ---"
celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect stats
echo ""
echo "--- Active Tasks (tasks currently running) ---"
celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect active
echo ""
echo "--- Reserved Tasks (tasks prefetched by workers) ---"
celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" inspect reserved
""",
)
action_purge_celery_queue = PythonOperator(
task_id="action_purge_celery_queue",
python_callable=purge_celery_queue_callable,
)
action_clear_dag_runs = PythonOperator(
task_id="action_clear_dag_runs",
python_callable=clear_dag_runs_callable,
)
# --- Wire up tasks ---
branch_on_action >> [
action_add_videos,
action_clear_queue,
action_list_contents,
action_check_status,
action_requeue_failed,
action_inspect_celery_cluster,
action_purge_celery_queue,
action_clear_dag_runs,
]