yt-dlp-dags/airflow/dags/ytdlp_ops_dispatcher.py

# -*- coding: utf-8 -*-
"""
DAG to dispatch work to ytdlp_ops_worker_per_url DAGs.
It pulls a URL from Redis and triggers a worker with a pinned queue.
"""
from __future__ import annotations

import logging
import socket
from datetime import timedelta

from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.api.common.trigger_dag import trigger_dag
from airflow.utils.dates import days_ago

from utils.redis_utils import _get_redis_client

logger = logging.getLogger(__name__)

DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'

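# `_get_redis_client` lives in utils/redis_utils.py, which is not shown here.
# A minimal sketch of what it plausibly does, assuming the connection id is
# backed by the apache-airflow-providers-redis package:
#
#     from airflow.providers.redis.hooks.redis import RedisHook
#
#     def _get_redis_client(conn_id: str):
#         # RedisHook.get_conn() builds a redis.Redis client from the Airflow
#         # connection; by default it returns bytes, hence the decode() below.
#         return RedisHook(redis_conn_id=conn_id).get_conn()
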
@task(queue='queue-dl')
def dispatch_url_to_worker(**context):
"""
Pulls one URL from Redis, determines the current worker's dedicated queue,
and triggers the main worker DAG to process the URL on that specific queue.
"""
params = context['params']
redis_conn_id = params['redis_conn_id']
queue_name = params['queue_name']
inbox_queue = f"{queue_name}_inbox"
logger.info(f"Attempting to pull one URL from Redis queue '{inbox_queue}'...")
client = _get_redis_client(redis_conn_id)
url_bytes = client.lpop(inbox_queue)
if not url_bytes:
logger.info("Redis queue is empty. No work to dispatch. Skipping task.")
raise AirflowSkipException("Redis queue is empty. No work to dispatch.")
url_to_process = url_bytes.decode('utf-8')
logger.info(f"Pulled URL '{url_to_process}' from the queue.")
# Determine the worker-specific queue for affinity
hostname = socket.gethostname()
worker_queue = f"queue-dl-{hostname}"
logger.info(f"Running on worker '{hostname}'. Dispatching job to its dedicated queue '{worker_queue}'.")
# The orchestrator passes all its params, which we will pass through to the worker.
# We add the specific URL and the determined worker queue to the configuration.
conf_to_pass = {**params, 'url_to_process': url_to_process, 'worker_queue': worker_queue}
run_id = f"worker_run_{context['dag_run'].run_id}_{context['ts_nodash']}"
logger.info(f"Triggering 'ytdlp_ops_worker_per_url' with run_id '{run_id}'")
trigger_dag(
dag_id='ytdlp_ops_worker_per_url',
run_id=run_id,
conf=conf_to_pass,
replace_microseconds=False
)
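
# Illustrative shape of the conf handed to the worker DAG (values are examples,
# not taken from a real run; any extra params the orchestrator passed are also
# spread into this dict):
#
#     {
#         "queue_name": "video_queue",
#         "redis_conn_id": "redis_default",
#         "url_to_process": "https://www.youtube.com/watch?v=<id>",
#         "worker_queue": "queue-dl-dl-worker-1",
#     }
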
with DAG(
    dag_id='ytdlp_ops_dispatcher',
    default_args={'owner': 'airflow', 'retries': 0},
    schedule=None,  # This DAG is only triggered by the orchestrator.
    start_date=days_ago(1),
    catchup=False,
    tags=['ytdlp', 'worker', 'dispatcher'],
    doc_md="""
    ### YT-DLP URL Dispatcher

    This DAG is responsible for dispatching a single URL to a worker with a pinned queue.

    1. It runs on the generic `queue-dl` queue, so any available worker can pick it up.
    2. It pulls a single URL from the Redis `_inbox` queue.
    3. It determines the worker's hostname and constructs a dedicated queue name (e.g., `queue-dl-dl-worker-1`).
    4. It triggers the `ytdlp_ops_worker_per_url` DAG, passing the URL and the dedicated queue name in the configuration.

    This dispatcher-led affinity, combined with the `task_instance_mutation_hook` cluster policy, ensures that all
    subsequent processing for that URL happens on the same machine. The `ytdlp_ops_orchestrator` is used to trigger
    a batch of these dispatcher runs.
    """,
    # All params are passed through from the orchestrator; the defaults below
    # only apply when this DAG is triggered without a conf.
    params={
        'queue_name': Param(DEFAULT_QUEUE_NAME, type='string'),
        'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type='string'),
    },
    render_template_as_native_obj=True,
) as dag:
    dispatch_url_to_worker()
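
# The queue pinning described in doc_md depends on a `task_instance_mutation_hook`
# cluster policy defined in airflow_local_settings.py, which is not part of this
# file. A minimal sketch of such a hook, assuming the worker run's conf carries
# the 'worker_queue' key set by the dispatcher above:
#
#     def task_instance_mutation_hook(task_instance):
#         # Route every task of a worker run onto the queue the dispatcher chose,
#         # so all processing for a URL stays on the same machine.
#         if task_instance.dag_id == 'ytdlp_ops_worker_per_url':
#             dag_run = task_instance.get_dagrun()
#             worker_queue = (dag_run.conf or {}).get('worker_queue')
#             if worker_queue:
#                 task_instance.queue = worker_queue
#
# For ad-hoc testing, the dispatcher can also be triggered by hand with an
# explicit conf, e.g.:
#
#     airflow dags trigger ytdlp_ops_dispatcher \
#         --conf '{"queue_name": "video_queue", "redis_conn_id": "redis_default"}'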