Compare commits

2 Commits: affc59ee57 ... fc2d740b65

| Author | SHA1 | Date |
|---|---|---|
|  | fc2d740b65 |  |
|  | 1f186fd217 |  |

README.en.old.md — new file (+100 lines)
@@ -0,0 +1,100 @@

# YTDLP Airflow DAGs

This document describes the Airflow DAGs used for interacting with the YTDLP Ops service and managing the processing queues.

## DAG Descriptions

### `ytdlp_client_dag_v2.1`

* **File:** `airflow/dags/ytdlp_client_dag_v2.1.py`
* **Purpose:** Provides a way to test the YTDLP Ops Thrift service interaction for a *single* video URL. Useful for debugging connection issues, testing specific account IDs, or verifying the service response for a particular URL independently of the queueing system.
* **Parameters (Defaults):**
  * `url` (`'https://www.youtube.com/watch?v=sOlTX9uxUtM'`): The video URL to process.
  * `redis_enabled` (`False`): Whether to use Redis for service discovery.
  * `service_ip` (`'85.192.30.55'`): Service IP if `redis_enabled=False`.
  * `service_port` (`9090`): Service port if `redis_enabled=False`.
  * `account_id` (`'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Account ID for lookup or call.
  * `timeout` (`30`): Timeout in seconds for the Thrift connection.
  * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json` (the templated default is illustrated in the sketch below).
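The templated default above resolves through Airflow's Variable store at render time; the Jinja expression falls back to the literal path when the `DOWNLOADS_TEMP` Variable is not set. A minimal illustration (not code from this repo):

```python
# Illustration only: how the templated info_json_dir default above behaves.
# The Variable name DOWNLOADS_TEMP comes from this README; everything else is hypothetical.
from airflow.models import Variable

# Set the Variable once with:  airflow variables set DOWNLOADS_TEMP /opt/airflow/downloadfiles
# Equivalent of "{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"
info_json_dir = Variable.get("DOWNLOADS_TEMP", default_var="/opt/airflow/downloadfiles")
print(info_json_dir)
```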
* **Results:**
  * Connects to the YTDLP Ops service using the specified method (Redis discovery or direct IP); a minimal connection sketch follows this list.
  * Retrieves token data for the given URL and account ID.
  * Saves the video's `info.json` metadata to the specified directory.
  * Extracts the SOCKS proxy (if available).
  * Pushes `info_json_path`, `socks_proxy`, and the original `ytdlp_command` (with tokens) to XCom.
  * Optionally stores detailed results in a Redis hash (`token_info:<video_id>`).
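For reference, here is a minimal sketch of the Thrift interaction this DAG wraps, assuming the `pangramia` Thrift stubs and the `YTTokenOpService` interface used elsewhere in this repository; the real operator adds retries, Redis discovery, and XCom handling:

```python
# Minimal sketch, not the operator's full logic.
import socket

from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

def fetch_token(host, port, account_id, url, timeout=30):
    sock = TSocket.TSocket(host, port, socket_family=socket.AF_INET)
    sock.setTimeout(timeout * 1000)          # Thrift timeouts are in milliseconds
    transport = TTransport.TFramedTransport(sock)
    client = YTTokenOpService.Client(TBinaryProtocol.TBinaryProtocol(transport))
    transport.open()
    try:
        client.ping()                        # connection test, as the operator does
        return client.getOrRefreshToken(
            accountId=account_id,
            updateType=TokenUpdateMode.AUTO,
            url=url,
        )
    finally:
        transport.close()
```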
### `ytdlp_mgmt_queue_add_urls`

* **File:** `airflow/dags/ytdlp_mgmt_queue_add_urls.py`
* **Purpose:** Manually adds video URLs to a specific YTDLP inbox queue (a Redis list).
* **Parameters (Defaults):**
  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
  * `queue_name` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Target Redis list (inbox queue).
  * `urls` (`""`): Multiline string of video URLs to add.
* **Results:**
  * Parses the multiline `urls` parameter.
  * Adds each valid URL to the end of the Redis list specified by `queue_name`.
  * Logs the number of URLs added (see the sketch after this list).
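A rough sketch of what the add-URLs task does, assuming the shared `_get_redis_client` helper from `dags/utils/redis_utils.py`; the callable name `add_urls_callable` matches the DAG file, but the body here is illustrative:

```python
import logging

from utils.redis_utils import _get_redis_client

logger = logging.getLogger(__name__)

def add_urls_callable(**context):
    params = context["params"]
    raw = params.get("urls", "") or ""
    urls = [line.strip() for line in raw.splitlines() if line.strip()]  # one URL per line
    client = _get_redis_client(params.get("redis_conn_id", "redis_default"))
    for url in urls:
        client.rpush(params["queue_name"], url)  # append to the end of the inbox list
    logger.info("Added %d URL(s) to '%s'.", len(urls), params["queue_name"])
```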
### `ytdlp_mgmt_queue_clear`

* **File:** `airflow/dags/ytdlp_mgmt_queue_clear.py`
* **Purpose:** Manually deletes a specific Redis key used by the YTDLP queues.
* **Parameters (Defaults):**
  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
  * `queue_to_clear` (`'PLEASE_SPECIFY_QUEUE_TO_CLEAR'`): Exact name of the Redis key to clear. **Must be changed by the user.**
* **Results:**
  * Deletes the Redis key specified by the `queue_to_clear` parameter (a guarded sketch follows this list).
  * **Warning:** This operation is destructive and irreversible. Use with extreme caution and make sure you specify the correct key name (e.g., `video_queue_inbox_account_xyz`, `video_queue_progress`, `video_queue_result`, `video_queue_fail`).
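A guarded sketch of the clear operation (the function name below is hypothetical): it refuses to run with the placeholder default and then deletes the key with a single `DEL`:

```python
from airflow.exceptions import AirflowException

from utils.redis_utils import _get_redis_client

def clear_queue_callable(**context):
    params = context["params"]
    key = params["queue_to_clear"]
    if key == "PLEASE_SPECIFY_QUEUE_TO_CLEAR":
        raise AirflowException("Set 'queue_to_clear' to the real Redis key you want to delete.")
    client = _get_redis_client(params.get("redis_conn_id", "redis_default"))
    removed = client.delete(key)  # returns the number of keys actually removed
    print(f"Deleted {removed} key(s) named '{key}'.")
```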
### `ytdlp_mgmt_queue_check_status`

* **File:** `airflow/dags/ytdlp_mgmt_queue_check_status.py`
* **Purpose:** Manually checks the type and size of a specific YTDLP Redis queue/key.
* **Parameters (Defaults):**
  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
  * `queue_to_check` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to check.
* **Results:**
  * Connects to Redis and determines the type of the key specified by `queue_to_check`.
  * Determines the size (length for lists, number of fields for hashes), as sketched after this list.
  * Logs the key type and size.
  * Pushes `queue_key_type` and `queue_size` to XCom.
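An illustrative sketch of how type and size could be determined with plain `redis-py` calls (not the DAG's exact code); the returned pair corresponds to the `queue_key_type` and `queue_size` XCom keys:

```python
def check_queue_status(client, key):
    key_type = client.type(key)
    key_type = key_type.decode() if isinstance(key_type, bytes) else key_type
    if key_type == "list":
        size = client.llen(key)
    elif key_type == "hash":
        size = client.hlen(key)
    else:
        size = 0 if key_type == "none" else None  # missing key, or an unexpected type
    return key_type, size
```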
### `ytdlp_mgmt_queue_list_contents`

* **File:** `airflow/dags/ytdlp_mgmt_queue_list_contents.py`
* **Purpose:** Manually lists the contents of a specific YTDLP Redis queue/key (list or hash). Useful for inspecting queue state or results.
* **Parameters (Defaults):**
  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
  * `queue_to_list` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to list.
  * `max_items` (`100`): Maximum number of items/fields to list.
* **Results:**
  * Connects to Redis and identifies the type of the key specified by `queue_to_list`.
  * If it is a list, logs the first `max_items` elements.
  * If it is a hash, logs up to `max_items` key-value pairs, attempting to pretty-print JSON values (see the sketch after this list).
  * Logs warnings for very large hashes.
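A sketch of the listing logic under the same assumptions (plain `redis-py` calls; `print` stands in for the task logger):

```python
import json

def list_queue_contents(client, key, max_items=100):
    key_type = client.type(key)
    key_type = key_type.decode() if isinstance(key_type, bytes) else key_type
    if key_type == "list":
        for item in client.lrange(key, 0, max_items - 1):
            print(item.decode() if isinstance(item, bytes) else item)
    elif key_type == "hash":
        for i, (field, value) in enumerate(client.hscan_iter(key)):
            if i >= max_items:
                break
            field = field.decode() if isinstance(field, bytes) else field
            value = value.decode() if isinstance(value, bytes) else value
            try:
                value = json.dumps(json.loads(value), indent=2)  # pretty-print JSON values
            except (TypeError, ValueError):
                pass  # leave non-JSON values untouched
            print(f"{field}: {value}")
```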
### `ytdlp_proc_sequential_processor`

* **File:** `airflow/dags/ytdlp_proc_sequential_processor.py`
* **Purpose:** Processes YouTube URLs sequentially from a Redis queue. Designed for batch processing: pops a URL, gets token/metadata via the YTDLP Ops service, downloads the media using `yt-dlp`, and records the result.
* **Parameters (Defaults):**
  * `queue_name` (`'video_queue'`): Base name for the Redis queues (e.g., `video_queue_inbox`, `video_queue_progress`).
  * `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
  * `redis_enabled` (`False`): Whether to use Redis for service discovery. If `False`, uses `service_ip`/`service_port`.
  * `service_ip` (`None`): Service IP, required if `redis_enabled=False`.
  * `service_port` (`None`): Service port, required if `redis_enabled=False`.
  * `account_id` (`'default_account'`): Account ID for the API call (used for the Redis lookup if `redis_enabled=True`).
  * `timeout` (`30`): Timeout in seconds for the Thrift connection.
  * `download_format` (`'ba[ext=m4a]/bestaudio/best'`): yt-dlp format selection string.
  * `output_path_template` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s"`): yt-dlp output template. Uses the Airflow Variable `DOWNLOADS_TEMP`.
  * `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`. Uses the Airflow Variable `DOWNLOADS_TEMP`.
* **Results** (a condensed queue-lifecycle sketch follows this list):
  * Pops one URL from the `{{ params.queue_name }}_inbox` Redis list.
  * If a URL is popped, it is added to the `{{ params.queue_name }}_progress` Redis hash.
  * The `YtdlpOpsOperator` (`get_token` task) attempts to get token data (including `info.json`, proxy, and command) for the URL using the specified connection method and account ID.
  * If token retrieval succeeds, the `download_video` task executes `yt-dlp` with the retrieved `info.json`, the proxy, the `download_format` parameter, and the `output_path_template` parameter to download the actual media.
  * **On successful download:** the URL is removed from the progress hash and added to the `{{ params.queue_name }}_result` hash together with the results (`info_json_path`, `socks_proxy`, `ytdlp_command`).
  * **On failure (token retrieval or download):** the URL is removed from the progress hash and added to the `{{ params.queue_name }}_fail` hash together with error details (message, traceback).
  * If the inbox queue is empty, the DAG run skips processing via `AirflowSkipException`.
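The queue lifecycle described above can be condensed as follows, simplified from the callables in the full DAG file shown later in this diff (`pop_url_from_queue`, `move_url_to_progress`, `handle_success`, `handle_failure`); the `download` callable stands in for the token-fetch and `yt-dlp` steps:

```python
import json
import time

def process_one(client, download, base="video_queue"):
    url = client.lpop(f"{base}_inbox")  # non-blocking pop; None when the inbox is empty
    if not url:
        return None
    url = url.decode() if isinstance(url, bytes) else url
    client.hset(f"{base}_progress", url,
                json.dumps({"status": "processing", "start_time": time.time()}))
    try:
        result = download(url)  # must return a dict of result fields
        client.hdel(f"{base}_progress", url)
        client.hset(f"{base}_result", url, json.dumps({"status": "success", **result}))
    except Exception as exc:
        client.hdel(f"{base}_progress", url)
        client.hset(f"{base}_fail", url, json.dumps({"status": "failed", "error": str(exc)}))
    return url
```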
README.md — 114 changed lines (the removed English content is identical to README.en.old.md above)

@@ -1,100 +1,38 @@

# Architecture and Description of the YTDLP Airflow DAGs

This document describes the architecture and purpose of the DAGs used to download videos from YouTube. The system is built around a Sensor/Worker pattern to provide continuous, parallel processing.

## Main processing loop

### `ytdlp_sensor_redis_queue` (Sensor)

- **Purpose:** Pulls URLs to download from the Redis queue and launches workers to process them.
- **How it works (hybrid triggering, sketched after this list):**
  - **On a schedule:** every minute the DAG automatically checks the Redis queue. This guarantees that new tasks are picked up even if the processing loop was temporarily stopped (because the queue was empty).
  - **On trigger:** when the `ytdlp_worker_per_url` worker finishes successfully, it immediately triggers the sensor instead of waiting for the next minute, keeping processing continuous with no idle gaps.
- **Logic:** pops a batch of URLs from the Redis `_inbox` list. If the queue is empty, the DAG completes successfully until the next run (triggered or scheduled).
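A sketch of the hybrid triggering wiring (assumed, not copied from the repository): the sensor keeps its every-minute schedule, while the worker ends with a `TriggerDagRunOperator` pointing back at it:

```python
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="ytdlp_worker_per_url",
    start_date=days_ago(1),
    schedule_interval=None,  # the worker only runs when the sensor triggers it
    catchup=False,
) as worker_dag:
    # ...token-fetch and download tasks would come first...
    retrigger_sensor = TriggerDagRunOperator(
        task_id="retrigger_sensor",
        trigger_dag_id="ytdlp_sensor_redis_queue",  # the sensor itself also polls every minute
    )
```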
### `ytdlp_worker_per_url` (Worker)

- **Purpose:** Processes a single URL, downloads the video, and continues the loop.
- **How it works:**
  - Receives one URL from the sensor.
  - Calls the `ytdlp-ops-auth` service to obtain the `info.json` and a `socks5` proxy.
  - Downloads the video using the retrieved data. (TODO: replace the `yt-dlp` command-line invocation with a library call; a possible shape of that call is sketched after this list.)
  - Depending on the outcome (success/failure), places the result in the corresponding Redis hash (`_result` or `_fail`).
  - On success, re-triggers the `ytdlp_sensor_redis_queue` sensor to continue the processing loop. On failure, the loop stops for manual diagnosis.
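One possible shape of the library call the TODO refers to; this is an assumption rather than the repository's implementation, using `yt_dlp`'s Python API with the same format, proxy, and output-template knobs the DAGs pass on the command line:

```python
import json

import yt_dlp

def download_with_library(info_json_path, socks_proxy, fmt, outtmpl):
    with open(info_json_path, "r", encoding="utf-8") as f:
        info = json.load(f)
    opts = {"format": fmt, "outtmpl": outtmpl}
    if socks_proxy:
        opts["proxy"] = socks_proxy
    with yt_dlp.YoutubeDL(opts) as ydl:
        # Mirrors --load-info-json: reuse the pre-fetched metadata instead of re-extracting.
        ydl.process_ie_result(info, download=True)
```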
## Management DAGs

These DAGs are intended for manual queue management and are not part of the automatic loop.

- **`ytdlp_mgmt_queue_add_and_verify`**: adds URLs to the task queue (`_inbox`) and then checks the status of that queue.
- **`ytdlp_mgmt_queues_check_status`**: shows the state and contents of all key queues (`_inbox`, `_progress`, `_result`, `_fail`). Helps track the processing pipeline.
- **`ytdlp_mgmt_queue_clear`**: clears (fully deletes) the specified Redis queue. **Use with caution**: the operation is irreversible.

## External services

### `ytdlp-ops-auth` (Thrift Service)

- **Purpose:** an external service that provides the authentication data (tokens, cookies, proxy) needed to download videos.
- **Interaction:** the worker DAG (`ytdlp_worker_per_url`) calls this service before starting a download to obtain the data `yt-dlp` needs.
dags/utils/__init__.py — new file (+10 lines)

@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.

"""
Airflow DAG Utilities
"""
dags/utils/redis_utils.py — new file (+32 lines)

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.

"""
Redis utility functions for Airflow DAGs.
"""

from airflow.exceptions import AirflowException
from airflow.providers.redis.hooks.redis import RedisHook
import logging
import redis

logger = logging.getLogger(__name__)


def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
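Example use of the shared helper from a DAG callable (illustrative; assumes the `redis_default` Airflow connection and the inbox key naming used in this README):

```python
from utils.redis_utils import _get_redis_client

def peek_inbox(**context):
    client = _get_redis_client("redis_default")
    print("inbox length:", client.llen("video_queue_inbox"))
```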
@@ -8,6 +8,9 @@ from datetime import timedelta
import logging
import redis  # Import redis exceptions if needed

# Import utility functions
from utils.redis_utils import _get_redis_client

# Configure logging
logger = logging.getLogger(__name__)

@@ -15,23 +18,6 @@ logger = logging.getLogger(__name__)
DEFAULT_QUEUE_NAME = 'video_queue'  # Default base name for the queue
DEFAULT_REDIS_CONN_ID = 'redis_default'

# --- Helper Functions ---

def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

# --- Python Callables for Tasks ---

def add_urls_callable(**context):
@@ -28,22 +28,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_QUEUE_BASE_NAME = 'video_queue'
DEFAULT_MAX_ITEMS_TO_LIST = 25

# --- Helper Function ---

def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
# Import utility functions
from utils.redis_utils import _get_redis_client

# --- Python Callable for Check and List Task ---
@@ -1,5 +1,10 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.

"""
Airflow DAG for manually clearing (deleting) a specific Redis key used by YTDLP queues.
"""

@@ -22,22 +27,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
# Provide a placeholder default, user MUST specify the queue to clear
DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR'

# --- Helper Function ---

def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
# Import utility functions
from utils.redis_utils import _get_redis_client

# --- Python Callable for Clear Task ---
@@ -29,24 +29,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_QUEUE_TO_LIST = 'video_queue_inbox'
DEFAULT_MAX_ITEMS = 10  # Limit number of items listed by default

# --- Helper Function ---

def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        # decode_responses=True removed as it's not supported by get_conn in some environments
        # We will decode manually where needed.
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
# Import utility functions
from utils.redis_utils import _get_redis_client

# --- Python Callable for List Contents Task ---
@ -1,720 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim:fenc=utf-8
|
||||
#
|
||||
# Copyright © 2024 rl <rl@rlmbp>
|
||||
#
|
||||
# Distributed under terms of the MIT license.
|
||||
|
||||
"""
|
||||
DAG for processing YouTube URLs sequentially from a Redis queue using YTDLP Ops Thrift service.
|
||||
"""
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
|
||||
from airflow.hooks.base import BaseHook
|
||||
from airflow.models import BaseOperator, Variable
|
||||
from airflow.models.param import Param
|
||||
from airflow.operators.bash import BashOperator # Import BashOperator
|
||||
from airflow.operators.python import PythonOperator
|
||||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||
from airflow.providers.redis.hooks.redis import RedisHook
|
||||
from airflow.utils.dates import days_ago
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
from datetime import datetime, timedelta
|
||||
from pangramia.yt.common.ttypes import TokenUpdateMode
|
||||
from pangramia.yt.exceptions.ttypes import PBServiceException
|
||||
from pangramia.yt.tokens_ops import YTTokenOpService
|
||||
from thrift.protocol import TBinaryProtocol
|
||||
from thrift.transport import TSocket, TTransport
|
||||
from thrift.transport.TTransport import TTransportException
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import redis # Import redis exceptions if needed
|
||||
import socket
|
||||
import time
|
||||
import traceback # For logging stack traces in failure handler
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default settings
|
||||
DEFAULT_QUEUE_NAME = 'video_queue' # Base name for queues
|
||||
DEFAULT_REDIS_CONN_ID = 'redis_default'
|
||||
DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds
|
||||
MAX_RETRIES_REDIS_LOOKUP = 3 # Retries for fetching service details from Redis
|
||||
RETRY_DELAY_REDIS_LOOKUP = 10 # Delay (seconds) for Redis lookup retries
|
||||
|
||||
# --- Helper Functions ---
|
||||
|
||||
def _get_redis_client(redis_conn_id):
|
||||
"""Gets a Redis client connection using RedisHook."""
|
||||
try:
|
||||
hook = RedisHook(redis_conn_id=redis_conn_id)
|
||||
client = hook.get_conn()
|
||||
client.ping()
|
||||
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
|
||||
return client
|
||||
except redis.exceptions.AuthenticationError:
|
||||
logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
|
||||
raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
|
||||
raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
|
||||
|
||||
def _extract_video_id(url):
|
||||
"""Extracts YouTube video ID from URL."""
|
||||
if not url or not isinstance(url, str):
|
||||
logger.debug("URL is empty or not a string, cannot extract video ID.")
|
||||
return None
|
||||
try:
|
||||
video_id = None
|
||||
if 'youtube.com/watch?v=' in url:
|
||||
video_id = url.split('v=')[1].split('&')[0]
|
||||
elif 'youtu.be/' in url:
|
||||
video_id = url.split('youtu.be/')[1].split('?')[0]
|
||||
|
||||
if video_id and len(video_id) >= 11:
|
||||
video_id = video_id[:11] # Standard ID length
|
||||
logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
|
||||
return video_id
|
||||
else:
|
||||
logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}")
|
||||
return None
|
||||
|
||||
# --- Queue Management Callables ---
|
||||
|
||||
def pop_url_from_queue(**context):
|
||||
"""Pops a URL from the inbox queue and pushes to XCom."""
|
||||
params = context['params']
|
||||
queue_name = params['queue_name']
|
||||
inbox_queue = f"{queue_name}_inbox"
|
||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
||||
logger.info(f"Attempting to pop URL from inbox queue: {inbox_queue}")
|
||||
|
||||
try:
|
||||
client = _get_redis_client(redis_conn_id)
|
||||
# LPOP is non-blocking, returns None if empty
|
||||
url_bytes = client.lpop(inbox_queue) # Returns bytes if decode_responses=False on hook/client
|
||||
|
||||
if url_bytes:
|
||||
url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes
|
||||
logger.info(f"Popped URL: {url}")
|
||||
context['task_instance'].xcom_push(key='current_url', value=url)
|
||||
return url # Return URL for logging/potential use
|
||||
else:
|
||||
logger.info(f"Inbox queue '{inbox_queue}' is empty. Skipping downstream tasks.")
|
||||
context['task_instance'].xcom_push(key='current_url', value=None)
|
||||
# Raise AirflowSkipException to signal downstream tasks to skip
|
||||
raise AirflowSkipException(f"Inbox queue '{inbox_queue}' is empty.")
|
||||
except AirflowSkipException:
|
||||
raise # Re-raise skip exception
|
||||
except Exception as e:
|
||||
logger.error(f"Error popping URL from Redis queue '{inbox_queue}': {e}", exc_info=True)
|
||||
raise AirflowException(f"Failed to pop URL from Redis: {e}")
|
||||
|
||||
|
||||
def move_url_to_progress(**context):
|
||||
"""Moves the current URL from XCom to the progress hash."""
|
||||
ti = context['task_instance']
|
||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
||||
|
||||
# This task should be skipped if pop_url_from_queue raised AirflowSkipException
|
||||
# Adding check for robustness
|
||||
if not url:
|
||||
logger.info("No URL found in XCom (or upstream skipped). Skipping move to progress.")
|
||||
raise AirflowSkipException("No URL to process.")
|
||||
|
||||
params = context['params']
|
||||
queue_name = params['queue_name']
|
||||
progress_queue = f"{queue_name}_progress"
|
||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
||||
logger.info(f"Moving URL '{url}' to progress hash: {progress_queue}")
|
||||
|
||||
progress_data = {
|
||||
'status': 'processing',
|
||||
'start_time': time.time(),
|
||||
'dag_run_id': context['dag_run'].run_id,
|
||||
'task_instance_key_str': context['task_instance_key_str']
|
||||
}
|
||||
|
||||
try:
|
||||
client = _get_redis_client(redis_conn_id)
|
||||
client.hset(progress_queue, url, json.dumps(progress_data))
|
||||
logger.info(f"Moved URL '{url}' to progress hash '{progress_queue}'.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error moving URL to Redis progress hash '{progress_queue}': {e}", exc_info=True)
|
||||
# If this fails, the URL is popped but not tracked as processing. Fail the task.
|
||||
raise AirflowException(f"Failed to move URL to progress hash: {e}")
|
||||
|
||||
|
||||
def handle_success(**context):
|
||||
"""Moves URL from progress to result hash on success."""
|
||||
ti = context['task_instance']
|
||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
||||
if not url:
|
||||
logger.warning("handle_success called but no URL found from pop_url_from_queue XCom. This shouldn't happen on success path.")
|
||||
return # Or raise error
|
||||
|
||||
params = context['params']
|
||||
queue_name = params['queue_name']
|
||||
progress_queue = f"{queue_name}_progress"
|
||||
result_queue = f"{queue_name}_result"
|
||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
||||
|
||||
# Pull results from get_token task
|
||||
info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
|
||||
socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
|
||||
ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') # Original command
|
||||
downloaded_file_path = ti.xcom_pull(task_ids='download_video') # Pull from download_video task
|
||||
|
||||
logger.info(f"Handling success for URL: {url}")
|
||||
logger.info(f" Info JSON Path: {info_json_path}")
|
||||
logger.info(f" SOCKS Proxy: {socks_proxy}")
|
||||
logger.info(f" YTDLP Command: {ytdlp_command[:100] if ytdlp_command else 'None'}...") # Log truncated command
|
||||
logger.info(f" Downloaded File Path: {downloaded_file_path}")
|
||||
|
||||
result_data = {
|
||||
'status': 'success',
|
||||
'end_time': time.time(),
|
||||
'info_json_path': info_json_path,
|
||||
'socks_proxy': socks_proxy,
|
||||
'ytdlp_command': ytdlp_command,
|
||||
'downloaded_file_path': downloaded_file_path,
|
||||
'url': url,
|
||||
'dag_run_id': context['dag_run'].run_id,
|
||||
'task_instance_key_str': context['task_instance_key_str'] # Record which task instance succeeded
|
||||
}
|
||||
|
||||
try:
|
||||
client = _get_redis_client(redis_conn_id)
|
||||
# Remove from progress hash
|
||||
removed_count = client.hdel(progress_queue, url)
|
||||
if removed_count > 0:
|
||||
logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
|
||||
else:
|
||||
logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during success handling.")
|
||||
|
||||
# Add to result hash
|
||||
client.hset(result_queue, url, json.dumps(result_data))
|
||||
logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
|
||||
# Even if Redis fails, the task succeeded. Log error but don't fail the task.
|
||||
# Consider adding retry logic for Redis operations here or marking state differently.
|
||||
|
||||
|
||||
def handle_failure(**context):
|
||||
"""
|
||||
Handles failed processing. Depending on the `requeue_on_failure` parameter,
|
||||
it either moves the URL to the fail hash or re-queues it in the inbox.
|
||||
If `stop_on_failure` is True, this task will fail, stopping the DAG loop.
|
||||
"""
|
||||
ti = context['task_instance']
|
||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
||||
if not url:
|
||||
logger.error("handle_failure called but no URL found from pop_url_from_queue XCom.")
|
||||
return
|
||||
|
||||
params = context['params']
|
||||
queue_name = params['queue_name']
|
||||
progress_queue = f"{queue_name}_progress"
|
||||
fail_queue = f"{queue_name}_fail"
|
||||
inbox_queue = f"{queue_name}_inbox"
|
||||
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
|
||||
requeue_on_failure = params.get('requeue_on_failure', False)
|
||||
stop_on_failure = params.get('stop_on_failure', True) # Default to True
|
||||
|
||||
exception = context.get('exception')
|
||||
error_message = str(exception) if exception else "Unknown error"
|
||||
tb_str = traceback.format_exc() if exception else "No traceback available."
|
||||
|
||||
logger.info(f"Handling failure for URL: {url}")
|
||||
logger.error(f" Failure Reason: {error_message}")
|
||||
logger.debug(f" Traceback:\n{tb_str}")
|
||||
|
||||
try:
|
||||
client = _get_redis_client(redis_conn_id)
|
||||
# Always remove from progress hash first
|
||||
removed_count = client.hdel(progress_queue, url)
|
||||
if removed_count > 0:
|
||||
logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
|
||||
else:
|
||||
logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.")
|
||||
|
||||
if requeue_on_failure:
|
||||
# Re-queue the URL for another attempt
|
||||
client.rpush(inbox_queue, url)
|
||||
logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
|
||||
else:
|
||||
# Move to the permanent fail hash
|
||||
fail_data = {
|
||||
'status': 'failed',
|
||||
'end_time': time.time(),
|
||||
'error': error_message,
|
||||
'traceback': tb_str,
|
||||
'url': url,
|
||||
'dag_run_id': context['dag_run'].run_id,
|
||||
'task_instance_key_str': context['task_instance_key_str']
|
||||
}
|
||||
client.hset(fail_queue, url, json.dumps(fail_data))
|
||||
logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
|
||||
# This is a critical error in the failure handling logic itself.
|
||||
raise AirflowException(f"Could not handle failure in Redis: {e}")
|
||||
|
||||
# After handling Redis, decide whether to fail the task to stop the loop
|
||||
if stop_on_failure:
|
||||
logger.error("stop_on_failure is True. Failing this task to stop the DAG loop.")
|
||||
# Re-raise the original exception to fail the task instance.
|
||||
# This is better than AirflowFailException because it preserves the original error.
|
||||
if exception:
|
||||
raise exception
|
||||
else:
|
||||
# If for some reason there's no exception, fail explicitly.
|
||||
raise AirflowFailException("Failing task as per stop_on_failure=True, but original exception was not found.")
|
||||
|
||||
|
||||
# --- YtdlpOpsOperator ---
|
||||
|
||||
class YtdlpOpsOperator(BaseOperator):
|
||||
"""
|
||||
Custom Airflow operator to interact with YTDLP Thrift service. Handles direct connections
|
||||
and Redis-based discovery, retrieves tokens, saves info.json, and manages errors.
|
||||
Modified to pull URL from XCom for sequential processing.
|
||||
"""
|
||||
# Removed 'url' from template_fields as it's pulled from XCom
|
||||
template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir', 'redis_conn_id')
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
# url parameter removed - will be pulled from XCom
|
||||
redis_conn_id=DEFAULT_REDIS_CONN_ID,
|
||||
max_retries_lookup=MAX_RETRIES_REDIS_LOOKUP,
|
||||
retry_delay_lookup=RETRY_DELAY_REDIS_LOOKUP,
|
||||
service_ip=None,
|
||||
service_port=None,
|
||||
redis_enabled=False, # Default to direct connection now
|
||||
account_id=None,
|
||||
# save_info_json removed, always True
|
||||
info_json_dir=None,
|
||||
# get_socks_proxy removed, always True
|
||||
# store_socks_proxy removed, always True
|
||||
# get_socks_proxy=True, # Removed
|
||||
# store_socks_proxy=True, # Store proxy in XCom by default # Removed
|
||||
timeout=DEFAULT_TIMEOUT,
|
||||
*args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
logger.info(f"Initializing YtdlpOpsOperator (Processor Version) with parameters: "
|
||||
f"redis_conn_id={redis_conn_id}, max_retries_lookup={max_retries_lookup}, retry_delay_lookup={retry_delay_lookup}, "
|
||||
f"service_ip={service_ip}, service_port={service_port}, redis_enabled={redis_enabled}, "
|
||||
f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
|
||||
# save_info_json, get_socks_proxy, store_socks_proxy removed from log
|
||||
|
||||
# Validate parameters based on connection mode
|
||||
if redis_enabled:
|
||||
# If using Redis, account_id is essential for lookup
|
||||
if not account_id:
|
||||
raise ValueError("account_id is required when redis_enabled=True for service lookup.")
|
||||
else:
|
||||
# If direct connection, IP and Port are essential
|
||||
if not service_ip or not service_port:
|
||||
raise ValueError("Both service_ip and service_port must be specified when redis_enabled=False.")
|
||||
# Account ID is still needed for the API call itself, but rely on DAG param or operator config
|
||||
if not account_id:
|
||||
logger.warning("No account_id provided for direct connection mode. Ensure it's set in DAG params or operator config.")
|
||||
# We won't assign 'default' here, let the value passed during instantiation be used.
|
||||
|
||||
# self.url is no longer needed here
|
||||
self.redis_conn_id = redis_conn_id
|
||||
self.max_retries_lookup = max_retries_lookup
|
||||
self.retry_delay_lookup = int(retry_delay_lookup.total_seconds() if isinstance(retry_delay_lookup, timedelta) else retry_delay_lookup)
|
||||
self.service_ip = service_ip
|
||||
self.service_port = service_port
|
||||
self.redis_enabled = redis_enabled
|
||||
self.account_id = account_id
|
||||
# self.save_info_json removed
|
||||
self.info_json_dir = info_json_dir # Still needed
|
||||
# self.get_socks_proxy removed
|
||||
# self.store_socks_proxy removed
|
||||
self.timeout = timeout
|
||||
|
||||
def execute(self, context):
|
||||
logger.info("Executing YtdlpOpsOperator (Processor Version)")
|
||||
transport = None
|
||||
ti = context['task_instance'] # Get task instance for XCom access
|
||||
|
||||
try:
|
||||
# --- Get URL from XCom ---
|
||||
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
|
||||
if not url:
|
||||
# This should ideally be caught by upstream skip, but handle defensively
|
||||
logger.info("No URL found in XCom from pop_url_from_queue. Skipping execution.")
|
||||
raise AirflowSkipException("Upstream task did not provide a URL.")
|
||||
logger.info(f"Processing URL from XCom: {url}")
|
||||
# --- End Get URL ---
|
||||
|
||||
logger.info("Getting task parameters and rendering templates")
|
||||
params = context['params'] # DAG run params
|
||||
|
||||
# Render template fields using context
|
||||
# Use render_template_as_native for better type handling if needed, else render_template
|
||||
redis_conn_id = self.render_template(self.redis_conn_id, context)
|
||||
service_ip = self.render_template(self.service_ip, context)
|
||||
service_port_rendered = self.render_template(self.service_port, context)
|
||||
account_id = self.render_template(self.account_id, context)
|
||||
timeout_rendered = self.render_template(self.timeout, context)
|
||||
info_json_dir = self.render_template(self.info_json_dir, context) # Rendered here for _save_info_json
|
||||
|
||||
# Determine effective settings (DAG params override operator defaults)
|
||||
redis_enabled = params.get('redis_enabled', self.redis_enabled)
|
||||
account_id = params.get('account_id', account_id) # Use DAG param if provided
|
||||
redis_conn_id = params.get('redis_conn_id', redis_conn_id) # Use DAG param if provided
|
||||
|
||||
logger.info(f"Effective settings: redis_enabled={redis_enabled}, account_id='{account_id}', redis_conn_id='{redis_conn_id}'")
|
||||
|
||||
host = None
|
||||
port = None
|
||||
|
||||
if redis_enabled:
|
||||
# Get Redis connection using the helper for consistency
|
||||
redis_client = _get_redis_client(redis_conn_id)
|
||||
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}' for service discovery.")
|
||||
|
||||
# Get service details from Redis with retries
|
||||
service_key = f"ytdlp:{account_id}"
|
||||
legacy_key = account_id # For backward compatibility
|
||||
|
||||
for attempt in range(self.max_retries_lookup):
|
||||
try:
|
||||
logger.info(f"Attempt {attempt + 1}/{self.max_retries_lookup}: Fetching service details from Redis for keys: '{service_key}', '{legacy_key}'")
|
||||
service_details = redis_client.hgetall(service_key)
|
||||
if not service_details:
|
||||
logger.warning(f"Key '{service_key}' not found, trying legacy key '{legacy_key}'")
|
||||
service_details = redis_client.hgetall(legacy_key)
|
||||
|
||||
if not service_details:
|
||||
raise ValueError(f"No service details found in Redis for keys: {service_key} or {legacy_key}")
|
||||
|
||||
# Find IP and port (case-insensitive keys)
|
||||
ip_key = next((k for k in service_details if k.lower() == 'ip'), None)
|
||||
port_key = next((k for k in service_details if k.lower() == 'port'), None)
|
||||
|
||||
if not ip_key: raise ValueError(f"'ip' key not found in Redis hash for {service_key}/{legacy_key}")
|
||||
if not port_key: raise ValueError(f"'port' key not found in Redis hash for {service_key}/{legacy_key}")
|
||||
|
||||
host = service_details[ip_key] # Assumes decode_responses=True in hook
|
||||
port_str = service_details[port_key]
|
||||
|
||||
try:
|
||||
port = int(port_str)
|
||||
except (ValueError, TypeError):
|
||||
raise ValueError(f"Invalid port value '{port_str}' found in Redis for {service_key}/{legacy_key}")
|
||||
|
||||
logger.info(f"Extracted from Redis - Service IP: {host}, Service Port: {port}")
|
||||
break # Success
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Attempt {attempt + 1} failed to get Redis details: {str(e)}")
|
||||
if attempt == self.max_retries_lookup - 1:
|
||||
logger.error("Max retries reached for fetching Redis details.")
|
||||
raise AirflowException(f"Failed to get service details from Redis after {self.max_retries_lookup} attempts: {e}")
|
||||
logger.info(f"Retrying in {self.retry_delay_lookup} seconds...")
|
||||
time.sleep(self.retry_delay_lookup)
|
||||
else:
|
||||
# Direct connection: Use rendered/param values
|
||||
host = params.get('service_ip', service_ip) # Use DAG param if provided
|
||||
port_str = params.get('service_port', service_port_rendered) # Use DAG param if provided
|
||||
|
||||
logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
|
||||
|
||||
if not host or not port_str:
|
||||
raise ValueError("Direct connection requires service_ip and service_port (check Operator config and DAG params)")
|
||||
try:
|
||||
port = int(port_str)
|
||||
except (ValueError, TypeError):
|
||||
raise ValueError(f"Invalid service_port value: {port_str}")
|
||||
|
||||
logger.info(f"Connecting directly to Thrift service at {host}:{port} (Redis bypassed)")
|
||||
|
||||
# Validate and use timeout
|
||||
try:
|
||||
timeout = int(timeout_rendered)
|
||||
if timeout <= 0: raise ValueError("Timeout must be positive")
|
||||
logger.info(f"Using timeout: {timeout} seconds")
|
||||
except (ValueError, TypeError):
|
||||
logger.warning(f"Invalid timeout value: '{timeout_rendered}'. Using default: {DEFAULT_TIMEOUT}")
|
||||
timeout = DEFAULT_TIMEOUT
|
||||
|
||||
# Create Thrift connection objects
|
||||
# socket_conn = TSocket.TSocket(host, port) # Original
|
||||
socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) # Explicitly use AF_INET (IPv4)
|
||||
socket_conn.setTimeout(timeout * 1000) # Thrift timeout is in milliseconds
|
||||
transport = TTransport.TFramedTransport(socket_conn) # Use TFramedTransport if server expects it
|
||||
# transport = TTransport.TBufferedTransport(socket_conn) # Use TBufferedTransport if server expects it
|
||||
protocol = TBinaryProtocol.TBinaryProtocol(transport)
|
||||
client = YTTokenOpService.Client(protocol)
|
||||
|
||||
logger.info(f"Attempting to connect to Thrift server at {host}:{port}...")
|
||||
try:
|
||||
transport.open()
|
||||
logger.info("Successfully connected to Thrift server.")
|
||||
|
||||
# Test connection with ping
|
||||
try:
|
||||
client.ping()
|
||||
logger.info("Server ping successful.")
|
||||
except Exception as e:
|
||||
logger.error(f"Server ping failed: {e}")
|
||||
raise AirflowException(f"Server connection test (ping) failed: {e}")
|
||||
|
||||
# Get token from service using the URL from XCom
|
||||
try:
|
||||
logger.info(f"Requesting token for accountId='{account_id}', url='{url}'")
|
||||
token_data = client.getOrRefreshToken(
|
||||
accountId=account_id,
|
||||
updateType=TokenUpdateMode.AUTO,
|
||||
url=url # Use the url variable from XCom
|
||||
)
|
||||
logger.info("Successfully retrieved token data from service.")
|
||||
except PBServiceException as e:
|
||||
# Handle specific service exceptions
|
||||
error_code = getattr(e, 'errorCode', 'N/A')
|
||||
error_message = getattr(e, 'message', 'N/A')
|
||||
error_context = getattr(e, 'context', {})
|
||||
logger.error(f"PBServiceException occurred: Code={error_code}, Message={error_message}")
|
||||
if error_context:
|
||||
logger.error(f" Context: {error_context}") # Log context separately
|
||||
# Construct a concise error message for AirflowException
|
||||
error_msg = f"YTDLP service error (Code: {error_code}): {error_message}"
|
||||
# Add specific error code handling if needed...
|
||||
logger.error(f"Failing task instance due to PBServiceException: {error_msg}") # Add explicit log before raising
|
||||
raise AirflowException(error_msg) # Fail task on service error
|
||||
except TTransportException as e:
|
||||
logger.error(f"Thrift transport error during getOrRefreshToken: {e}")
|
||||
logger.error(f"Failing task instance due to TTransportException: {e}") # Add explicit log before raising
|
||||
raise AirflowException(f"Transport error during API call: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error during getOrRefreshToken: {e}")
|
||||
logger.error(f"Failing task instance due to unexpected error during API call: {e}") # Add explicit log before raising
|
||||
raise AirflowException(f"Unexpected error during API call: {e}")
|
||||
|
||||
except TTransportException as e:
|
||||
# Handle connection errors
|
||||
logger.error(f"Thrift transport error during connection: {str(e)}")
|
||||
logger.error(f"Failing task instance due to TTransportException during connection: {e}") # Add explicit log before raising
|
||||
raise AirflowException(f"Transport error connecting to YTDLP service: {str(e)}")
|
||||
# Removed the overly broad except Exception block here, as inner blocks raise AirflowException
|
||||
|
||||
# --- Process Token Data ---
|
||||
logger.debug(f"Token data received. Attributes: {dir(token_data)}")
|
||||
|
||||
info_json_path = None # Initialize
|
||||
|
||||
# save_info_json is now always True
|
||||
logger.info("Proceeding to save info.json (save_info_json=True).")
|
||||
info_json = self._get_info_json(token_data)
|
||||
if info_json and self._is_valid_json(info_json):
|
||||
try:
|
||||
# Pass rendered info_json_dir to helper
|
||||
info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
|
||||
if info_json_path:
|
||||
ti.xcom_push(key='info_json_path', value=info_json_path)
|
||||
logger.info(f"Successfully saved info.json and pushed path to XCom: {info_json_path}")
|
||||
else:
|
||||
ti.xcom_push(key='info_json_path', value=None)
|
||||
logger.warning("info.json saving failed (check logs from _save_info_json).")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error during info.json saving process: {e}", exc_info=True)
|
||||
ti.xcom_push(key='info_json_path', value=None)
|
||||
elif info_json:
|
||||
logger.warning("Retrieved infoJson is not valid JSON. Skipping save.")
|
||||
ti.xcom_push(key='info_json_path', value=None)
|
||||
else:
|
||||
logger.info("No infoJson found in token data. Skipping save.")
|
||||
ti.xcom_push(key='info_json_path', value=None)
|
||||
|
||||
|
||||
# Extract and potentially store SOCKS proxy
|
||||
# get_socks_proxy and store_socks_proxy are now always True
|
||||
socks_proxy = None
|
||||
logger.info("Attempting to extract SOCKS proxy (get_socks_proxy=True).")
|
||||
proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
|
||||
if proxy_attr:
|
||||
socks_proxy = getattr(token_data, proxy_attr)
|
||||
if socks_proxy:
|
||||
logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
|
||||
# Always store if found (store_socks_proxy=True)
|
||||
ti.xcom_push(key='socks_proxy', value=socks_proxy)
|
||||
logger.info("Pushed 'socks_proxy' to XCom.")
|
||||
else:
|
||||
logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
|
||||
# Store None if attribute found but empty
|
||||
ti.xcom_push(key='socks_proxy', value=None)
|
||||
logger.info("Pushed None to XCom for 'socks_proxy' as extracted value was empty.")
|
||||
else:
|
||||
logger.info("No SOCKS proxy attribute found in token data.")
|
||||
# Store None if attribute not found
|
||||
ti.xcom_push(key='socks_proxy', value=None)
|
||||
logger.info("Pushed None to XCom for 'socks_proxy' as attribute was not found.")
|
||||
|
||||
|
||||
# --- Removed old logic block ---
|
||||
# # Extract and potentially store SOCKS proxy
|
||||
# socks_proxy = None
|
||||
# get_socks_proxy = params.get('get_socks_proxy', self.get_socks_proxy)
|
||||
# store_socks_proxy = params.get('store_socks_proxy', self.store_socks_proxy)
|
||||
#
|
||||
# if get_socks_proxy:
|
||||
# proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
|
||||
# if proxy_attr:
|
||||
# socks_proxy = getattr(token_data, proxy_attr)
|
||||
# if socks_proxy:
|
||||
# logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
|
||||
# if store_socks_proxy:
|
||||
# ti.xcom_push(key='socks_proxy', value=socks_proxy)
|
||||
# logger.info("Pushed 'socks_proxy' to XCom.")
|
||||
# else:
|
||||
# logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
|
||||
# if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
|
||||
# else:
|
||||
# logger.info("get_socks_proxy is True, but no SOCKS proxy attribute found.")
|
||||
# if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
|
||||
# else:
|
||||
# logger.info("get_socks_proxy is False. Skipping proxy extraction.")
|
||||
# if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
|
||||
# --- End Removed old logic block ---
|
||||
|
||||
|
||||
# Get the original command from the server, or construct a fallback
|
||||
ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
|
||||
if ytdlp_cmd:
|
||||
logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...") # Log truncated
|
||||
else:
|
||||
logger.warning("No 'ytdlpCommand' attribute found in token data. Constructing a fallback for logging.")
|
||||
# Construct a representative command for logging purposes
|
||||
if socks_proxy:
|
||||
ytdlp_cmd = f"yt-dlp --dump-json --proxy \"{socks_proxy}\" \"{url}\""
|
||||
else:
|
||||
ytdlp_cmd = f"yt-dlp --dump-json \"{url}\""
|
||||
logger.info(f"Constructed fallback command: {ytdlp_cmd}")
|
||||
|
||||
# Push the command to XCom
|
||||
ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
|
||||
logger.info("Pushed command to XCom key 'ytdlp_command'.")
|
||||
|
||||
# No explicit return needed, success is implicit if no exception raised
|
||||
|
||||
except (AirflowSkipException, AirflowFailException) as e:
|
||||
logger.info(f"Task skipped or failed explicitly: {e}")
|
||||
raise # Re-raise to let Airflow handle state
|
||||
except AirflowException as e: # Catch AirflowExceptions raised explicitly
|
||||
logger.error(f"Operation failed due to AirflowException: {e}", exc_info=True)
|
||||
raise # Re-raise AirflowExceptions to ensure task failure
|
||||
except (TTransportException, PBServiceException) as e: # Catch specific Thrift/Service errors not already handled inside inner try
|
||||
logger.error(f"Unhandled YTDLP Service/Transport error in outer block: {e}", exc_info=True)
|
||||
logger.error(f"Failing task instance due to unhandled outer Service/Transport error: {e}") # Add explicit log before raising
|
||||
raise AirflowException(f"Unhandled YTDLP service error: {e}") # Wrap in AirflowException to fail task
|
||||
except Exception as e: # General catch-all for truly unexpected errors
|
||||
logger.error(f"Caught unexpected error in YtdlpOpsOperator outer block: {e}", exc_info=True)
|
||||
logger.error(f"Failing task instance due to unexpected outer error: {e}") # Add explicit log before raising
|
||||
raise AirflowException(f"Unexpected error caused task failure: {e}") # Wrap to fail task
|
||||
finally:
|
||||
if transport and transport.isOpen():
|
||||
logger.info("Closing Thrift transport.")
|
||||
transport.close()
|
||||
|
||||
# --- Helper Methods ---

def _get_info_json(self, token_data):
    """Safely extracts infoJson from token data."""
    return getattr(token_data, 'infoJson', None)

def _is_valid_json(self, json_str):
    """Checks if a string is valid JSON."""
    if not json_str or not isinstance(json_str, str): return False
    try:
        json.loads(json_str)
        return True
    except json.JSONDecodeError:
        return False

def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
    """Saves info_json to a file. Uses pre-rendered directory path."""
    try:
        video_id = _extract_video_id(url)  # Use standalone helper

        save_dir = rendered_info_json_dir or "."  # Use rendered path
        logger.info(f"Target directory for info.json: {save_dir}")

        # Ensure directory exists
        try:
            os.makedirs(save_dir, exist_ok=True)
            logger.info(f"Ensured directory exists: {save_dir}")
        except OSError as e:
            logger.error(f"Could not create directory {save_dir}: {e}. Cannot save info.json.")
            return None

        # Construct filename
        timestamp = int(time.time())
        base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
        info_json_path = os.path.join(save_dir, base_filename)
        latest_json_path = os.path.join(save_dir, "latest.json")  # Path for the latest symlink/copy

        # Write to timestamped file
        try:
            logger.info(f"Writing info.json content (received from service) to {info_json_path}...")
            with open(info_json_path, 'w', encoding='utf-8') as f:
                f.write(info_json)
            logger.info(f"Successfully saved info.json to timestamped file: {info_json_path}")
        except IOError as e:
            logger.error(f"Failed to write info.json to {info_json_path}: {e}")
            return None

        # Write to latest.json (overwrite) - best effort
        try:
            with open(latest_json_path, 'w', encoding='utf-8') as f:
                f.write(info_json)
            logger.info(f"Updated latest.json file: {latest_json_path}")
        except IOError as e:
            logger.warning(f"Failed to update latest.json at {latest_json_path}: {e}")

        return info_json_path

    except Exception as e:
        logger.error(f"Unexpected error in _save_info_json: {e}", exc_info=True)
        return None

# =============================================================================
# DAG Definition
# =============================================================================

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # Default retries for tasks like queue management
    'retry_delay': timedelta(minutes=1),
    'start_date': days_ago(1),
    # Add concurrency control if needed for sequential processing
    # 'concurrency': 1,  # Ensure only one task instance runs at a time per DAG run
    # 'max_active_runs': 1,  # Ensure only one DAG run is active
}

# Define DAG
#
# --- DAG Block Deactivated on 2025-07-16 ---
# This DAG has been replaced by the Sensor/Worker pattern implemented in:
# - ytdlp_sensor_redis_queue.py (polls the queue)
# - ytdlp_worker_per_url.py (processes a single URL)
# This code is kept for reference but is not active.
#
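# Illustrative sketch (not part of this file) of the Sensor/Worker hand-off named above:
# the sensor DAG pops a URL from the Redis inbox and starts one worker run per URL via
# TriggerDagRunOperator. The task_id, the conf payload, and the assumption that the
# worker's dag_id matches its filename are all assumptions, not confirmed by this repo.
#
#   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
#
#   trigger_worker = TriggerDagRunOperator(
#       task_id='trigger_worker',
#       trigger_dag_id='ytdlp_worker_per_url',
#       conf={'url': url},  # the URL popped from the inbox queue
#   )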

@@ -21,6 +21,9 @@ from datetime import timedelta
import logging
import redis

# Import utility functions
from utils.redis_utils import _get_redis_client

# Configure logging
logger = logging.getLogger(__name__)

@@ -30,23 +33,6 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TIMEOUT = 30
DEFAULT_MAX_URLS = '1'  # Default number of URLs to process per run

# --- Helper Functions ---

def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

# --- Task Callables ---

def log_trigger_info_callable(**context):
@@ -57,6 +43,8 @@ def log_trigger_info_callable(**context):

    if trigger_type == 'manual':
        logger.info("Trigger source: Manual execution from Airflow UI or CLI.")
    elif trigger_type == 'scheduled':
        logger.info("Trigger source: Scheduled run (periodic check).")
    elif trigger_type == 'dag_run':
        # In Airflow 2.2+ we can get the triggering run object
        try:
@@ -154,10 +142,10 @@ default_args = {
with DAG(
    dag_id='ytdlp_sensor_redis_queue',
    default_args=default_args,
    schedule_interval=None,  # This DAG is now only triggered (manually or by a worker)
    schedule_interval='*/1 * * * *',  # Runs every minute and can also be triggered.
    max_active_runs=1,  # Prevent multiple sensors from running at once
    catchup=False,
    description='Polls Redis queue for a batch of URLs and triggers parallel worker DAGs.',
    description='Polls Redis queue every minute (and on trigger) for URLs and starts worker DAGs.',
    tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'],
    params={
        'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."),

@@ -33,6 +33,10 @@ import os
import redis
import socket
import time
import traceback

# Import utility functions
from utils.redis_utils import _get_redis_client

# Configure logging
logger = logging.getLogger(__name__)
@@ -106,7 +110,7 @@ def handle_success(**context):
    try:
        # In the worker pattern, there's no "progress" hash to remove from.
        # We just add the result to the success hash.
        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
        client = _get_redis_client(redis_conn_id)
        client.hset(result_queue, url, json.dumps(result_data))
        logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
    except Exception as e:
@@ -115,8 +119,8 @@ def handle_success(**context):

def handle_failure(**context):
    """
    Handles failed processing. Moves the URL to the fail hash and, if stop_on_failure
    is True, fails the task to make the DAG run failure visible.
    Handles failed processing. Records detailed error information to the fail hash
    and, if stop_on_failure is True, fails the task to make the DAG run failure visible.
    """
    ti = context['task_instance']
    params = context['params']
@@ -132,14 +136,31 @@ def handle_failure(**context):
    requeue_on_failure = params.get('requeue_on_failure', False)
    stop_on_failure = params.get('stop_on_failure', True)

    # --- Extract Detailed Error Information ---
    exception = context.get('exception')
    error_message = str(exception) if exception else "Unknown error"
    error_type = type(exception).__name__ if exception else "Unknown"
    # Positional arguments keep this compatible with Python 3.10+, where the etype= keyword was removed.
    tb_str = "".join(traceback.format_exception(type(exception), exception, exception.__traceback__)) if exception else "No traceback available."

    # Find the specific task that failed
    dag_run = context['dag_run']
    failed_task_id = "unknown"
    # Look at direct upstream tasks of the current task ('handle_failure');
    # get_direct_relatives() lives on the task object, not the TaskInstance.
    upstream_tasks = ti.task.get_direct_relatives(upstream=True)
    for task in upstream_tasks:
        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
        if upstream_ti and upstream_ti.state == 'failed':
            failed_task_id = task.task_id
            break

logger.info(f"Handling failure for URL: {url}")
|
||||
logger.error(f" Failed Task: {failed_task_id}")
|
||||
logger.error(f" Failure Type: {error_type}")
|
||||
logger.error(f" Failure Reason: {error_message}")
|
||||
logger.debug(f" Traceback:\n{tb_str}")
|
||||
|
||||
    try:
        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
        client = _get_redis_client(redis_conn_id)
        if requeue_on_failure:
            client.rpush(inbox_queue, url)
            logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
@@ -147,12 +168,15 @@ def handle_failure(**context):
        fail_data = {
            'status': 'failed',
            'end_time': time.time(),
            'error': error_message,
            'failed_task': failed_task_id,
            'error_type': error_type,
            'error_message': error_message,
            'traceback': tb_str,
            'url': url,
            'dag_run_id': context['dag_run'].run_id,
        }
        client.hset(fail_queue, url, json.dumps(fail_data))
        logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
        client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
        logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
    except Exception as e:
        logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
        # This is a critical error in the failure handling logic itself.
@@ -178,22 +202,6 @@ class YtdlpOpsOperator(BaseOperator):
    """
    template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')

    @staticmethod
    def _get_redis_client(redis_conn_id):
        """Gets a Redis client connection using RedisHook."""
        try:
            hook = RedisHook(redis_conn_id=redis_conn_id)
            client = hook.get_conn()
            client.ping()
            logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
            return client
        except redis.exceptions.AuthenticationError:
            logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
            raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
        except Exception as e:
            logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
            raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

    @apply_defaults
    def __init__(self,
                 service_ip=None,
@@ -448,8 +456,8 @@ with DAG(
    trigger_sensor_for_next_batch.doc_md = """
    ### Trigger Sensor for Next Batch
    Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop.
    This task runs after the main processing tasks are complete (either success or failure),
    ensuring that the system immediately checks for more URLs to process.
    This task **only runs on the success path** after a URL has been fully processed.
    This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
    """

    # Define success and failure handling tasks
@@ -470,10 +478,9 @@ with DAG(
    # The main processing flow
    get_token >> download_video

    # Branch after download: one path for success, one for failure
    download_video >> success_task
    download_video >> failure_task
    # The success path: if download_video succeeds, run success_task, then trigger the next sensor run.
    download_video >> success_task >> trigger_sensor_for_next_batch

    # The trigger to continue the loop ONLY runs on the success path.
    # A failure will be recorded in Redis by `handle_failure` and then the loop will stop.
    success_task >> trigger_sensor_for_next_batch
    # The failure path: if get_token OR download_video fails, run the failure_task.
    # This is a "fan-in" dependency.
    [get_token, download_video] >> failure_task
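    # Note (assumption about code not shown here): for this fan-in to fire when an
    # upstream task fails, failure_task presumably sets trigger_rule='one_failed' where
    # it is defined; with the default 'all_success' rule it would never run on failure.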