# -*- coding: utf-8 -*-
"""
DAG to dispatch work to ytdlp_ops_worker_per_url DAGs.
It pulls a URL from Redis and triggers a worker with a pinned queue.
"""
from __future__ import annotations

import logging
import socket

from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
from airflow.api.common.trigger_dag import trigger_dag
from airflow.utils.dates import days_ago

from utils.redis_utils import _get_redis_client

logger = logging.getLogger(__name__)

DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'


@task(queue='queue-dl')
def dispatch_url_to_worker(**context):
    """
    Pulls one URL from Redis, determines the current worker's dedicated queue,
    and triggers the main worker DAG to process the URL on that specific queue.
    """
    params = context['params']
    redis_conn_id = params['redis_conn_id']
    queue_name = params['queue_name']
    inbox_queue = f"{queue_name}_inbox"

    logger.info(f"Attempting to pull one URL from Redis queue '{inbox_queue}'...")
    client = _get_redis_client(redis_conn_id)
    url_bytes = client.lpop(inbox_queue)

    if not url_bytes:
        logger.info("Redis queue is empty. No work to dispatch. Skipping task.")
        raise AirflowSkipException("Redis queue is empty. No work to dispatch.")

    url_to_process = url_bytes.decode('utf-8')
    logger.info(f"Pulled URL '{url_to_process}' from the queue.")

    # Determine the worker-specific queue for affinity.
    hostname = socket.gethostname()
    worker_queue = f"queue-dl-{hostname}"
    logger.info(
        f"Running on worker '{hostname}'. "
        f"Dispatching job to its dedicated queue '{worker_queue}'."
    )

    # The orchestrator passes all of its params, which we pass through to the
    # worker, adding the specific URL and the determined worker queue.
    conf_to_pass = {**params, 'url_to_process': url_to_process, 'worker_queue': worker_queue}

    run_id = f"worker_run_{context['dag_run'].run_id}_{context['ts_nodash']}"
    logger.info(f"Triggering 'ytdlp_ops_worker_per_url' with run_id '{run_id}'")
    trigger_dag(
        dag_id='ytdlp_ops_worker_per_url',
        run_id=run_id,
        conf=conf_to_pass,
        replace_microseconds=False,
    )


with DAG(
    dag_id='ytdlp_ops_dispatcher',
    default_args={'owner': 'airflow', 'retries': 0},
    schedule=None,  # This DAG is only triggered by the orchestrator.
    start_date=days_ago(1),
    catchup=False,
    tags=['ytdlp', 'worker', 'dispatcher'],
    doc_md="""
    ### YT-DLP URL Dispatcher
    This DAG dispatches a single URL to a worker with a pinned queue.

    1. It pulls a single URL from the Redis `_inbox` queue.
    2. It runs on the generic `queue-dl` queue, so any available worker can pick it up.
    3. It determines the worker's hostname and constructs a dedicated queue name
       (e.g., `queue-dl-dl-worker-1`).
    4. It triggers the `ytdlp_ops_worker_per_url` DAG, passing the URL and the
       dedicated queue name in the configuration.

    This dispatcher-led affinity, combined with the `task_instance_mutation_hook`
    cluster policy, ensures that all subsequent processing for that URL happens
    on the same machine. The `ytdlp_ops_orchestrator` is used to trigger a batch
    of these dispatcher runs.
    """,
    # All params are passed through from the orchestrator via dag_run.conf.
    render_template_as_native_obj=True,
) as dag:
    dispatch_url_to_worker()
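

# ---------------------------------------------------------------------------
# For reference: the affinity described in the doc above depends on a
# `task_instance_mutation_hook` cluster policy. Below is a minimal sketch of
# such a hook. It is illustrative only: to take effect it must live in
# `airflow_local_settings.py` on the scheduler's PYTHONPATH (defining it here
# in the DAG file does nothing), and it assumes the worker DAG receives the
# pinned queue as `dag_run.conf['worker_queue']`, which is what this
# dispatcher sends.
def task_instance_mutation_hook(task_instance):
    """Sketch: route every task of a worker-DAG run onto its pinned queue."""
    if task_instance.dag_id != 'ytdlp_ops_worker_per_url':
        return
    dag_run = task_instance.get_dagrun()
    worker_queue = (dag_run.conf or {}).get('worker_queue')
    if worker_queue:
        task_instance.queue = worker_queue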
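

# ---------------------------------------------------------------------------
# For context: work reaches this dispatcher through the `<queue_name>_inbox`
# Redis list that `dispatch_url_to_worker` LPOPs from, so producers should
# RPUSH to keep FIFO ordering. The helper below is an illustrative sketch
# only -- the name `example_enqueue_url` and the direct redis.Redis()
# connection are assumptions, not part of this project; real producers would
# resolve the connection via `_get_redis_client(redis_conn_id)` instead.
def example_enqueue_url(url: str, queue_name: str = DEFAULT_QUEUE_NAME) -> None:
    """Sketch: push one URL onto the inbox list consumed by this DAG."""
    import redis  # local import: this helper is illustrative only

    client = redis.Redis(host='localhost', port=6379, db=0)
    client.rpush(f"{queue_name}_inbox", url)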