# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl
#
# Distributed under terms of the MIT license.

"""
DAG to sense a Redis queue for new URLs and trigger the ytdlp_worker_per_url DAG.
This is the "Sensor" part of a Sensor/Worker pattern.
"""

from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.models.param import Param
from airflow.utils.dates import days_ago
from datetime import timedelta
import logging
import redis

# Configure logging
logger = logging.getLogger(__name__)

# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TIMEOUT = 30
DEFAULT_MAX_URLS = '1'  # Default number of URLs to process per run


# --- Helper Functions ---

def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")


# --- Task Callables ---

def log_trigger_info_callable(**context):
    """Logs information about how the DAG run was triggered."""
    dag_run = context['dag_run']
    trigger_type = dag_run.run_type
    logger.info(f"Sensor DAG triggered. Run ID: {dag_run.run_id}, Type: {trigger_type}")

    if trigger_type == 'manual':
        logger.info("Trigger source: Manual execution from Airflow UI or CLI.")
    elif trigger_type == 'dag_run':
        # In Airflow 2.2+ we can get the triggering run object
        try:
            triggering_dag_run = dag_run.get_triggering_dagrun()
            if triggering_dag_run:
                triggering_dag_id = triggering_dag_run.dag_id
                triggering_run_id = triggering_dag_run.run_id
                logger.info(f"Trigger source: DAG Run from '{triggering_dag_id}' (Run ID: {triggering_run_id}).")
                # Check if it's a worker by looking at the conf keys
                conf = dag_run.conf or {}
                if all(k in conf for k in ['queue_name', 'redis_conn_id', 'max_urls_per_run']):
                    logger.info("This appears to be a standard trigger from a worker DAG continuing the loop.")
                else:
                    logger.warning(f"Triggered by another DAG but conf does not match worker pattern. Conf: {conf}")
            else:
                logger.warning("Trigger type is 'dag_run' but could not retrieve triggering DAG run details.")
        except Exception as e:
            logger.error(f"Could not get triggering DAG run details: {e}")
    else:
        logger.info(f"Trigger source: {trigger_type}")


def check_queue_for_urls_batch(**context):
    """
    Pops a batch of URLs from the inbox queue.
    Returns a list of configuration dictionaries for the TriggerDagRunOperator.
    If the queue is empty, it raises AirflowSkipException.
    """
    params = context['params']
    queue_name = params['queue_name']
    inbox_queue = f"{queue_name}_inbox"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    max_urls_raw = params.get('max_urls_per_run', DEFAULT_MAX_URLS)
    try:
        max_urls = int(max_urls_raw)
    except (ValueError, TypeError):
        logger.warning(f"Invalid value for max_urls_per_run: '{max_urls_raw}'. Using default: {DEFAULT_MAX_URLS}")
Using default: {DEFAULT_MAX_URLS}") max_urls = DEFAULT_MAX_URLS urls_to_process = [] try: client = _get_redis_client(redis_conn_id) current_queue_size = client.llen(inbox_queue) logger.info(f"Queue '{inbox_queue}' has {current_queue_size} URLs. Attempting to pop up to {max_urls}.") for _ in range(max_urls): url_bytes = client.lpop(inbox_queue) if url_bytes: url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes logger.info(f" - Popped URL: {url}") urls_to_process.append(url) else: # Queue is empty, stop trying to pop break if urls_to_process: logger.info(f"Found {len(urls_to_process)} URLs in queue. Generating trigger configurations.") # Create a list of 'conf' objects for the trigger operator to expand trigger_configs = [] for url in urls_to_process: # The worker DAG will use its own default params for its operations. # We only need to provide the URL for processing, and the sensor's own # params so the worker can trigger the sensor again to continue the loop. worker_conf = { 'url': url, 'queue_name': queue_name, 'redis_conn_id': redis_conn_id, 'max_urls_per_run': int(max_urls), 'stop_on_failure': params.get('stop_on_failure', True) } trigger_configs.append(worker_conf) return trigger_configs else: logger.info(f"Queue '{inbox_queue}' is empty. Skipping trigger.") raise AirflowSkipException(f"Redis queue '{inbox_queue}' is empty.") except AirflowSkipException: raise except Exception as e: logger.error(f"Error popping URLs from Redis queue '{inbox_queue}': {e}", exc_info=True) raise AirflowException(f"Failed to pop URLs from Redis: {e}") # ============================================================================= # DAG Definition # ============================================================================= default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 0, # The sensor itself should not retry on failure, it will run again on schedule 'start_date': days_ago(1), } with DAG( dag_id='ytdlp_sensor_redis_queue', default_args=default_args, schedule_interval=None, # This DAG is now only triggered (manually or by a worker) max_active_runs=1, # Prevent multiple sensors from running at once catchup=False, description='Polls Redis queue for a batch of URLs and triggers parallel worker DAGs.', tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'], params={ 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."), 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), 'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="string", description="Maximum number of URLs to process in one batch."), 'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."), } ) as dag: log_trigger_info_task = PythonOperator( task_id='log_trigger_info', python_callable=log_trigger_info_callable, ) log_trigger_info_task.doc_md = """ ### Log Trigger Information Logs details about how this DAG run was initiated (e.g., manually or by a worker DAG). This provides visibility into the processing loop. """ poll_redis_task = PythonOperator( task_id='check_queue_for_urls_batch', python_callable=check_queue_for_urls_batch, ) poll_redis_task.doc_md = """ ### Poll Redis Queue for Batch Checks the Redis inbox queue for a batch of new URLs (up to `max_urls_per_run`). - **On Success (URLs found):** Returns a list of configuration objects for the trigger task. 
    - **On Skip (Queue empty):** Skips this task and the trigger task. The DAG run succeeds.
    """

    # This operator will be dynamically expanded based on the output of poll_redis_task
    trigger_worker_dags = TriggerDagRunOperator.partial(
        task_id='trigger_worker_dags',
        trigger_dag_id='ytdlp_worker_per_url',
        wait_for_completion=False,  # Fire and forget
        doc_md="""
        ### Trigger Worker DAGs (Dynamically Mapped)
        Triggers one `ytdlp_worker_per_url` DAG run for each URL found by the polling task.
        Each triggered DAG receives its own specific configuration (including the URL).
        This task is skipped if the polling task finds no URLs.
        """
    ).expand(
        conf=poll_redis_task.output
    )

    log_trigger_info_task >> poll_redis_task >> trigger_worker_dags
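
# ---------------------------------------------------------------------------
# Illustrative producer (a minimal sketch, not part of the DAG): the sensor
# above pops URLs from the '<queue_name>_inbox' Redis list, so a producer only
# needs to RPUSH onto that list (RPUSH + LPOP gives FIFO ordering). The
# localhost connection details and the example URL below are assumptions; in
# practice, point the client at the same Redis instance as the Airflow
# connection referenced by the 'redis_conn_id' param.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Only runs when this file is executed directly, never under the scheduler.
    inbox_key = f"{DEFAULT_QUEUE_NAME}_inbox"
    producer = redis.Redis(host="localhost", port=6379, db=0)  # assumed local Redis
    producer.rpush(inbox_key, "https://example.com/watch?v=example")  # hypothetical URL
    print(f"Queued 1 URL; '{inbox_key}' now holds {producer.llen(inbox_key)} item(s).")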