diff --git a/README.en.old.md b/README.en.old.md
new file mode 100644
index 0000000..8afe76f
--- /dev/null
+++ b/README.en.old.md
@@ -0,0 +1,100 @@
+# YTDLP Airflow DAGs
+
+This document describes the Airflow DAGs used for interacting with the YTDLP Ops service and managing processing queues.
+
+## DAG Descriptions
+
+### `ytdlp_client_dag_v2.1`
+
+* **File:** `airflow/dags/ytdlp_client_dag_v2.1.py`
+* **Purpose:** Provides a way to test the YTDLP Ops Thrift service interaction for a *single* video URL. Useful for debugging connection issues, testing specific account IDs, or verifying the service response for a particular URL independently of the queueing system.
+* **Parameters (Defaults):**
+  * `url` (`'https://www.youtube.com/watch?v=sOlTX9uxUtM'`): The video URL to process.
+  * `redis_enabled` (`False`): Use Redis for service discovery?
+  * `service_ip` (`'85.192.30.55'`): Service IP if `redis_enabled=False`.
+  * `service_port` (`9090`): Service port if `redis_enabled=False`.
+  * `account_id` (`'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Account ID for lookup or call.
+  * `timeout` (`30`): Timeout in seconds for Thrift connection.
+  * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`.
+* **Results:**
+  * Connects to the YTDLP Ops service using the specified method (Redis or direct IP).
+  * Retrieves token data for the given URL and account ID.
+  * Saves the video's `info.json` metadata to the specified directory.
+  * Extracts the SOCKS proxy (if available).
+  * Pushes `info_json_path`, `socks_proxy`, and the original `ytdlp_command` (with tokens) to XCom.
+  * Optionally stores detailed results in a Redis hash (`token_info:`).
+
+### `ytdlp_mgmt_queue_add_urls`
+
+* **File:** `airflow/dags/ytdlp_mgmt_queue_add_urls.py`
+* **Purpose:** Manually add video URLs to a specific YTDLP inbox queue (Redis List).
+* **Parameters (Defaults):**
+  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
+  * `queue_name` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Target Redis list (inbox queue).
+  * `urls` (`""`): Multiline string of video URLs to add.
+* **Results:**
+  * Parses the multiline `urls` parameter.
+  * Adds each valid URL to the end of the Redis list specified by `queue_name`.
+  * Logs the number of URLs added.
+
+### `ytdlp_mgmt_queue_clear`
+
+* **File:** `airflow/dags/ytdlp_mgmt_queue_clear.py`
+* **Purpose:** Manually delete a specific Redis key used by the YTDLP queues.
+* **Parameters (Defaults):**
+  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
+  * `queue_to_clear` (`'PLEASE_SPECIFY_QUEUE_TO_CLEAR'`): Exact name of the Redis key to clear. **Must be changed by user.**
+* **Results:**
+  * Deletes the Redis key specified by the `queue_to_clear` parameter.
+  * **Warning:** This operation is destructive and irreversible. Use with extreme caution. Ensure you specify the correct key name (e.g., `video_queue_inbox_account_xyz`, `video_queue_progress`, `video_queue_result`, `video_queue_fail`).
+
+### `ytdlp_mgmt_queue_check_status`
+
+* **File:** `airflow/dags/ytdlp_mgmt_queue_check_status.py`
+* **Purpose:** Manually check the type and size of a specific YTDLP Redis queue/key.
+* **Parameters (Defaults):**
+  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
+  * `queue_to_check` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to check.
+* **Results:**
+  * Connects to Redis and determines the type of the key specified by `queue_to_check`.
+  * Determines the size (length for lists, number of fields for hashes).
+  * Logs the key type and size.
+  * Pushes `queue_key_type` and `queue_size` to XCom.
+
+### `ytdlp_mgmt_queue_list_contents`
+
+* **File:** `airflow/dags/ytdlp_mgmt_queue_list_contents.py`
+* **Purpose:** Manually list the contents of a specific YTDLP Redis queue/key (list or hash). Useful for inspecting queue state or results.
+* **Parameters (Defaults):**
+  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
+  * `queue_to_list` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to list.
+  * `max_items` (`100`): Maximum number of items/fields to list.
+* **Results:**
+  * Connects to Redis and identifies the type of the key specified by `queue_to_list`.
+  * If it's a List, logs the first `max_items` elements.
+  * If it's a Hash, logs up to `max_items` key-value pairs, attempting to pretty-print JSON values.
+  * Logs warnings for very large hashes.
+
+### `ytdlp_proc_sequential_processor`
+
+* **File:** `airflow/dags/ytdlp_proc_sequential_processor.py`
+* **Purpose:** Processes YouTube URLs sequentially from a Redis queue. Designed for batch processing. Pops a URL, gets token/metadata via YTDLP Ops service, downloads the media using `yt-dlp`, and records the result.
+* **Parameters (Defaults):**
+  * `queue_name` (`'video_queue'`): Base name for Redis queues (e.g., `video_queue_inbox`, `video_queue_progress`).
+  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
+  * `redis_enabled` (`False`): Use Redis for service discovery? If False, uses `service_ip`/`port`.
+  * `service_ip` (`None`): Required Service IP if `redis_enabled=False`.
+  * `service_port` (`None`): Required Service port if `redis_enabled=False`.
+  * `account_id` (`'default_account'`): Account ID for the API call (used for Redis lookup if `redis_enabled=True`).
+  * `timeout` (`30`): Timeout in seconds for the Thrift connection.
+  * `download_format` (`'ba[ext=m4a]/bestaudio/best'`): yt-dlp format selection string.
+  * `output_path_template` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s"`): yt-dlp output template. Uses Airflow Variable `DOWNLOADS_TEMP`.
+  * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`. Uses Airflow Variable `DOWNLOADS_TEMP`.
+* **Results:**
+  * Pops one URL from the `{{ params.queue_name }}_inbox` Redis list.
+  * If a URL is popped, it's added to the `{{ params.queue_name }}_progress` Redis hash.
+  * The `YtdlpOpsOperator` (`get_token` task) attempts to get token data (including `info.json`, proxy, command) for the URL using the specified connection method and account ID.
+  * If token retrieval succeeds, the `download_video` task executes `yt-dlp` using the retrieved `info.json`, proxy, the `download_format` parameter, and the `output_path_template` parameter to download the actual media.
+  * **On Successful Download:** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_result` hash along with results (`info_json_path`, `socks_proxy`, `ytdlp_command`).
+  * **On Failure (Token Retrieval or Download):** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_fail` hash along with error details (message, traceback).
+  * If the inbox queue is empty, the DAG run skips processing via `AirflowSkipException`.
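
The READMEs above describe the queue layout: URLs enter an `_inbox` Redis list and are appended to its tail. For quick experiments outside Airflow, the inbox can be seeded directly with redis-py. A minimal sketch, assuming a local unauthenticated Redis and the default base queue name `video_queue` (adjust host/credentials to match the `redis_default` Airflow connection):

```python
# Minimal sketch: seed the inbox list that the DAGs above consume.
import redis

client = redis.Redis(host="localhost", port=6379, decode_responses=True)

urls = [
    "https://www.youtube.com/watch?v=sOlTX9uxUtM",
]

# The add_urls DAG appends to the end of the list, i.e. RPUSH.
client.rpush("video_queue_inbox", *urls)
print("inbox length:", client.llen("video_queue_inbox"))
```
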
diff --git a/README.md b/README.md
index 8afe76f..67f1b61 100644
--- a/README.md
+++ b/README.md
@@ -1,100 +1,38 @@
-# YTDLP Airflow DAGs
+# Architecture and Description of the YTDLP Airflow DAGs
-This document describes the Airflow DAGs used for interacting with the YTDLP Ops service and managing processing queues.
+This document describes the architecture and purpose of the DAGs used for downloading videos from YouTube. The system follows a "Sensor/Worker" pattern to provide continuous, parallel processing.
-## DAG Descriptions
+## Main Processing Loop
-### `ytdlp_client_dag_v2.1`
+### `ytdlp_sensor_redis_queue` (Sensor)
-* **File:** `airflow/dags/ytdlp_client_dag_v2.1.py`
-* **Purpose:** Provides a way to test the YTDLP Ops Thrift service interaction for a *single* video URL. Useful for debugging connection issues, testing specific account IDs, or verifying the service response for a particular URL independently of the queueing system.
-* **Parameters (Defaults):**
-  * `url` (`'https://www.youtube.com/watch?v=sOlTX9uxUtM'`): The video URL to process.
-  * `redis_enabled` (`False`): Use Redis for service discovery?
-  * `service_ip` (`'85.192.30.55'`): Service IP if `redis_enabled=False`.
-  * `service_port` (`9090`): Service port if `redis_enabled=False`.
-  * `account_id` (`'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Account ID for lookup or call.
-  * `timeout` (`30`): Timeout in seconds for Thrift connection.
-  * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`.
-* **Results:**
-  * Connects to the YTDLP Ops service using the specified method (Redis or direct IP).
-  * Retrieves token data for the given URL and account ID.
-  * Saves the video's `info.json` metadata to the specified directory.
-  * Extracts the SOCKS proxy (if available).
-  * Pushes `info_json_path`, `socks_proxy`, and the original `ytdlp_command` (with tokens) to XCom.
-  * Optionally stores detailed results in a Redis hash (`token_info:`).
+- **Purpose:** Takes URLs to download from the Redis queue and starts workers to process them.
+- **How it works (hybrid triggering):**
+  - **On a schedule:** Every minute the DAG automatically checks the Redis queue. This guarantees that new tasks are picked up even if the processing loop was temporarily stopped (because the queue was empty).
+  - **On trigger:** When the `ytdlp_worker_per_url` worker finishes successfully, it immediately triggers the sensor without waiting for the next minute. This keeps processing continuous, with no delays.
+  - **Logic:** Pops a batch of URLs from Redis (the `_inbox` list). If the queue is empty, the DAG finishes successfully and waits for the next run (by trigger or by schedule).
-### `ytdlp_mgmt_queue_add_urls`
+### `ytdlp_worker_per_url` (Worker)
-* **File:** `airflow/dags/ytdlp_mgmt_queue_add_urls.py`
-* **Purpose:** Manually add video URLs to a specific YTDLP inbox queue (Redis List).
-* **Parameters (Defaults):**
-  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
-  * `queue_name` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Target Redis list (inbox queue).
-  * `urls` (`""`): Multiline string of video URLs to add.
-* **Results:**
-  * Parses the multiline `urls` parameter.
-  * Adds each valid URL to the end of the Redis list specified by `queue_name`.
-  * Logs the number of URLs added.
+- **Purpose:** Processes a single URL, downloads the video, and continues the loop.
+- **How it works:**
+  - Receives a single URL from the sensor.
+  - Calls the `ytdlp-ops-auth` service to obtain the `info.json` and a `socks5` proxy.
+  - Downloads the video using the retrieved data. (TODO: replace invoking `yt-dlp` as a shell command with a library call.)
+  - Depending on the outcome (success/failure), stores the result in the corresponding Redis hash (`_result` or `_fail`).
+  - On success, re-triggers the `ytdlp_sensor_redis_queue` sensor to continue the processing loop. On failure, the loop stops for manual diagnosis.
-### `ytdlp_mgmt_queue_clear`
+## Management DAGs
-* **File:** `airflow/dags/ytdlp_mgmt_queue_clear.py`
-* **Purpose:** Manually delete a specific Redis key used by the YTDLP queues.
-* **Parameters (Defaults):**
-  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
-  * `queue_to_clear` (`'PLEASE_SPECIFY_QUEUE_TO_CLEAR'`): Exact name of the Redis key to clear. **Must be changed by user.**
-* **Results:**
-  * Deletes the Redis key specified by the `queue_to_clear` parameter.
-  * **Warning:** This operation is destructive and irreversible. Use with extreme caution. Ensure you specify the correct key name (e.g., `video_queue_inbox_account_xyz`, `video_queue_progress`, `video_queue_result`, `video_queue_fail`).
+These DAGs are intended for manual queue management and do not participate in the automatic loop.
-### `ytdlp_mgmt_queue_check_status`
+- **`ytdlp_mgmt_queue_add_and_verify`**: Adds URLs to the task queue (`_inbox`) and then checks the status of that queue.
+- **`ytdlp_mgmt_queues_check_status`**: Shows the state and contents of all key queues (`_inbox`, `_progress`, `_result`, `_fail`). Helps track the processing pipeline.
+- **`ytdlp_mgmt_queue_clear`**: Clears (fully deletes) the specified Redis queue. **Use with caution**, as the operation is irreversible.
-* **File:** `airflow/dags/ytdlp_mgmt_queue_check_status.py`
-* **Purpose:** Manually check the type and size of a specific YTDLP Redis queue/key.
-* **Parameters (Defaults):**
-  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
-  * `queue_to_check` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to check.
-* **Results:**
-  * Connects to Redis and determines the type of the key specified by `queue_to_check`.
-  * Determines the size (length for lists, number of fields for hashes).
-  * Logs the key type and size.
-  * Pushes `queue_key_type` and `queue_size` to XCom.
+## External Services
-### `ytdlp_mgmt_queue_list_contents`
+### `ytdlp-ops-auth` (Thrift Service)
-* **File:** `airflow/dags/ytdlp_mgmt_queue_list_contents.py`
-* **Purpose:** Manually list the contents of a specific YTDLP Redis queue/key (list or hash). Useful for inspecting queue state or results.
-* **Parameters (Defaults):**
-  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
-  * `queue_to_list` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to list.
-  * `max_items` (`100`): Maximum number of items/fields to list.
-* **Results:**
-  * Connects to Redis and identifies the type of the key specified by `queue_to_list`.
-  * If it's a List, logs the first `max_items` elements.
-  * If it's a Hash, logs up to `max_items` key-value pairs, attempting to pretty-print JSON values.
-  * Logs warnings for very large hashes.
-
-### `ytdlp_proc_sequential_processor`
-
-* **File:** `airflow/dags/ytdlp_proc_sequential_processor.py`
-* **Purpose:** Processes YouTube URLs sequentially from a Redis queue. Designed for batch processing. Pops a URL, gets token/metadata via YTDLP Ops service, downloads the media using `yt-dlp`, and records the result.
-* **Parameters (Defaults):**
-  * `queue_name` (`'video_queue'`): Base name for Redis queues (e.g., `video_queue_inbox`, `video_queue_progress`).
-  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
-  * `redis_enabled` (`False`): Use Redis for service discovery? If False, uses `service_ip`/`port`.
-  * `service_ip` (`None`): Required Service IP if `redis_enabled=False`.
-  * `service_port` (`None`): Required Service port if `redis_enabled=False`.
-  * `account_id` (`'default_account'`): Account ID for the API call (used for Redis lookup if `redis_enabled=True`).
-  * `timeout` (`30`): Timeout in seconds for the Thrift connection.
-  * `download_format` (`'ba[ext=m4a]/bestaudio/best'`): yt-dlp format selection string.
-  * `output_path_template` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s"`): yt-dlp output template. Uses Airflow Variable `DOWNLOADS_TEMP`.
-  * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`. Uses Airflow Variable `DOWNLOADS_TEMP`.
-* **Results:**
-  * Pops one URL from the `{{ params.queue_name }}_inbox` Redis list.
-  * If a URL is popped, it's added to the `{{ params.queue_name }}_progress` Redis hash.
-  * The `YtdlpOpsOperator` (`get_token` task) attempts to get token data (including `info.json`, proxy, command) for the URL using the specified connection method and account ID.
-  * If token retrieval succeeds, the `download_video` task executes `yt-dlp` using the retrieved `info.json`, proxy, the `download_format` parameter, and the `output_path_template` parameter to download the actual media.
-  * **On Successful Download:** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_result` hash along with results (`info_json_path`, `socks_proxy`, `ytdlp_command`).
-  * **On Failure (Token Retrieval or Download):** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_fail` hash along with error details (message, traceback).
-  * If the inbox queue is empty, the DAG run skips processing via `AirflowSkipException`.
+- **Purpose:** An external service that provides authentication data (tokens, cookies, proxies) for downloading videos.
+- **Interaction:** The worker DAG (`ytdlp_worker_per_url`) calls this service before starting a download to obtain the data `yt-dlp` needs.
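
The DAG diffs below all replace a copy-pasted `_get_redis_client` helper with a shared import from `utils.redis_utils`. The new module itself is not part of this diff; judging from the duplicated code being removed, it presumably looks like the following sketch (the `RedisHook` import path is an assumption based on the standard Redis provider package):

```python
# utils/redis_utils.py -- presumed contents, reconstructed from the
# duplicated helper that the diffs below remove (not shown in this diff).
import logging

import redis
from airflow.exceptions import AirflowException
from airflow.providers.redis.hooks.redis import RedisHook

logger = logging.getLogger(__name__)


def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()  # Fail fast if the connection is unusable.
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
```
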
diff --git a/dags/ytdlp_mgmt_queue_add_urls.py b/dags/ytdlp_mgmt_queue_add_urls.py
index 3721010..4fbeec6 100644
--- a/dags/ytdlp_mgmt_queue_add_urls.py
+++ b/dags/ytdlp_mgmt_queue_add_urls.py
@@ -8,6 +8,9 @@ from datetime import timedelta
 import logging
 import redis # Import redis exceptions if needed
 
+# Import utility functions
+from utils.redis_utils import _get_redis_client
+
 # Configure logging
 logger = logging.getLogger(__name__)
 
@@ -15,23 +18,6 @@ logger = logging.getLogger(__name__)
 DEFAULT_QUEUE_NAME = 'video_queue' # Default base name for the queue
 DEFAULT_REDIS_CONN_ID = 'redis_default'
 
-# --- Helper Functions ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
-
 # --- Python Callables for Tasks ---
 
 def add_urls_callable(**context):
diff --git a/dags/ytdlp_mgmt_queue_check_status.py b/dags/ytdlp_mgmt_queue_check_status.py
index 1b27ea6..17c75b0 100644
--- a/dags/ytdlp_mgmt_queue_check_status.py
+++ b/dags/ytdlp_mgmt_queue_check_status.py
@@ -28,22 +28,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_QUEUE_BASE_NAME = 'video_queue'
 DEFAULT_MAX_ITEMS_TO_LIST = 25
 
-# --- Helper Function ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client
 
 # --- Python Callable for Check and List Task ---
diff --git a/dags/ytdlp_mgmt_queue_clear.py b/dags/ytdlp_mgmt_queue_clear.py
index 0ecc562..2167921 100644
--- a/dags/ytdlp_mgmt_queue_clear.py
+++ b/dags/ytdlp_mgmt_queue_clear.py
@@ -1,5 +1,10 @@
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
+#
+# Copyright © 2024 rl
+#
+# Distributed under terms of the MIT license.
+
 """
 Airflow DAG for manually clearing (deleting) a specific Redis key used by YTDLP queues.
""" @@ -22,22 +27,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default' # Provide a placeholder default, user MUST specify the queue to clear DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR' -# --- Helper Function --- - -def _get_redis_client(redis_conn_id): - """Gets a Redis client connection using RedisHook.""" - try: - hook = RedisHook(redis_conn_id=redis_conn_id) - client = hook.get_conn() - client.ping() - logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") - return client - except redis.exceptions.AuthenticationError: - logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.") - raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") - except Exception as e: - logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") - raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") +# Import utility functions +from utils.redis_utils import _get_redis_client # --- Python Callable for Clear Task --- diff --git a/dags/ytdlp_mgmt_queue_list_contents.py b/dags/ytdlp_mgmt_queue_list_contents.py index b903308..071da44 100644 --- a/dags/ytdlp_mgmt_queue_list_contents.py +++ b/dags/ytdlp_mgmt_queue_list_contents.py @@ -29,24 +29,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default' DEFAULT_QUEUE_TO_LIST = 'video_queue_inbox' DEFAULT_MAX_ITEMS = 10 # Limit number of items listed by default -# --- Helper Function --- - -def _get_redis_client(redis_conn_id): - """Gets a Redis client connection using RedisHook.""" - try: - hook = RedisHook(redis_conn_id=redis_conn_id) - # decode_responses=True removed as it's not supported by get_conn in some environments - # We will decode manually where needed. - client = hook.get_conn() - client.ping() - logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") - return client - except redis.exceptions.AuthenticationError: - logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.") - raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") - except Exception as e: - logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") - raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") +# Import utility functions +from utils.redis_utils import _get_redis_client # --- Python Callable for List Contents Task --- diff --git a/dags/ytdlp_proc_sequential_processor.py b/dags/ytdlp_proc_sequential_processor.py index 68d5aa8..08c7c6d 100644 --- a/dags/ytdlp_proc_sequential_processor.py +++ b/dags/ytdlp_proc_sequential_processor.py @@ -47,20 +47,7 @@ RETRY_DELAY_REDIS_LOOKUP = 10 # Delay (seconds) for Redis lookup retries # --- Helper Functions --- -def _get_redis_client(redis_conn_id): - """Gets a Redis client connection using RedisHook.""" - try: - hook = RedisHook(redis_conn_id=redis_conn_id) - client = hook.get_conn() - client.ping() - logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") - return client - except redis.exceptions.AuthenticationError: - logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. 
Check password.") - raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") - except Exception as e: - logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") - raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") +from utils.redis_utils import _get_redis_client def _extract_video_id(url): """Extracts YouTube video ID from URL.""" diff --git a/dags/ytdlp_sensor_redis_queue.py b/dags/ytdlp_sensor_redis_queue.py index b592b12..e5d9147 100644 --- a/dags/ytdlp_sensor_redis_queue.py +++ b/dags/ytdlp_sensor_redis_queue.py @@ -21,6 +21,9 @@ from datetime import timedelta import logging import redis +# Import utility functions +from utils.redis_utils import _get_redis_client + # Configure logging logger = logging.getLogger(__name__) @@ -30,23 +33,6 @@ DEFAULT_REDIS_CONN_ID = 'redis_default' DEFAULT_TIMEOUT = 30 DEFAULT_MAX_URLS = '1' # Default number of URLs to process per run -# --- Helper Functions --- - -def _get_redis_client(redis_conn_id): - """Gets a Redis client connection using RedisHook.""" - try: - hook = RedisHook(redis_conn_id=redis_conn_id) - client = hook.get_conn() - client.ping() - logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") - return client - except redis.exceptions.AuthenticationError: - logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.") - raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") - except Exception as e: - logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") - raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}") - # --- Task Callables --- def log_trigger_info_callable(**context): @@ -57,6 +43,8 @@ def log_trigger_info_callable(**context): if trigger_type == 'manual': logger.info("Trigger source: Manual execution from Airflow UI or CLI.") + elif trigger_type == 'scheduled': + logger.info("Trigger source: Scheduled run (periodic check).") elif trigger_type == 'dag_run': # In Airflow 2.2+ we can get the triggering run object try: @@ -154,10 +142,10 @@ default_args = { with DAG( dag_id='ytdlp_sensor_redis_queue', default_args=default_args, - schedule_interval=None, # This DAG is now only triggered (manually or by a worker) + schedule_interval='*/1 * * * *', # Runs every minute and can also be triggered. max_active_runs=1, # Prevent multiple sensors from running at once catchup=False, - description='Polls Redis queue for a batch of URLs and triggers parallel worker DAGs.', + description='Polls Redis queue every minute (and on trigger) for URLs and starts worker DAGs.', tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'], params={ 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."), diff --git a/dags/ytdlp_worker_per_url.py b/dags/ytdlp_worker_per_url.py index 1cbdffc..e540c46 100644 --- a/dags/ytdlp_worker_per_url.py +++ b/dags/ytdlp_worker_per_url.py @@ -33,6 +33,10 @@ import os import redis import socket import time +import traceback + +# Import utility functions +from utils.redis_utils import _get_redis_client # Configure logging logger = logging.getLogger(__name__) @@ -106,7 +110,7 @@ def handle_success(**context): try: # In the worker pattern, there's no "progress" hash to remove from. # We just add the result to the success hash. 
diff --git a/dags/ytdlp_worker_per_url.py b/dags/ytdlp_worker_per_url.py
index 1cbdffc..e540c46 100644
--- a/dags/ytdlp_worker_per_url.py
+++ b/dags/ytdlp_worker_per_url.py
@@ -33,6 +33,10 @@ import os
 import redis
 import socket
 import time
+import traceback
+
+# Import utility functions
+from utils.redis_utils import _get_redis_client
 
 # Configure logging
 logger = logging.getLogger(__name__)
 
@@ -106,7 +110,7 @@ def handle_success(**context):
     try:
         # In the worker pattern, there's no "progress" hash to remove from.
         # We just add the result to the success hash.
-        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
+        client = _get_redis_client(redis_conn_id)
         client.hset(result_queue, url, json.dumps(result_data))
         logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
     except Exception as e:
@@ -115,8 +119,8 @@
 def handle_failure(**context):
     """
-    Handles failed processing. Moves the URL to the fail hash and, if stop_on_failure
-    is True, fails the task to make the DAG run failure visible.
+    Handles failed processing. Records detailed error information to the fail hash
+    and, if stop_on_failure is True, fails the task to make the DAG run failure visible.
     """
     ti = context['task_instance']
     params = context['params']
@@ -132,14 +136,31 @@
     requeue_on_failure = params.get('requeue_on_failure', False)
     stop_on_failure = params.get('stop_on_failure', True)
 
+    # --- Extract Detailed Error Information ---
     exception = context.get('exception')
     error_message = str(exception) if exception else "Unknown error"
+    error_type = type(exception).__name__ if exception else "Unknown"
+    # Positional args: the 'etype' keyword was removed from traceback.format_exception in Python 3.10.
+    tb_str = "".join(traceback.format_exception(type(exception), exception, exception.__traceback__)) if exception else "No traceback available."
+
+    # Find the specific task that failed
+    dag_run = context['dag_run']
+    failed_task_id = "unknown"
+    # Look at direct upstream tasks of the current task ('handle_failure');
+    # get_direct_relatives is defined on the task, not on the TaskInstance.
+    upstream_tasks = ti.task.get_direct_relatives(upstream=True)
+    for task in upstream_tasks:
+        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
+        if upstream_ti and upstream_ti.state == 'failed':
+            failed_task_id = task.task_id
+            break
 
     logger.info(f"Handling failure for URL: {url}")
+    logger.error(f"  Failed Task: {failed_task_id}")
+    logger.error(f"  Failure Type: {error_type}")
     logger.error(f"  Failure Reason: {error_message}")
+    logger.debug(f"  Traceback:\n{tb_str}")
 
     try:
-        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
+        client = _get_redis_client(redis_conn_id)
         if requeue_on_failure:
             client.rpush(inbox_queue, url)
             logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
@@ -147,12 +168,15 @@
         fail_data = {
             'status': 'failed',
             'end_time': time.time(),
-            'error': error_message,
+            'failed_task': failed_task_id,
+            'error_type': error_type,
+            'error_message': error_message,
+            'traceback': tb_str,
             'url': url,
             'dag_run_id': context['dag_run'].run_id,
         }
-        client.hset(fail_queue, url, json.dumps(fail_data))
-        logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
+        client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
+        logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
     except Exception as e:
         logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
         # This is a critical error in the failure handling logic itself.
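
With the richer fail records above, a stuck URL can be diagnosed straight from Redis. A minimal sketch, assuming redis-py, local access, and the default base queue name `video_queue`:

```python
# Minimal sketch: read back the detailed failure records written by
# handle_failure. Host and queue name are assumptions for illustration.
import json

import redis

client = redis.Redis(host="localhost", port=6379, decode_responses=True)

for url, raw in client.hgetall("video_queue_fail").items():
    record = json.loads(raw)
    print(url)
    print("  failed task:", record.get("failed_task"))
    print("  error type: ", record.get("error_type"))
    print("  message:    ", record.get("error_message"))
```
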
@@ -178,22 +202,6 @@ class YtdlpOpsOperator(BaseOperator):
     """
     template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')
 
-    @staticmethod
-    def _get_redis_client(redis_conn_id):
-        """Gets a Redis client connection using RedisHook."""
-        try:
-            hook = RedisHook(redis_conn_id=redis_conn_id)
-            client = hook.get_conn()
-            client.ping()
-            logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-            return client
-        except redis.exceptions.AuthenticationError:
-            logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-            raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-        except Exception as e:
-            logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-            raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
-
     @apply_defaults
     def __init__(self,
                  service_ip=None,
@@ -448,8 +456,8 @@ with DAG(
     trigger_sensor_for_next_batch.doc_md = """
     ### Trigger Sensor for Next Batch
     Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop.
-    This task runs after the main processing tasks are complete (either success or failure),
-    ensuring that the system immediately checks for more URLs to process.
+    This task **only runs on the success path** after a URL has been fully processed.
+    This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
     """
 
     # Define success and failure handling tasks
@@ -470,10 +478,9 @@ with DAG(
     # The main processing flow
     get_token >> download_video
 
-    # Branch after download: one path for success, one for failure
-    download_video >> success_task
-    download_video >> failure_task
+    # The success path: if download_video succeeds, run success_task, then trigger the next sensor run.
+    download_video >> success_task >> trigger_sensor_for_next_batch
 
-    # The trigger to continue the loop ONLY runs on the success path.
-    # A failure will be recorded in Redis by `handle_failure` and then the loop will stop.
-    success_task >> trigger_sensor_for_next_batch
+    # The failure path: if get_token OR download_video fails, run the failure_task.
+    # This is a "fan-in" dependency.
+    [get_token, download_video] >> failure_task
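
For the fan-in failure edge above to fire at all, `failure_task` cannot use the default `all_success` trigger rule. The operator definitions are outside this diff, but the usual pattern for this wiring (assumed here, using the file's own `handle_failure` callable) is `one_failed`:

```python
# Assumed definition (outside this diff): the failure handler fires as soon
# as any upstream task fails; the success handler keeps the default
# all_success rule.
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

failure_task = PythonOperator(
    task_id='handle_failure',
    python_callable=handle_failure,  # defined earlier in this DAG file
    trigger_rule=TriggerRule.ONE_FAILED,  # run if get_token OR download_video fails
)
```
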