Compare commits

2 Commits: affc59ee57 ... fc2d740b65

| Author | SHA1 | Date |
|---|---|---|
| | fc2d740b65 | |
| | 1f186fd217 | |

README.en.old.md (new file, 100 lines)

@@ -0,0 +1,100 @@
# YTDLP Airflow DAGs

This document describes the Airflow DAGs used for interacting with the YTDLP Ops service and managing processing queues.

## DAG Descriptions

### `ytdlp_client_dag_v2.1`

* **File:** `airflow/dags/ytdlp_client_dag_v2.1.py`
* **Purpose:** Provides a way to test the YTDLP Ops Thrift service interaction for a *single* video URL. Useful for debugging connection issues, testing specific account IDs, or verifying the service response for a particular URL independently of the queueing system.
* **Parameters (Defaults):**
    * `url` (`'https://www.youtube.com/watch?v=sOlTX9uxUtM'`): The video URL to process.
    * `redis_enabled` (`False`): Use Redis for service discovery?
    * `service_ip` (`'85.192.30.55'`): Service IP if `redis_enabled=False`.
    * `service_port` (`9090`): Service port if `redis_enabled=False`.
    * `account_id` (`'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Account ID for lookup or call.
    * `timeout` (`30`): Timeout in seconds for Thrift connection.
    * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`.
* **Results:**
    * Connects to the YTDLP Ops service using the specified method (Redis or direct IP).
    * Retrieves token data for the given URL and account ID.
    * Saves the video's `info.json` metadata to the specified directory.
    * Extracts the SOCKS proxy (if available).
    * Pushes `info_json_path`, `socks_proxy`, and the original `ytdlp_command` (with tokens) to XCom.
    * Optionally stores detailed results in a Redis hash (`token_info:<video_id>`).
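The optional Redis hash mentioned in the last bullet can be inspected directly. A minimal sketch, assuming a locally reachable Redis and the video ID from the default URL; the field names are printed generically because the exact layout depends on what the DAG stored:

```python
# Sketch: inspect the optional `token_info:<video_id>` hash written by the client DAG.
# Host/port and the video ID are illustrative assumptions.
import json
import redis

client = redis.Redis(host="localhost", port=6379, decode_responses=True)
for field, value in client.hgetall("token_info:sOlTX9uxUtM").items():
    try:
        value = json.dumps(json.loads(value), indent=2)  # pretty-print JSON values
    except (ValueError, TypeError):
        pass  # keep raw value if it is not JSON
    print(f"{field}: {value}")
```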
### `ytdlp_mgmt_queue_add_urls`

* **File:** `airflow/dags/ytdlp_mgmt_queue_add_urls.py`
* **Purpose:** Manually add video URLs to a specific YTDLP inbox queue (Redis List).
* **Parameters (Defaults):**
    * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
    * `queue_name` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Target Redis list (inbox queue).
    * `urls` (`""`): Multiline string of video URLs to add.
* **Results:**
    * Parses the multiline `urls` parameter.
    * Adds each valid URL to the end of the Redis list specified by `queue_name`.
    * Logs the number of URLs added.
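A minimal sketch of the add step, assuming a redis-py client; the helper name and the skip-blank-lines rule are illustrative, not the DAG's exact code:

```python
def add_urls_to_inbox(client, queue_name, urls_text):
    """Push each non-empty line of `urls_text` onto the end of the inbox list."""
    urls = [line.strip() for line in urls_text.splitlines() if line.strip()]
    for url in urls:
        client.rpush(queue_name, url)  # append to the Redis list (inbox queue)
    return len(urls)  # number of URLs added, for logging
```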
### `ytdlp_mgmt_queue_clear`

* **File:** `airflow/dags/ytdlp_mgmt_queue_clear.py`
* **Purpose:** Manually delete a specific Redis key used by the YTDLP queues.
* **Parameters (Defaults):**
    * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
    * `queue_to_clear` (`'PLEASE_SPECIFY_QUEUE_TO_CLEAR'`): Exact name of the Redis key to clear. **Must be changed by user.**
* **Results:**
    * Deletes the Redis key specified by the `queue_to_clear` parameter.
    * **Warning:** This operation is destructive and irreversible. Use with extreme caution. Ensure you specify the correct key name (e.g., `video_queue_inbox_account_xyz`, `video_queue_progress`, `video_queue_result`, `video_queue_fail`).
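Under the hood this amounts to a single `DEL` on the given key. A hedged sketch; the guard against the placeholder default is an assumption, not necessarily present in the DAG:

```python
def clear_queue(client, queue_to_clear):
    """Delete one Redis key; returns 1 if the key existed, 0 otherwise."""
    if queue_to_clear == "PLEASE_SPECIFY_QUEUE_TO_CLEAR":
        raise ValueError("Refusing to delete the placeholder key name")
    return client.delete(queue_to_clear)
```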
### `ytdlp_mgmt_queue_check_status`

* **File:** `airflow/dags/ytdlp_mgmt_queue_check_status.py`
* **Purpose:** Manually check the type and size of a specific YTDLP Redis queue/key.
* **Parameters (Defaults):**
    * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
    * `queue_to_check` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to check.
* **Results:**
    * Connects to Redis and determines the type of the key specified by `queue_to_check`.
    * Determines the size (length for lists, number of fields for hashes).
    * Logs the key type and size.
    * Pushes `queue_key_type` and `queue_size` to XCom.
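The type/size check maps onto standard Redis commands (`TYPE`, `LLEN`, `HLEN`). A sketch of the core logic, omitting the Airflow task plumbing:

```python
def check_queue_status(client, key):
    """Return (key_type, size) for a Redis key: LLEN for lists, HLEN for hashes."""
    key_type = client.type(key)
    key_type = key_type.decode() if isinstance(key_type, bytes) else key_type
    if key_type == "list":
        size = client.llen(key)
    elif key_type == "hash":
        size = client.hlen(key)
    else:
        size = None  # missing key or an unexpected type
    return key_type, size
```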
### `ytdlp_mgmt_queue_list_contents`

* **File:** `airflow/dags/ytdlp_mgmt_queue_list_contents.py`
* **Purpose:** Manually list the contents of a specific YTDLP Redis queue/key (list or hash). Useful for inspecting queue state or results.
* **Parameters (Defaults):**
    * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
    * `queue_to_list` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to list.
    * `max_items` (`100`): Maximum number of items/fields to list.
* **Results:**
    * Connects to Redis and identifies the type of the key specified by `queue_to_list`.
    * If it's a List, logs the first `max_items` elements.
    * If it's a Hash, logs up to `max_items` key-value pairs, attempting to pretty-print JSON values.
    * Logs warnings for very large hashes.
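A sketch of the listing behaviour (`LRANGE` for lists, `HGETALL` for hashes, best-effort pretty-printing of JSON values); truncation and the large-hash warning are simplified here:

```python
import json

def list_queue_contents(client, key, max_items=100):
    key_type = client.type(key)
    key_type = key_type.decode() if isinstance(key_type, bytes) else key_type
    if key_type == "list":
        return client.lrange(key, 0, max_items - 1)
    if key_type == "hash":
        items = {}
        for field, value in list(client.hgetall(key).items())[:max_items]:
            try:
                value = json.dumps(json.loads(value), indent=2)
            except (ValueError, TypeError):
                pass  # keep the raw value if it is not JSON
            items[field] = value
        return items
    return None  # missing key or an unexpected type
```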
### `ytdlp_proc_sequential_processor`

* **File:** `airflow/dags/ytdlp_proc_sequential_processor.py`
* **Purpose:** Processes YouTube URLs sequentially from a Redis queue. Designed for batch processing. Pops a URL, gets token/metadata via YTDLP Ops service, downloads the media using `yt-dlp`, and records the result.
* **Parameters (Defaults):**
    * `queue_name` (`'video_queue'`): Base name for Redis queues (e.g., `video_queue_inbox`, `video_queue_progress`).
    * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
    * `redis_enabled` (`False`): Use Redis for service discovery? If False, uses `service_ip`/`port`.
    * `service_ip` (`None`): Required Service IP if `redis_enabled=False`.
    * `service_port` (`None`): Required Service port if `redis_enabled=False`.
    * `account_id` (`'default_account'`): Account ID for the API call (used for Redis lookup if `redis_enabled=True`).
    * `timeout` (`30`): Timeout in seconds for the Thrift connection.
    * `download_format` (`'ba[ext=m4a]/bestaudio/best'`): yt-dlp format selection string.
    * `output_path_template` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s"`): yt-dlp output template. Uses Airflow Variable `DOWNLOADS_TEMP`.
    * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`. Uses Airflow Variable `DOWNLOADS_TEMP`.
* **Results:**
    * Pops one URL from the `{{ params.queue_name }}_inbox` Redis list.
    * If a URL is popped, it's added to the `{{ params.queue_name }}_progress` Redis hash.
    * The `YtdlpOpsOperator` (`get_token` task) attempts to get token data (including `info.json`, proxy, command) for the URL using the specified connection method and account ID.
    * If token retrieval succeeds, the `download_video` task executes `yt-dlp` using the retrieved `info.json`, proxy, the `download_format` parameter, and the `output_path_template` parameter to download the actual media.
    * **On Successful Download:** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_result` hash along with results (`info_json_path`, `socks_proxy`, `ytdlp_command`).
    * **On Failure (Token Retrieval or Download):** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_fail` hash along with error details (message, traceback).
    * If the inbox queue is empty, the DAG run skips processing via `AirflowSkipException`.
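For reference, the key names the processor derives from the `queue_name` parameter (taken from the bullets above):

```python
queue_name = "video_queue"               # params.queue_name
inbox_key = f"{queue_name}_inbox"        # Redis list of URLs waiting to be processed
progress_key = f"{queue_name}_progress"  # hash of URLs currently being processed
result_key = f"{queue_name}_result"      # hash of successful results
fail_key = f"{queue_name}_fail"          # hash of failures with error details
```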
README.md (rewritten: the previous 100-line English description, identical to README.en.old.md above, is replaced by the 38-line overview below)

@@ -1,100 +1,38 @@
# Architecture and Description of the YTDLP Airflow DAGs

This document describes the architecture and purpose of the DAGs used to download videos from YouTube. The system follows a Sensor/Worker pattern to provide continuous, parallel processing.

## Main processing loop

### `ytdlp_sensor_redis_queue` (Sensor)

- **Purpose:** Pulls URLs to download from the Redis queue and launches workers to process them.
- **How it works (hybrid triggering):**
  - **On a schedule:** Every minute the DAG automatically checks the Redis queue. This guarantees that new tasks are picked up even if the processing loop was temporarily stopped (because the queue was empty).
  - **On a trigger:** When the `ytdlp_worker_per_url` worker completes successfully, it triggers the sensor immediately instead of waiting for the next minute, keeping processing continuous with no idle gaps.
- **Logic:** Fetches a batch of URLs from Redis (the `_inbox` list). If the queue is empty, the DAG finishes successfully and waits for the next run (triggered or scheduled).
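A hedged sketch of the hybrid triggering: a one-minute schedule plus re-triggering by the worker. The DAG id comes from this document; the task and callable names are illustrative only:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="ytdlp_sensor_redis_queue",
    schedule_interval="* * * * *",  # scheduled check every minute
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
) as dag:

    def check_inbox(**context):
        # Pop a batch of URLs from the *_inbox list and trigger one worker per URL;
        # if the list is empty, simply finish and wait for the next run.
        ...

    PythonOperator(task_id="check_inbox", python_callable=check_inbox)
```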
### `ytdlp_worker_per_url` (Worker)

- **Purpose:** Processes a single URL, downloads the video, and keeps the loop going.
- **How it works:**
  - Receives one URL from the sensor.
  - Calls the `ytdlp-ops-auth` service to obtain the `info.json` and a `socks5` proxy.
  - Downloads the video using the data it received. (TODO: replace the `yt-dlp` command invocation with a library call.)
  - Depending on the outcome (success/failure), writes the result to the corresponding Redis hash (`_result` or `_fail`).
  - On success, re-triggers the `ytdlp_sensor_redis_queue` sensor to continue the processing loop; on failure, the loop stops so the problem can be diagnosed manually.
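The "continue the loop" step maps naturally onto Airflow's `TriggerDagRunOperator`; the operator is standard, but the task wiring shown here is an assumption:

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Placed at the end of the worker DAG, downstream of the success path only, so a
# failed run leaves the loop stopped for manual diagnosis.
retrigger_sensor = TriggerDagRunOperator(
    task_id="retrigger_sensor",
    trigger_dag_id="ytdlp_sensor_redis_queue",
    wait_for_completion=False,
)
```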
## Management DAGs

These DAGs are intended for manual queue management and are not part of the automatic loop.

- **`ytdlp_mgmt_queue_add_and_verify`**: Adds URLs to the task queue (`_inbox`) and then checks the status of that queue.
- **`ytdlp_mgmt_queues_check_status`**: Shows the state and contents of all key queues (`_inbox`, `_progress`, `_result`, `_fail`). Helps track the processing pipeline.
- **`ytdlp_mgmt_queue_clear`**: Clears (completely deletes) the specified Redis queue. **Use with caution**, since the operation is irreversible.
## External services

### `ytdlp-ops-auth` (Thrift Service)

- **Purpose:** An external service that provides the authentication data (tokens, cookies, proxy) needed to download videos.
- **Interaction:** The worker DAG (`ytdlp_worker_per_url`) calls this service before starting a download to obtain the data `yt-dlp` needs.
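A sketch of the worker's call to `ytdlp-ops-auth`, based on the Thrift client code removed later in this change (`YTTokenOpService.getOrRefreshToken`); the host, port, account ID and URL are the defaults listed in README.en.old.md:

```python
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

socket_conn = TSocket.TSocket("85.192.30.55", 9090)
transport = TTransport.TFramedTransport(socket_conn)
client = YTTokenOpService.Client(TBinaryProtocol.TBinaryProtocol(transport))
transport.open()
try:
    token_data = client.getOrRefreshToken(
        accountId="account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09",
        updateType=TokenUpdateMode.AUTO,
        url="https://www.youtube.com/watch?v=sOlTX9uxUtM",
    )
finally:
    transport.close()
```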
dags/utils/__init__.py (new file, 10 lines)

@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.

"""
Airflow DAG Utilities
"""
dags/utils/redis_utils.py (new file, 32 lines)

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.

"""
Redis utility functions for Airflow DAGs.
"""

from airflow.exceptions import AirflowException
from airflow.providers.redis.hooks.redis import RedisHook
import logging
import redis

logger = logging.getLogger(__name__)


def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
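With the helper extracted into `dags/utils/`, the management and processing DAGs changed below can share it instead of keeping private copies; a minimal usage sketch (connection ID and key name are the defaults used elsewhere in this change):

```python
from utils.redis_utils import _get_redis_client

client = _get_redis_client("redis_default")
print(client.llen("video_queue_inbox"))  # e.g. check how many URLs are waiting
```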
@@ -8,6 +8,9 @@ from datetime import timedelta
 import logging
 import redis # Import redis exceptions if needed
 
+# Import utility functions
+from utils.redis_utils import _get_redis_client
+
 # Configure logging
 logger = logging.getLogger(__name__)
@@ -15,23 +18,6 @@ logger = logging.getLogger(__name__)
 DEFAULT_QUEUE_NAME = 'video_queue' # Default base name for the queue
 DEFAULT_REDIS_CONN_ID = 'redis_default'
 
-# --- Helper Functions ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
-
 # --- Python Callables for Tasks ---
 
 def add_urls_callable(**context):
@@ -28,22 +28,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_QUEUE_BASE_NAME = 'video_queue'
 DEFAULT_MAX_ITEMS_TO_LIST = 25
 
-# --- Helper Function ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client
 
 # --- Python Callable for Check and List Task ---
@@ -1,5 +1,10 @@
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
+#
+# Copyright © 2024 rl <rl@rlmbp>
+#
+# Distributed under terms of the MIT license.
+
 """
 Airflow DAG for manually clearing (deleting) a specific Redis key used by YTDLP queues.
 """
@@ -22,22 +27,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 # Provide a placeholder default, user MUST specify the queue to clear
 DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR'
 
-# --- Helper Function ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client
 
 # --- Python Callable for Clear Task ---
@@ -29,24 +29,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_QUEUE_TO_LIST = 'video_queue_inbox'
 DEFAULT_MAX_ITEMS = 10 # Limit number of items listed by default
 
-# --- Helper Function ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        # decode_responses=True removed as it's not supported by get_conn in some environments
-        # We will decode manually where needed.
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client
 
 # --- Python Callable for List Contents Task ---
@ -1,720 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
# vim:fenc=utf-8
|
|
||||||
#
|
|
||||||
# Copyright © 2024 rl <rl@rlmbp>
|
|
||||||
#
|
|
||||||
# Distributed under terms of the MIT license.
|
|
||||||
|
|
||||||
"""
|
|
||||||
DAG for processing YouTube URLs sequentially from a Redis queue using YTDLP Ops Thrift service.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from airflow import DAG
|
|
||||||
from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
|
|
||||||
from airflow.hooks.base import BaseHook
|
|
||||||
from airflow.models import BaseOperator, Variable
|
|
||||||
from airflow.models.param import Param
|
|
||||||
from airflow.operators.bash import BashOperator # Import BashOperator
|
|
||||||
from airflow.operators.python import PythonOperator
|
|
||||||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
|
||||||
from airflow.providers.redis.hooks.redis import RedisHook
|
|
||||||
from airflow.utils.dates import days_ago
|
|
||||||
from airflow.utils.decorators import apply_defaults
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from pangramia.yt.common.ttypes import TokenUpdateMode
|
|
||||||
from pangramia.yt.exceptions.ttypes import PBServiceException
|
|
||||||
from pangramia.yt.tokens_ops import YTTokenOpService
|
|
||||||
from thrift.protocol import TBinaryProtocol
|
|
||||||
from thrift.transport import TSocket, TTransport
|
|
||||||
from thrift.transport.TTransport import TTransportException
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import redis # Import redis exceptions if needed
|
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
import traceback # For logging stack traces in failure handler
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Default settings
|
|
||||||
DEFAULT_QUEUE_NAME = 'video_queue' # Base name for queues
|
|
||||||
DEFAULT_REDIS_CONN_ID = 'redis_default'
|
|
||||||
DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds
|
|
||||||
MAX_RETRIES_REDIS_LOOKUP = 3 # Retries for fetching service details from Redis
|
|
||||||
RETRY_DELAY_REDIS_LOOKUP = 10 # Delay (seconds) for Redis lookup retries
|
|
||||||
|
|
||||||
# --- Helper Functions ---
|
|
||||||
|
|
||||||
def _get_redis_client(redis_conn_id):
|
|
||||||
"""Gets a Redis client connection using RedisHook."""
|
|
||||||
try:
|
|
||||||
hook = RedisHook(redis_conn_id=redis_conn_id)
|
|
||||||
client = hook.get_conn()
|
|
||||||
client.ping()
|
|
||||||
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
|
|
||||||
return client
|
|
||||||
except redis.exceptions.AuthenticationError:
|
|
||||||
logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
|
|
||||||
raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
|
|
||||||
raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
|
|
||||||
|
|
||||||
def _extract_video_id(url):
|
|
||||||
"""Extracts YouTube video ID from URL."""
|
|
||||||
if not url or not isinstance(url, str):
|
|
||||||
logger.debug("URL is empty or not a string, cannot extract video ID.")
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
video_id = None
|
|
||||||
if 'youtube.com/watch?v=' in url:
|
|
||||||
video_id = url.split('v=')[1].split('&')[0]
|
|
||||||
elif 'youtu.be/' in url:
|
|
||||||
video_id = url.split('youtu.be/')[1].split('?')[0]
|
|
||||||
|
|
||||||
if video_id and len(video_id) >= 11:
|
|
||||||
video_id = video_id[:11] # Standard ID length
|
|
||||||
logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
|
|
||||||
return video_id
|
|
||||||
else:
|
|
||||||
logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# --- Queue Management Callables ---
|
|
||||||
|
|
||||||
def pop_url_from_queue(**context):
|
|
||||||
"""Pops a URL from the inbox queue and pushes to XCom."""
|
|
||||||
params = context['params']
|
|
||||||
queue_name = params['queue_name']
|
|
||||||
inbox_queue = f"{queue_name}_inbox"
|
|
||||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
|
||||||
logger.info(f"Attempting to pop URL from inbox queue: {inbox_queue}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = _get_redis_client(redis_conn_id)
|
|
||||||
# LPOP is non-blocking, returns None if empty
|
|
||||||
url_bytes = client.lpop(inbox_queue) # Returns bytes if decode_responses=False on hook/client
|
|
||||||
|
|
||||||
if url_bytes:
|
|
||||||
url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes
|
|
||||||
logger.info(f"Popped URL: {url}")
|
|
||||||
context['task_instance'].xcom_push(key='current_url', value=url)
|
|
||||||
return url # Return URL for logging/potential use
|
|
||||||
else:
|
|
||||||
logger.info(f"Inbox queue '{inbox_queue}' is empty. Skipping downstream tasks.")
|
|
||||||
context['task_instance'].xcom_push(key='current_url', value=None)
|
|
||||||
# Raise AirflowSkipException to signal downstream tasks to skip
|
|
||||||
raise AirflowSkipException(f"Inbox queue '{inbox_queue}' is empty.")
|
|
||||||
except AirflowSkipException:
|
|
||||||
raise # Re-raise skip exception
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error popping URL from Redis queue '{inbox_queue}': {e}", exc_info=True)
|
|
||||||
raise AirflowException(f"Failed to pop URL from Redis: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def move_url_to_progress(**context):
|
|
||||||
"""Moves the current URL from XCom to the progress hash."""
|
|
||||||
ti = context['task_instance']
|
|
||||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
|
||||||
|
|
||||||
# This task should be skipped if pop_url_from_queue raised AirflowSkipException
|
|
||||||
# Adding check for robustness
|
|
||||||
if not url:
|
|
||||||
logger.info("No URL found in XCom (or upstream skipped). Skipping move to progress.")
|
|
||||||
raise AirflowSkipException("No URL to process.")
|
|
||||||
|
|
||||||
params = context['params']
|
|
||||||
queue_name = params['queue_name']
|
|
||||||
progress_queue = f"{queue_name}_progress"
|
|
||||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
|
||||||
logger.info(f"Moving URL '{url}' to progress hash: {progress_queue}")
|
|
||||||
|
|
||||||
progress_data = {
|
|
||||||
'status': 'processing',
|
|
||||||
'start_time': time.time(),
|
|
||||||
'dag_run_id': context['dag_run'].run_id,
|
|
||||||
'task_instance_key_str': context['task_instance_key_str']
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = _get_redis_client(redis_conn_id)
|
|
||||||
client.hset(progress_queue, url, json.dumps(progress_data))
|
|
||||||
logger.info(f"Moved URL '{url}' to progress hash '{progress_queue}'.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error moving URL to Redis progress hash '{progress_queue}': {e}", exc_info=True)
|
|
||||||
# If this fails, the URL is popped but not tracked as processing. Fail the task.
|
|
||||||
raise AirflowException(f"Failed to move URL to progress hash: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def handle_success(**context):
|
|
||||||
"""Moves URL from progress to result hash on success."""
|
|
||||||
ti = context['task_instance']
|
|
||||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
|
||||||
if not url:
|
|
||||||
logger.warning("handle_success called but no URL found from pop_url_from_queue XCom. This shouldn't happen on success path.")
|
|
||||||
return # Or raise error
|
|
||||||
|
|
||||||
params = context['params']
|
|
||||||
queue_name = params['queue_name']
|
|
||||||
progress_queue = f"{queue_name}_progress"
|
|
||||||
result_queue = f"{queue_name}_result"
|
|
||||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
|
||||||
|
|
||||||
# Pull results from get_token task
|
|
||||||
info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
|
|
||||||
socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
|
|
||||||
ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') # Original command
|
|
||||||
downloaded_file_path = ti.xcom_pull(task_ids='download_video') # Pull from download_video task
|
|
||||||
|
|
||||||
logger.info(f"Handling success for URL: {url}")
|
|
||||||
logger.info(f" Info JSON Path: {info_json_path}")
|
|
||||||
logger.info(f" SOCKS Proxy: {socks_proxy}")
|
|
||||||
logger.info(f" YTDLP Command: {ytdlp_command[:100] if ytdlp_command else 'None'}...") # Log truncated command
|
|
||||||
logger.info(f" Downloaded File Path: {downloaded_file_path}")
|
|
||||||
|
|
||||||
result_data = {
|
|
||||||
'status': 'success',
|
|
||||||
'end_time': time.time(),
|
|
||||||
'info_json_path': info_json_path,
|
|
||||||
'socks_proxy': socks_proxy,
|
|
||||||
'ytdlp_command': ytdlp_command,
|
|
||||||
'downloaded_file_path': downloaded_file_path,
|
|
||||||
'url': url,
|
|
||||||
'dag_run_id': context['dag_run'].run_id,
|
|
||||||
'task_instance_key_str': context['task_instance_key_str'] # Record which task instance succeeded
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = _get_redis_client(redis_conn_id)
|
|
||||||
# Remove from progress hash
|
|
||||||
removed_count = client.hdel(progress_queue, url)
|
|
||||||
if removed_count > 0:
|
|
||||||
logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
|
|
||||||
else:
|
|
||||||
logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during success handling.")
|
|
||||||
|
|
||||||
# Add to result hash
|
|
||||||
client.hset(result_queue, url, json.dumps(result_data))
|
|
||||||
logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
|
|
||||||
# Even if Redis fails, the task succeeded. Log error but don't fail the task.
|
|
||||||
# Consider adding retry logic for Redis operations here or marking state differently.
|
|
||||||
|
|
||||||
|
|
||||||
def handle_failure(**context):
|
|
||||||
"""
|
|
||||||
Handles failed processing. Depending on the `requeue_on_failure` parameter,
|
|
||||||
it either moves the URL to the fail hash or re-queues it in the inbox.
|
|
||||||
If `stop_on_failure` is True, this task will fail, stopping the DAG loop.
|
|
||||||
"""
|
|
||||||
ti = context['task_instance']
|
|
||||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
|
||||||
if not url:
|
|
||||||
logger.error("handle_failure called but no URL found from pop_url_from_queue XCom.")
|
|
||||||
return
|
|
||||||
|
|
||||||
params = context['params']
|
|
||||||
queue_name = params['queue_name']
|
|
||||||
progress_queue = f"{queue_name}_progress"
|
|
||||||
fail_queue = f"{queue_name}_fail"
|
|
||||||
inbox_queue = f"{queue_name}_inbox"
|
|
||||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
|
||||||
requeue_on_failure = params.get('requeue_on_failure', False)
|
|
||||||
stop_on_failure = params.get('stop_on_failure', True) # Default to True
|
|
||||||
|
|
||||||
exception = context.get('exception')
|
|
||||||
error_message = str(exception) if exception else "Unknown error"
|
|
||||||
tb_str = traceback.format_exc() if exception else "No traceback available."
|
|
||||||
|
|
||||||
logger.info(f"Handling failure for URL: {url}")
|
|
||||||
logger.error(f" Failure Reason: {error_message}")
|
|
||||||
logger.debug(f" Traceback:\n{tb_str}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = _get_redis_client(redis_conn_id)
|
|
||||||
# Always remove from progress hash first
|
|
||||||
removed_count = client.hdel(progress_queue, url)
|
|
||||||
if removed_count > 0:
|
|
||||||
logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
|
|
||||||
else:
|
|
||||||
logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.")
|
|
||||||
|
|
||||||
if requeue_on_failure:
|
|
||||||
# Re-queue the URL for another attempt
|
|
||||||
client.rpush(inbox_queue, url)
|
|
||||||
logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
|
|
||||||
else:
|
|
||||||
# Move to the permanent fail hash
|
|
||||||
fail_data = {
|
|
||||||
'status': 'failed',
|
|
||||||
'end_time': time.time(),
|
|
||||||
'error': error_message,
|
|
||||||
'traceback': tb_str,
|
|
||||||
'url': url,
|
|
||||||
'dag_run_id': context['dag_run'].run_id,
|
|
||||||
'task_instance_key_str': context['task_instance_key_str']
|
|
||||||
}
|
|
||||||
client.hset(fail_queue, url, json.dumps(fail_data))
|
|
||||||
logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
|
|
||||||
# This is a critical error in the failure handling logic itself.
|
|
||||||
raise AirflowException(f"Could not handle failure in Redis: {e}")
|
|
||||||
|
|
||||||
# After handling Redis, decide whether to fail the task to stop the loop
|
|
||||||
if stop_on_failure:
|
|
||||||
logger.error("stop_on_failure is True. Failing this task to stop the DAG loop.")
|
|
||||||
# Re-raise the original exception to fail the task instance.
|
|
||||||
# This is better than AirflowFailException because it preserves the original error.
|
|
||||||
if exception:
|
|
||||||
raise exception
|
|
||||||
else:
|
|
||||||
# If for some reason there's no exception, fail explicitly.
|
|
||||||
raise AirflowFailException("Failing task as per stop_on_failure=True, but original exception was not found.")
|
|
||||||
|
|
||||||
|
|
||||||
# --- YtdlpOpsOperator ---
|
|
||||||
|
|
||||||
class YtdlpOpsOperator(BaseOperator):
|
|
||||||
"""
|
|
||||||
Custom Airflow operator to interact with YTDLP Thrift service. Handles direct connections
|
|
||||||
and Redis-based discovery, retrieves tokens, saves info.json, and manages errors.
|
|
||||||
Modified to pull URL from XCom for sequential processing.
|
|
||||||
"""
|
|
||||||
# Removed 'url' from template_fields as it's pulled from XCom
|
|
||||||
template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir', 'redis_conn_id')
|
|
||||||
|
|
||||||
@apply_defaults
|
|
||||||
def __init__(self,
|
|
||||||
# url parameter removed - will be pulled from XCom
|
|
||||||
redis_conn_id=DEFAULT_REDIS_CONN_ID,
|
|
||||||
max_retries_lookup=MAX_RETRIES_REDIS_LOOKUP,
|
|
||||||
retry_delay_lookup=RETRY_DELAY_REDIS_LOOKUP,
|
|
||||||
service_ip=None,
|
|
||||||
service_port=None,
|
|
||||||
redis_enabled=False, # Default to direct connection now
|
|
||||||
account_id=None,
|
|
||||||
# save_info_json removed, always True
|
|
||||||
info_json_dir=None,
|
|
||||||
# get_socks_proxy removed, always True
|
|
||||||
# store_socks_proxy removed, always True
|
|
||||||
# get_socks_proxy=True, # Removed
|
|
||||||
# store_socks_proxy=True, # Store proxy in XCom by default # Removed
|
|
||||||
timeout=DEFAULT_TIMEOUT,
|
|
||||||
*args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
logger.info(f"Initializing YtdlpOpsOperator (Processor Version) with parameters: "
|
|
||||||
f"redis_conn_id={redis_conn_id}, max_retries_lookup={max_retries_lookup}, retry_delay_lookup={retry_delay_lookup}, "
|
|
||||||
f"service_ip={service_ip}, service_port={service_port}, redis_enabled={redis_enabled}, "
|
|
||||||
f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
|
|
||||||
# save_info_json, get_socks_proxy, store_socks_proxy removed from log
|
|
||||||
|
|
||||||
# Validate parameters based on connection mode
|
|
||||||
if redis_enabled:
|
|
||||||
# If using Redis, account_id is essential for lookup
|
|
||||||
if not account_id:
|
|
||||||
raise ValueError("account_id is required when redis_enabled=True for service lookup.")
|
|
||||||
else:
|
|
||||||
# If direct connection, IP and Port are essential
|
|
||||||
if not service_ip or not service_port:
|
|
||||||
raise ValueError("Both service_ip and service_port must be specified when redis_enabled=False.")
|
|
||||||
# Account ID is still needed for the API call itself, but rely on DAG param or operator config
|
|
||||||
if not account_id:
|
|
||||||
logger.warning("No account_id provided for direct connection mode. Ensure it's set in DAG params or operator config.")
|
|
||||||
# We won't assign 'default' here, let the value passed during instantiation be used.
|
|
||||||
|
|
||||||
# self.url is no longer needed here
|
|
||||||
self.redis_conn_id = redis_conn_id
|
|
||||||
self.max_retries_lookup = max_retries_lookup
|
|
||||||
self.retry_delay_lookup = int(retry_delay_lookup.total_seconds() if isinstance(retry_delay_lookup, timedelta) else retry_delay_lookup)
|
|
||||||
self.service_ip = service_ip
|
|
||||||
self.service_port = service_port
|
|
||||||
self.redis_enabled = redis_enabled
|
|
||||||
self.account_id = account_id
|
|
||||||
# self.save_info_json removed
|
|
||||||
self.info_json_dir = info_json_dir # Still needed
|
|
||||||
# self.get_socks_proxy removed
|
|
||||||
# self.store_socks_proxy removed
|
|
||||||
self.timeout = timeout
|
|
||||||
|
|
||||||
def execute(self, context):
|
|
||||||
logger.info("Executing YtdlpOpsOperator (Processor Version)")
|
|
||||||
transport = None
|
|
||||||
ti = context['task_instance'] # Get task instance for XCom access
|
|
||||||
|
|
||||||
try:
|
|
||||||
# --- Get URL from XCom ---
|
|
||||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
|
||||||
if not url:
|
|
||||||
# This should ideally be caught by upstream skip, but handle defensively
|
|
||||||
logger.info("No URL found in XCom from pop_url_from_queue. Skipping execution.")
|
|
||||||
raise AirflowSkipException("Upstream task did not provide a URL.")
|
|
||||||
logger.info(f"Processing URL from XCom: {url}")
|
|
||||||
# --- End Get URL ---
|
|
||||||
|
|
||||||
logger.info("Getting task parameters and rendering templates")
|
|
||||||
params = context['params'] # DAG run params
|
|
||||||
|
|
||||||
# Render template fields using context
|
|
||||||
# Use render_template_as_native for better type handling if needed, else render_template
|
|
||||||
redis_conn_id = self.render_template(self.redis_conn_id, context)
|
|
||||||
service_ip = self.render_template(self.service_ip, context)
|
|
||||||
service_port_rendered = self.render_template(self.service_port, context)
|
|
||||||
account_id = self.render_template(self.account_id, context)
|
|
||||||
timeout_rendered = self.render_template(self.timeout, context)
|
|
||||||
info_json_dir = self.render_template(self.info_json_dir, context) # Rendered here for _save_info_json
|
|
||||||
|
|
||||||
# Determine effective settings (DAG params override operator defaults)
|
|
||||||
redis_enabled = params.get('redis_enabled', self.redis_enabled)
|
|
||||||
account_id = params.get('account_id', account_id) # Use DAG param if provided
|
|
||||||
redis_conn_id = params.get('redis_conn_id', redis_conn_id) # Use DAG param if provided
|
|
||||||
|
|
||||||
logger.info(f"Effective settings: redis_enabled={redis_enabled}, account_id='{account_id}', redis_conn_id='{redis_conn_id}'")
|
|
||||||
|
|
||||||
host = None
|
|
||||||
port = None
|
|
||||||
|
|
||||||
if redis_enabled:
|
|
||||||
# Get Redis connection using the helper for consistency
|
|
||||||
redis_client = _get_redis_client(redis_conn_id)
|
|
||||||
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}' for service discovery.")
|
|
||||||
|
|
||||||
# Get service details from Redis with retries
|
|
||||||
service_key = f"ytdlp:{account_id}"
|
|
||||||
legacy_key = account_id # For backward compatibility
|
|
||||||
|
|
||||||
for attempt in range(self.max_retries_lookup):
|
|
||||||
try:
|
|
||||||
logger.info(f"Attempt {attempt + 1}/{self.max_retries_lookup}: Fetching service details from Redis for keys: '{service_key}', '{legacy_key}'")
|
|
||||||
service_details = redis_client.hgetall(service_key)
|
|
||||||
if not service_details:
|
|
||||||
logger.warning(f"Key '{service_key}' not found, trying legacy key '{legacy_key}'")
|
|
||||||
service_details = redis_client.hgetall(legacy_key)
|
|
||||||
|
|
||||||
if not service_details:
|
|
||||||
raise ValueError(f"No service details found in Redis for keys: {service_key} or {legacy_key}")
|
|
||||||
|
|
||||||
# Find IP and port (case-insensitive keys)
|
|
||||||
ip_key = next((k for k in service_details if k.lower() == 'ip'), None)
|
|
||||||
port_key = next((k for k in service_details if k.lower() == 'port'), None)
|
|
||||||
|
|
||||||
if not ip_key: raise ValueError(f"'ip' key not found in Redis hash for {service_key}/{legacy_key}")
|
|
||||||
if not port_key: raise ValueError(f"'port' key not found in Redis hash for {service_key}/{legacy_key}")
|
|
||||||
|
|
||||||
host = service_details[ip_key] # Assumes decode_responses=True in hook
|
|
||||||
port_str = service_details[port_key]
|
|
||||||
|
|
||||||
try:
|
|
||||||
port = int(port_str)
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
raise ValueError(f"Invalid port value '{port_str}' found in Redis for {service_key}/{legacy_key}")
|
|
||||||
|
|
||||||
logger.info(f"Extracted from Redis - Service IP: {host}, Service Port: {port}")
|
|
||||||
break # Success
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Attempt {attempt + 1} failed to get Redis details: {str(e)}")
|
|
||||||
if attempt == self.max_retries_lookup - 1:
|
|
||||||
logger.error("Max retries reached for fetching Redis details.")
|
|
||||||
raise AirflowException(f"Failed to get service details from Redis after {self.max_retries_lookup} attempts: {e}")
|
|
||||||
logger.info(f"Retrying in {self.retry_delay_lookup} seconds...")
|
|
||||||
time.sleep(self.retry_delay_lookup)
|
|
||||||
else:
|
|
||||||
# Direct connection: Use rendered/param values
|
|
||||||
host = params.get('service_ip', service_ip) # Use DAG param if provided
|
|
||||||
port_str = params.get('service_port', service_port_rendered) # Use DAG param if provided
|
|
||||||
|
|
||||||
logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
|
|
||||||
|
|
||||||
if not host or not port_str:
|
|
||||||
raise ValueError("Direct connection requires service_ip and service_port (check Operator config and DAG params)")
|
|
||||||
try:
|
|
||||||
port = int(port_str)
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
raise ValueError(f"Invalid service_port value: {port_str}")
|
|
||||||
|
|
||||||
logger.info(f"Connecting directly to Thrift service at {host}:{port} (Redis bypassed)")
|
|
||||||
|
|
||||||
# Validate and use timeout
|
|
||||||
try:
|
|
||||||
timeout = int(timeout_rendered)
|
|
||||||
if timeout <= 0: raise ValueError("Timeout must be positive")
|
|
||||||
logger.info(f"Using timeout: {timeout} seconds")
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
logger.warning(f"Invalid timeout value: '{timeout_rendered}'. Using default: {DEFAULT_TIMEOUT}")
|
|
||||||
timeout = DEFAULT_TIMEOUT
|
|
||||||
|
|
||||||
# Create Thrift connection objects
|
|
||||||
# socket_conn = TSocket.TSocket(host, port) # Original
|
|
||||||
socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) # Explicitly use AF_INET (IPv4)
|
|
||||||
socket_conn.setTimeout(timeout * 1000) # Thrift timeout is in milliseconds
|
|
||||||
transport = TTransport.TFramedTransport(socket_conn) # Use TFramedTransport if server expects it
|
|
||||||
# transport = TTransport.TBufferedTransport(socket_conn) # Use TBufferedTransport if server expects it
|
|
||||||
protocol = TBinaryProtocol.TBinaryProtocol(transport)
|
|
||||||
client = YTTokenOpService.Client(protocol)
|
|
||||||
|
|
||||||
logger.info(f"Attempting to connect to Thrift server at {host}:{port}...")
|
|
||||||
try:
|
|
||||||
transport.open()
|
|
||||||
logger.info("Successfully connected to Thrift server.")
|
|
||||||
|
|
||||||
# Test connection with ping
|
|
||||||
try:
|
|
||||||
client.ping()
|
|
||||||
logger.info("Server ping successful.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Server ping failed: {e}")
|
|
||||||
raise AirflowException(f"Server connection test (ping) failed: {e}")
|
|
||||||
|
|
||||||
# Get token from service using the URL from XCom
|
|
||||||
try:
|
|
||||||
logger.info(f"Requesting token for accountId='{account_id}', url='{url}'")
|
|
||||||
token_data = client.getOrRefreshToken(
|
|
||||||
accountId=account_id,
|
|
||||||
updateType=TokenUpdateMode.AUTO,
|
|
||||||
url=url # Use the url variable from XCom
|
|
||||||
)
|
|
||||||
logger.info("Successfully retrieved token data from service.")
|
|
||||||
except PBServiceException as e:
|
|
||||||
# Handle specific service exceptions
|
|
||||||
error_code = getattr(e, 'errorCode', 'N/A')
|
|
||||||
error_message = getattr(e, 'message', 'N/A')
|
|
||||||
error_context = getattr(e, 'context', {})
|
|
||||||
logger.error(f"PBServiceException occurred: Code={error_code}, Message={error_message}")
|
|
||||||
if error_context:
|
|
||||||
logger.error(f" Context: {error_context}") # Log context separately
|
|
||||||
# Construct a concise error message for AirflowException
|
|
||||||
error_msg = f"YTDLP service error (Code: {error_code}): {error_message}"
|
|
||||||
# Add specific error code handling if needed...
|
|
||||||
logger.error(f"Failing task instance due to PBServiceException: {error_msg}") # Add explicit log before raising
|
|
||||||
raise AirflowException(error_msg) # Fail task on service error
|
|
||||||
except TTransportException as e:
|
|
||||||
logger.error(f"Thrift transport error during getOrRefreshToken: {e}")
|
|
||||||
logger.error(f"Failing task instance due to TTransportException: {e}") # Add explicit log before raising
|
|
||||||
raise AirflowException(f"Transport error during API call: {e}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Unexpected error during getOrRefreshToken: {e}")
|
|
||||||
logger.error(f"Failing task instance due to unexpected error during API call: {e}") # Add explicit log before raising
|
|
||||||
raise AirflowException(f"Unexpected error during API call: {e}")
|
|
||||||
|
|
||||||
except TTransportException as e:
|
|
||||||
# Handle connection errors
|
|
||||||
logger.error(f"Thrift transport error during connection: {str(e)}")
|
|
||||||
logger.error(f"Failing task instance due to TTransportException during connection: {e}") # Add explicit log before raising
|
|
||||||
raise AirflowException(f"Transport error connecting to YTDLP service: {str(e)}")
|
|
||||||
# Removed the overly broad except Exception block here, as inner blocks raise AirflowException
|
|
||||||
|
|
||||||
# --- Process Token Data ---
|
|
||||||
logger.debug(f"Token data received. Attributes: {dir(token_data)}")
|
|
||||||
|
|
||||||
info_json_path = None # Initialize
|
|
||||||
|
|
||||||
# save_info_json is now always True
|
|
||||||
logger.info("Proceeding to save info.json (save_info_json=True).")
|
|
||||||
info_json = self._get_info_json(token_data)
|
|
||||||
if info_json and self._is_valid_json(info_json):
|
|
||||||
try:
|
|
||||||
# Pass rendered info_json_dir to helper
|
|
||||||
info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
|
|
||||||
if info_json_path:
|
|
||||||
ti.xcom_push(key='info_json_path', value=info_json_path)
|
|
||||||
logger.info(f"Successfully saved info.json and pushed path to XCom: {info_json_path}")
|
|
||||||
else:
|
|
||||||
ti.xcom_push(key='info_json_path', value=None)
|
|
||||||
logger.warning("info.json saving failed (check logs from _save_info_json).")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Unexpected error during info.json saving process: {e}", exc_info=True)
|
|
||||||
ti.xcom_push(key='info_json_path', value=None)
|
|
||||||
elif info_json:
|
|
||||||
logger.warning("Retrieved infoJson is not valid JSON. Skipping save.")
|
|
||||||
ti.xcom_push(key='info_json_path', value=None)
|
|
||||||
else:
|
|
||||||
logger.info("No infoJson found in token data. Skipping save.")
|
|
||||||
ti.xcom_push(key='info_json_path', value=None)
|
|
||||||
|
|
||||||
|
|
||||||
            # Extract and potentially store SOCKS proxy
            # get_socks_proxy and store_socks_proxy are now always True
            socks_proxy = None
            logger.info("Attempting to extract SOCKS proxy (get_socks_proxy=True).")
            proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
            if proxy_attr:
                socks_proxy = getattr(token_data, proxy_attr)
                if socks_proxy:
                    logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
                    # Always store if found (store_socks_proxy=True)
                    ti.xcom_push(key='socks_proxy', value=socks_proxy)
                    logger.info("Pushed 'socks_proxy' to XCom.")
                else:
                    logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
                    # Store None if attribute found but empty
                    ti.xcom_push(key='socks_proxy', value=None)
                    logger.info("Pushed None to XCom for 'socks_proxy' as extracted value was empty.")
            else:
                logger.info("No SOCKS proxy attribute found in token data.")
                # Store None if attribute not found
                ti.xcom_push(key='socks_proxy', value=None)
                logger.info("Pushed None to XCom for 'socks_proxy' as attribute was not found.")
            # --- Removed old logic block ---
            # # Extract and potentially store SOCKS proxy
            # socks_proxy = None
            # get_socks_proxy = params.get('get_socks_proxy', self.get_socks_proxy)
            # store_socks_proxy = params.get('store_socks_proxy', self.store_socks_proxy)
            #
            # if get_socks_proxy:
            #     proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
            #     if proxy_attr:
            #         socks_proxy = getattr(token_data, proxy_attr)
            #         if socks_proxy:
            #             logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
            #             if store_socks_proxy:
            #                 ti.xcom_push(key='socks_proxy', value=socks_proxy)
            #                 logger.info("Pushed 'socks_proxy' to XCom.")
            #         else:
            #             logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
            #             if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
            #     else:
            #         logger.info("get_socks_proxy is True, but no SOCKS proxy attribute found.")
            #         if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
            # else:
            #     logger.info("get_socks_proxy is False. Skipping proxy extraction.")
            #     if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
            # --- End Removed old logic block ---
            # Get the original command from the server, or construct a fallback
            ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
            if ytdlp_cmd:
                logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...")  # Log truncated
            else:
                logger.warning("No 'ytdlpCommand' attribute found in token data. Constructing a fallback for logging.")
                # Construct a representative command for logging purposes
                if socks_proxy:
                    ytdlp_cmd = f"yt-dlp --dump-json --proxy \"{socks_proxy}\" \"{url}\""
                else:
                    ytdlp_cmd = f"yt-dlp --dump-json \"{url}\""
                logger.info(f"Constructed fallback command: {ytdlp_cmd}")

            # Push the command to XCom
            ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
            logger.info("Pushed command to XCom key 'ytdlp_command'.")

            # No explicit return needed, success is implicit if no exception raised
        except (AirflowSkipException, AirflowFailException) as e:
            logger.info(f"Task skipped or failed explicitly: {e}")
            raise  # Re-raise to let Airflow handle state
        except AirflowException as e:  # Catch AirflowExceptions raised explicitly
            logger.error(f"Operation failed due to AirflowException: {e}", exc_info=True)
            raise  # Re-raise AirflowExceptions to ensure task failure
        except (TTransportException, PBServiceException) as e:  # Catch specific Thrift/Service errors not already handled inside inner try
            logger.error(f"Unhandled YTDLP Service/Transport error in outer block: {e}", exc_info=True)
            logger.error(f"Failing task instance due to unhandled outer Service/Transport error: {e}")  # Add explicit log before raising
            raise AirflowException(f"Unhandled YTDLP service error: {e}")  # Wrap in AirflowException to fail task
        except Exception as e:  # General catch-all for truly unexpected errors
            logger.error(f"Caught unexpected error in YtdlpOpsOperator outer block: {e}", exc_info=True)
            logger.error(f"Failing task instance due to unexpected outer error: {e}")  # Add explicit log before raising
            raise AirflowException(f"Unexpected error caused task failure: {e}")  # Wrap to fail task
        finally:
            if transport and transport.isOpen():
                logger.info("Closing Thrift transport.")
                transport.close()
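The operator finishes by pushing `info_json_path`, `socks_proxy`, and `ytdlp_command` to XCom. A downstream task would typically pull these values and perform the actual download. The sketch below is illustrative only: the task id `get_token` and the use of `yt-dlp --load-info-json` are assumptions, not code from this repository.

```python
# Hypothetical downstream callable (not from this repo): pulls the XCom values
# pushed above and runs yt-dlp against the saved metadata.
import subprocess


def download_with_token_data(**context):
    ti = context['task_instance']
    info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
    socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')

    cmd = ['yt-dlp', '--load-info-json', info_json_path]
    if socks_proxy:
        cmd += ['--proxy', socks_proxy]  # route traffic through the returned SOCKS proxy
    subprocess.run(cmd, check=True)
```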
    # --- Helper Methods ---

    def _get_info_json(self, token_data):
        """Safely extracts infoJson from token data."""
        return getattr(token_data, 'infoJson', None)

    def _is_valid_json(self, json_str):
        """Checks if a string is valid JSON."""
        if not json_str or not isinstance(json_str, str):
            return False
        try:
            json.loads(json_str)
            return True
        except json.JSONDecodeError:
            return False
    def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
        """Saves info_json to a file. Uses pre-rendered directory path."""
        try:
            video_id = _extract_video_id(url)  # Use standalone helper

            save_dir = rendered_info_json_dir or "."  # Use rendered path
            logger.info(f"Target directory for info.json: {save_dir}")

            # Ensure directory exists
            try:
                os.makedirs(save_dir, exist_ok=True)
                logger.info(f"Ensured directory exists: {save_dir}")
            except OSError as e:
                logger.error(f"Could not create directory {save_dir}: {e}. Cannot save info.json.")
                return None

            # Construct filename
            timestamp = int(time.time())
            base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
            info_json_path = os.path.join(save_dir, base_filename)
            latest_json_path = os.path.join(save_dir, "latest.json")  # Path for the latest symlink/copy

            # Write to timestamped file
            try:
                logger.info(f"Writing info.json content (received from service) to {info_json_path}...")
                with open(info_json_path, 'w', encoding='utf-8') as f:
                    f.write(info_json)
                logger.info(f"Successfully saved info.json to timestamped file: {info_json_path}")
            except IOError as e:
                logger.error(f"Failed to write info.json to {info_json_path}: {e}")
                return None

            # Write to latest.json (overwrite) - best effort
            try:
                with open(latest_json_path, 'w', encoding='utf-8') as f:
                    f.write(info_json)
                logger.info(f"Updated latest.json file: {latest_json_path}")
            except IOError as e:
                logger.warning(f"Failed to update latest.json at {latest_json_path}: {e}")

            return info_json_path

        except Exception as e:
            logger.error(f"Unexpected error in _save_info_json: {e}", exc_info=True)
            return None
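`_save_info_json` calls a standalone `_extract_video_id` helper that is defined elsewhere in the file and not shown here. A minimal sketch of what such a helper might look like, assuming standard YouTube watch/short/embed URL shapes; the real implementation may differ.

```python
# Illustrative only; the actual _extract_video_id in this module may differ.
import re
from urllib.parse import parse_qs, urlparse


def _extract_video_id(url):
    """Best-effort extraction of a YouTube video ID from a watch/short/embed URL."""
    try:
        parsed = urlparse(url)
        if parsed.hostname and 'youtu.be' in parsed.hostname:
            return parsed.path.lstrip('/') or None
        query = parse_qs(parsed.query)
        if 'v' in query:
            return query['v'][0]
        match = re.search(r'(?:shorts|embed)/([A-Za-z0-9_-]{11})', parsed.path)
        return match.group(1) if match else None
    except Exception:
        return None
```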
# =============================================================================
# DAG Definition
# =============================================================================

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # Default retries for tasks like queue management
    'retry_delay': timedelta(minutes=1),
    'start_date': days_ago(1),
    # Add concurrency control if needed for sequential processing
    # 'concurrency': 1,  # Ensure only one task instance runs at a time per DAG run
    # 'max_active_runs': 1,  # Ensure only one DAG run is active
}

# Define DAG
#
# --- DAG Block Deactivated on 2025-07-16 ---
# This DAG has been replaced by the Sensor/Worker pattern implemented in:
# - ytdlp_sensor_redis_queue.py (polls the queue)
# - ytdlp_worker_per_url.py (processes a single URL)
# This code is kept for reference but is not active.
#
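In the Sensor/Worker pattern named above, the sensor hands each popped URL to a worker DAG run. A minimal sketch of how such a hand-off is usually wired with `TriggerDagRunOperator`; only the worker DAG id comes from the note above, while the task id, conf key, and upstream task name are assumptions.

```python
# Sketch of a sensor -> worker hand-off; the exact wiring in this repository may differ.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_worker = TriggerDagRunOperator(
    task_id='trigger_worker_for_url',
    trigger_dag_id='ytdlp_worker_per_url',
    conf={'url': '{{ ti.xcom_pull(task_ids="pop_url_from_queue") }}'},  # hypothetical upstream task
    wait_for_completion=False,
)
```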
ytdlp_sensor_redis_queue.py:

@@ -21,6 +21,9 @@ from datetime import timedelta
 import logging
 import redis
 
+# Import utility functions
+from utils.redis_utils import _get_redis_client
+
 # Configure logging
 logger = logging.getLogger(__name__)
 
@@ -30,23 +33,6 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_TIMEOUT = 30
 DEFAULT_MAX_URLS = '1' # Default number of URLs to process per run
 
-# --- Helper Functions ---
-
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
-
 # --- Task Callables ---
 
 def log_trigger_info_callable(**context):
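The removed helper is now imported from `utils.redis_utils` (see the first hunk), so its body is presumably the same code deleted here, centralized in one module. A sketch of what `utils/redis_utils.py` would then contain; the import paths for `RedisHook` and `AirflowException` are assumptions.

```python
# Presumed contents of utils/redis_utils.py, reconstructed from the helper removed above.
import logging

import redis
from airflow.exceptions import AirflowException
from airflow.providers.redis.hooks.redis import RedisHook

logger = logging.getLogger(__name__)


def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
```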
@@ -57,6 +43,8 @@ def log_trigger_info_callable(**context):
 
     if trigger_type == 'manual':
         logger.info("Trigger source: Manual execution from Airflow UI or CLI.")
+    elif trigger_type == 'scheduled':
+        logger.info("Trigger source: Scheduled run (periodic check).")
     elif trigger_type == 'dag_run':
         # In Airflow 2.2+ we can get the triggering run object
         try:
@@ -154,10 +142,10 @@ default_args = {
 with DAG(
     dag_id='ytdlp_sensor_redis_queue',
     default_args=default_args,
-    schedule_interval=None, # This DAG is now only triggered (manually or by a worker)
+    schedule_interval='*/1 * * * *', # Runs every minute and can also be triggered.
     max_active_runs=1, # Prevent multiple sensors from running at once
     catchup=False,
-    description='Polls Redis queue for a batch of URLs and triggers parallel worker DAGs.',
+    description='Polls Redis queue every minute (and on trigger) for URLs and starts worker DAGs.',
     tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'],
     params={
         'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."),
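The sensor's description says it pops a batch of URLs from the inbox list each run, but the polling callable itself is not part of these hunks. A hedged sketch of the kind of batch pop it presumably performs; the function name, decoding, and error handling are assumptions.

```python
# Illustrative batch pop from the Redis inbox list; the real sensor callable may differ.
def pop_url_batch(client, inbox_queue, max_urls):
    """Pop up to max_urls URLs from the left end of the inbox list."""
    urls = []
    for _ in range(int(max_urls)):
        raw = client.lpop(inbox_queue)
        if raw is None:
            break  # queue drained
        urls.append(raw.decode() if isinstance(raw, bytes) else raw)
    return urls
```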
ytdlp_worker_per_url.py:

@@ -33,6 +33,10 @@ import os
 import redis
 import socket
 import time
+import traceback
+
+# Import utility functions
+from utils.redis_utils import _get_redis_client
 
 # Configure logging
 logger = logging.getLogger(__name__)
@@ -106,7 +110,7 @@ def handle_success(**context):
     try:
         # In the worker pattern, there's no "progress" hash to remove from.
         # We just add the result to the success hash.
-        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
+        client = _get_redis_client(redis_conn_id)
         client.hset(result_queue, url, json.dumps(result_data))
         logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
     except Exception as e:
@@ -115,8 +119,8 @@ def handle_success(**context):
 
 def handle_failure(**context):
     """
-    Handles failed processing. Moves the URL to the fail hash and, if stop_on_failure
-    is True, fails the task to make the DAG run failure visible.
+    Handles failed processing. Records detailed error information to the fail hash
+    and, if stop_on_failure is True, fails the task to make the DAG run failure visible.
     """
     ti = context['task_instance']
     params = context['params']
@@ -132,14 +136,31 @@ def handle_failure(**context):
     requeue_on_failure = params.get('requeue_on_failure', False)
     stop_on_failure = params.get('stop_on_failure', True)
 
+    # --- Extract Detailed Error Information ---
     exception = context.get('exception')
     error_message = str(exception) if exception else "Unknown error"
+    error_type = type(exception).__name__ if exception else "Unknown"
+    tb_str = "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available."
+
+    # Find the specific task that failed
+    dag_run = context['dag_run']
+    failed_task_id = "unknown"
+    # Look at direct upstream tasks of the current task ('handle_failure')
+    upstream_tasks = ti.get_direct_relatives(upstream=True)
+    for task in upstream_tasks:
+        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
+        if upstream_ti and upstream_ti.state == 'failed':
+            failed_task_id = task.task_id
+            break
 
     logger.info(f"Handling failure for URL: {url}")
+    logger.error(f" Failed Task: {failed_task_id}")
+    logger.error(f" Failure Type: {error_type}")
     logger.error(f" Failure Reason: {error_message}")
+    logger.debug(f" Traceback:\n{tb_str}")
 
     try:
-        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
+        client = _get_redis_client(redis_conn_id)
         if requeue_on_failure:
             client.rpush(inbox_queue, url)
             logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
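One caveat on the new `tb_str` line: since Python 3.10 the first parameter of `traceback.format_exception` is positional-only and no longer named `etype`, so the keyword call above raises a `TypeError` on newer interpreters. If the Airflow image runs Python 3.10+, a positional call is safer; this is a hedged alternative, not what the commit does.

```python
# Version-agnostic form: positional arguments work on both older and newer Python versions.
tb_str = (
    "".join(traceback.format_exception(type(exception), exception, exception.__traceback__))
    if exception else "No traceback available."
)
```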
@@ -147,12 +168,15 @@ def handle_failure(**context):
         fail_data = {
             'status': 'failed',
             'end_time': time.time(),
-            'error': error_message,
+            'failed_task': failed_task_id,
+            'error_type': error_type,
+            'error_message': error_message,
+            'traceback': tb_str,
             'url': url,
             'dag_run_id': context['dag_run'].run_id,
         }
-        client.hset(fail_queue, url, json.dumps(fail_data))
-        logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
+        client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
+        logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
     except Exception as e:
         logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
         # This is a critical error in the failure handling logic itself.
||||||
@ -178,22 +202,6 @@ class YtdlpOpsOperator(BaseOperator):
|
|||||||
"""
|
"""
|
||||||
template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')
|
template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_redis_client(redis_conn_id):
|
|
||||||
"""Gets a Redis client connection using RedisHook."""
|
|
||||||
try:
|
|
||||||
hook = RedisHook(redis_conn_id=redis_conn_id)
|
|
||||||
client = hook.get_conn()
|
|
||||||
client.ping()
|
|
||||||
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
|
|
||||||
return client
|
|
||||||
except redis.exceptions.AuthenticationError:
|
|
||||||
logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
|
|
||||||
raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
|
|
||||||
raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
|
|
||||||
|
|
||||||
@apply_defaults
|
@apply_defaults
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
service_ip=None,
|
service_ip=None,
|
||||||
@@ -448,8 +456,8 @@ with DAG(
     trigger_sensor_for_next_batch.doc_md = """
     ### Trigger Sensor for Next Batch
     Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop.
-    This task runs after the main processing tasks are complete (either success or failure),
-    ensuring that the system immediately checks for more URLs to process.
+    This task **only runs on the success path** after a URL has been fully processed.
+    This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
     """
 
     # Define success and failure handling tasks
@@ -470,10 +478,9 @@ with DAG(
     # The main processing flow
     get_token >> download_video
 
-    # Branch after download: one path for success, one for failure
-    download_video >> success_task
-    download_video >> failure_task
+    # The success path: if download_video succeeds, run success_task, then trigger the next sensor run.
+    download_video >> success_task >> trigger_sensor_for_next_batch
 
-    # The trigger to continue the loop ONLY runs on the success path.
-    # A failure will be recorded in Redis by `handle_failure` and then the loop will stop.
-    success_task >> trigger_sensor_for_next_batch
+    # The failure path: if get_token OR download_video fails, run the failure_task.
+    # This is a "fan-in" dependency.
+    [get_token, download_video] >> failure_task
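For the fan-in `[get_token, download_video] >> failure_task` edge to fire when an upstream task fails, `failure_task` presumably uses a non-default trigger rule; its definition is not part of these hunks. A hedged sketch of how such a task is typically declared:

```python
# Assumed declaration; the real failure_task in ytdlp_worker_per_url.py may differ.
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

failure_task = PythonOperator(
    task_id='handle_failure',
    python_callable=handle_failure,
    trigger_rule=TriggerRule.ONE_FAILED,  # run as soon as any upstream task fails
)
```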