Compare commits

..

2 Commits

Author   SHA1         Message                                   Date
aperez   fc2d740b65   Updated utils for reuse redis connects    2025-07-18 17:19:07 +03:00
aperez   1f186fd217   Update Readme and ytdlp dags              2025-07-18 17:17:19 +03:00
11 changed files with 227 additions and 925 deletions

README.en.old.md (new file, 100 lines)

@@ -0,0 +1,100 @@
# YTDLP Airflow DAGs
This document describes the Airflow DAGs used for interacting with the YTDLP Ops service and managing processing queues.
## DAG Descriptions
### `ytdlp_client_dag_v2.1`
* **File:** `airflow/dags/ytdlp_client_dag_v2.1.py`
* **Purpose:** Provides a way to test the YTDLP Ops Thrift service interaction for a *single* video URL. Useful for debugging connection issues, testing specific account IDs, or verifying the service response for a particular URL independently of the queueing system.
* **Parameters (Defaults):**
* `url` (`'https://www.youtube.com/watch?v=sOlTX9uxUtM'`): The video URL to process.
* `redis_enabled` (`False`): Use Redis for service discovery?
* `service_ip` (`'85.192.30.55'`): Service IP if `redis_enabled=False`.
* `service_port` (`9090`): Service port if `redis_enabled=False`.
* `account_id` (`'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Account ID for lookup or call.
* `timeout` (`30`): Timeout in seconds for Thrift connection.
* `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`.
* **Results:**
* Connects to the YTDLP Ops service using the specified method (Redis or direct IP).
* Retrieves token data for the given URL and account ID.
* Saves the video's `info.json` metadata to the specified directory.
* Extracts the SOCKS proxy (if available).
* Pushes `info_json_path`, `socks_proxy`, and the original `ytdlp_command` (with tokens) to XCom.
* Optionally stores detailed results in a Redis hash (`token_info:<video_id>`).
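The XCom values listed above can be consumed by a downstream task in the same DAG run. A minimal sketch, assuming the token-retrieval task is called `get_token` (adapt the task id to the actual DAG):

```python
def use_token_results(**context):
    """Pull the values pushed by the token task and hand them to later steps."""
    ti = context["task_instance"]
    info_json_path = ti.xcom_pull(task_ids="get_token", key="info_json_path")  # assumed task id
    socks_proxy = ti.xcom_pull(task_ids="get_token", key="socks_proxy")
    ytdlp_command = ti.xcom_pull(task_ids="get_token", key="ytdlp_command")
    print(info_json_path, socks_proxy, ytdlp_command)

# Inside the DAG definition this would be wired as:
# PythonOperator(task_id="use_token_results", python_callable=use_token_results)
```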
### `ytdlp_mgmt_queue_add_urls`
* **File:** `airflow/dags/ytdlp_mgmt_queue_add_urls.py`
* **Purpose:** Manually add video URLs to a specific YTDLP inbox queue (Redis List).
* **Parameters (Defaults):**
* `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
* `queue_name` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Target Redis list (inbox queue).
* `urls` (`""`): Multiline string of video URLs to add.
* **Results:**
* Parses the multiline `urls` parameter.
* Adds each valid URL to the end of the Redis list specified by `queue_name`.
* Logs the number of URLs added.
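Under the hood the add step amounts to parsing the multiline parameter and appending each URL to the Redis list. A minimal sketch with redis-py (the connection URL is a placeholder; the DAG itself goes through the Airflow Redis connection):

```python
import redis

def add_urls_to_inbox(urls_text: str, queue_name: str,
                      redis_url: str = "redis://localhost:6379/0") -> int:
    """Push each non-empty line of `urls_text` onto the end of the inbox list."""
    client = redis.Redis.from_url(redis_url, decode_responses=True)
    urls = [line.strip() for line in urls_text.splitlines() if line.strip()]
    if urls:
        client.rpush(queue_name, *urls)  # RPUSH appends to the tail of the list
    return len(urls)
```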
### `ytdlp_mgmt_queue_clear`
* **File:** `airflow/dags/ytdlp_mgmt_queue_clear.py`
* **Purpose:** Manually delete a specific Redis key used by the YTDLP queues.
* **Parameters (Defaults):**
* `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
* `queue_to_clear` (`'PLEASE_SPECIFY_QUEUE_TO_CLEAR'`): Exact name of the Redis key to clear. **Must be changed by user.**
* **Results:**
* Deletes the Redis key specified by the `queue_to_clear` parameter.
* **Warning:** This operation is destructive and irreversible. Use with extreme caution. Ensure you specify the correct key name (e.g., `video_queue_inbox_account_xyz`, `video_queue_progress`, `video_queue_result`, `video_queue_fail`).
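The clear operation itself is a single Redis `DEL`. A minimal sketch (the key name and connection URL are illustrative):

```python
import redis

client = redis.Redis.from_url("redis://localhost:6379/0")
removed = client.delete("video_queue_inbox_account_xyz")  # returns the number of keys removed
print(f"Deleted {removed} key(s)")
```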
### `ytdlp_mgmt_queue_check_status`
* **File:** `airflow/dags/ytdlp_mgmt_queue_check_status.py`
* **Purpose:** Manually check the type and size of a specific YTDLP Redis queue/key.
* **Parameters (Defaults):**
* `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
* `queue_to_check` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to check.
* **Results:**
* Connects to Redis and determines the type of the key specified by `queue_to_check`.
* Determines the size (length for lists, number of fields for hashes).
* Logs the key type and size.
* Pushes `queue_key_type` and `queue_size` to XCom.
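The type/size check maps directly onto the Redis `TYPE`, `LLEN`, and `HLEN` commands. A minimal sketch with redis-py (connection URL is a placeholder):

```python
import redis

def check_queue(key: str, redis_url: str = "redis://localhost:6379/0"):
    """Return (type, size) for a Redis key: length for lists, field count for hashes."""
    client = redis.Redis.from_url(redis_url, decode_responses=True)
    key_type = client.type(key)  # 'list', 'hash', 'none', ...
    if key_type == "list":
        size = client.llen(key)
    elif key_type == "hash":
        size = client.hlen(key)
    else:
        size = 0
    return key_type, size
```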
### `ytdlp_mgmt_queue_list_contents`
* **File:** `airflow/dags/ytdlp_mgmt_queue_list_contents.py`
* **Purpose:** Manually list the contents of a specific YTDLP Redis queue/key (list or hash). Useful for inspecting queue state or results.
* **Parameters (Defaults):**
* `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
* `queue_to_list` (`'video_queue_inbox_account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09'`): Exact name of the Redis key to list.
* `max_items` (`100`): Maximum number of items/fields to list.
* **Results:**
* Connects to Redis and identifies the type of the key specified by `queue_to_list`.
* If it's a List, logs the first `max_items` elements.
* If it's a Hash, logs up to `max_items` key-value pairs, attempting to pretty-print JSON values.
* Logs warnings for very large hashes.
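Listing works the same way for both key types. A minimal sketch that caps output at `max_items` and pretty-prints hash values when they are JSON (connection URL is a placeholder):

```python
import json
import redis

def list_queue_contents(key: str, max_items: int = 100,
                        redis_url: str = "redis://localhost:6379/0"):
    client = redis.Redis.from_url(redis_url, decode_responses=True)
    key_type = client.type(key)
    if key_type == "list":
        for item in client.lrange(key, 0, max_items - 1):
            print(item)
    elif key_type == "hash":
        for i, (field, value) in enumerate(client.hscan_iter(key)):
            if i >= max_items:
                break
            try:
                value = json.dumps(json.loads(value), indent=2)
            except (TypeError, ValueError):
                pass  # leave non-JSON values as-is
            print(field, value)
```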
### `ytdlp_proc_sequential_processor`
* **File:** `airflow/dags/ytdlp_proc_sequential_processor.py`
* **Purpose:** Processes YouTube URLs sequentially from a Redis queue. Designed for batch processing. Pops a URL, gets token/metadata via YTDLP Ops service, downloads the media using `yt-dlp`, and records the result.
* **Parameters (Defaults):**
* `queue_name` (`'video_queue'`): Base name for Redis queues (e.g., `video_queue_inbox`, `video_queue_progress`).
* `redis_conn_id` (`'redis_default'`): Airflow Redis connection ID.
* `redis_enabled` (`False`): Use Redis for service discovery? If False, uses `service_ip`/`port`.
* `service_ip` (`None`): Required Service IP if `redis_enabled=False`.
* `service_port` (`None`): Required Service port if `redis_enabled=False`.
* `account_id` (`'default_account'`): Account ID for the API call (used for Redis lookup if `redis_enabled=True`).
* `timeout` (`30`): Timeout in seconds for the Thrift connection.
* `download_format` (`'ba[ext=m4a]/bestaudio/best'`): yt-dlp format selection string.
* `output_path_template` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloads') }}/%(title)s [%(id)s].%(ext)s"`): yt-dlp output template. Uses Airflow Variable `DOWNLOADS_TEMP`.
* `info_json_dir` (`"{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}"`): Directory to save `info.json`. Uses Airflow Variable `DOWNLOADS_TEMP`.
* **Results:**
* Pops one URL from the `{{ params.queue_name }}_inbox` Redis list.
* If a URL is popped, it's added to the `{{ params.queue_name }}_progress` Redis hash.
* The `YtdlpOpsOperator` (`get_token` task) attempts to get token data (including `info.json`, proxy, command) for the URL using the specified connection method and account ID.
* If token retrieval succeeds, the `download_video` task executes `yt-dlp` using the retrieved `info.json`, proxy, the `download_format` parameter, and the `output_path_template` parameter to download the actual media.
* **On Successful Download:** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_result` hash along with results (`info_json_path`, `socks_proxy`, `ytdlp_command`).
* **On Failure (Token Retrieval or Download):** The URL is removed from the progress hash and added to the `{{ params.queue_name }}_fail` hash along with error details (message, traceback).
* If the inbox queue is empty, the DAG run skips processing via `AirflowSkipException`.
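The queue lifecycle described above boils down to a handful of Redis operations per URL. A minimal sketch of the state transitions (queue base name, payloads, and connection URL are illustrative):

```python
import json
import time
import redis

client = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
base = "video_queue"

url = client.lpop(f"{base}_inbox")  # 1. pop next URL (None if the inbox is empty)
if url:
    client.hset(f"{base}_progress", url,
                json.dumps({"status": "processing", "start_time": time.time()}))
    try:
        result = {"status": "success", "info_json_path": "/tmp/info.json"}  # placeholder for the real work
        client.hdel(f"{base}_progress", url)  # 2a. success: progress -> result
        client.hset(f"{base}_result", url, json.dumps(result))
    except Exception as exc:
        client.hdel(f"{base}_progress", url)  # 2b. failure: progress -> fail
        client.hset(f"{base}_fail", url, json.dumps({"status": "failed", "error": str(exc)}))
```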

README.md (114 changed lines)

@@ -1,100 +1,38 @@
# Architecture and Description of the YTDLP Airflow DAGs

This document describes the architecture and purpose of the DAGs used to download videos from YouTube. The system is built on a Sensor/Worker pattern to provide continuous, parallel processing.

## Main processing loop

### `ytdlp_sensor_redis_queue` (Sensor)
- **Purpose:** Takes URLs to download from the Redis queue and starts workers to process them.
- **How it works (hybrid triggering):**
  - **On schedule:** Every minute the DAG automatically checks the Redis queue. This guarantees that new tasks are picked up even if the processing loop was temporarily stopped (because the queue was empty).
  - **On trigger:** When the `ytdlp_worker_per_url` worker finishes successfully, it immediately triggers the sensor without waiting for the next minute. This keeps processing continuous, with no delays.
- **Logic:** Pulls a batch of URLs from the Redis `_inbox` list. If the queue is empty, the DAG completes successfully until the next run (triggered or scheduled).

### `ytdlp_worker_per_url` (Worker)
- **Purpose:** Processes a single URL, downloads the video, and continues the loop.
- **How it works:**
  - Receives one URL from the sensor.
  - Calls the `ytdlp-ops-auth` service to obtain `info.json` and a `socks5` proxy.
  - Downloads the video using the retrieved data. (TODO: replace the `yt-dlp` command-line invocation with a library call.)
  - Depending on the outcome (success/failure), puts the result into the corresponding Redis hash (`_result` or `_fail`).
  - On success, re-triggers the `ytdlp_sensor_redis_queue` sensor to continue the processing loop. On failure, the loop stops for manual diagnosis.

## Management DAGs

These DAGs are intended for manual queue management and do not take part in the automatic loop.

- **`ytdlp_mgmt_queue_add_and_verify`**: Adds URLs to the task queue (`_inbox`) and then checks the status of that queue.
- **`ytdlp_mgmt_queues_check_status`**: Shows the state and contents of all key queues (`_inbox`, `_progress`, `_result`, `_fail`). Helps track processing progress.
- **`ytdlp_mgmt_queue_clear`**: Clears (completely deletes) the specified Redis queue. **Use with caution**, as the operation is irreversible.

## External services

### `ytdlp-ops-auth` (Thrift Service)
- **Purpose:** An external service that provides authentication data (tokens, cookies, proxy) for downloading videos.
- **Interaction:** The worker DAG (`ytdlp_worker_per_url`) calls this service before starting a download to obtain the data `yt-dlp` needs.
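The sensor/worker hand-off described above is typically wired with `TriggerDagRunOperator`. A minimal sketch of both directions, assuming the DAG ids used in this document (the `conf` key and the example DAG wrapper are illustrative):

```python
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="example_sensor_worker_wiring", start_date=days_ago(1), schedule_interval=None):
    # In ytdlp_sensor_redis_queue: start a worker run for a URL taken from the inbox.
    trigger_worker = TriggerDagRunOperator(
        task_id="trigger_worker",
        trigger_dag_id="ytdlp_worker_per_url",
        conf={"url": "https://www.youtube.com/watch?v=example"},  # in the real DAG this comes from Redis/XCom
    )

    # In ytdlp_worker_per_url: on success, immediately re-run the sensor to continue the loop.
    trigger_sensor_for_next_batch = TriggerDagRunOperator(
        task_id="trigger_sensor_for_next_batch",
        trigger_dag_id="ytdlp_sensor_redis_queue",
    )
```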

dags/utils/__init__.py (new file, 10 lines)

@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
Airflow DAG Utilities
"""

dags/utils/redis_utils.py (new file, 32 lines)

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.

"""
Redis utility functions for Airflow DAGs.
"""
from airflow.exceptions import AirflowException
from airflow.providers.redis.hooks.redis import RedisHook

import logging
import redis

logger = logging.getLogger(__name__)


def _get_redis_client(redis_conn_id):
    """Gets a Redis client connection using RedisHook."""
    try:
        hook = RedisHook(redis_conn_id=redis_conn_id)
        client = hook.get_conn()
        client.ping()
        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
        return client
    except redis.exceptions.AuthenticationError:
        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
    except Exception as e:
        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

View File

@@ -8,6 +8,9 @@ from datetime import timedelta
 import logging
 import redis # Import redis exceptions if needed

+# Import utility functions
+from utils.redis_utils import _get_redis_client
+
 # Configure logging
 logger = logging.getLogger(__name__)
@@ -15,23 +18,6 @@ logger = logging.getLogger(__name__)
 DEFAULT_QUEUE_NAME = 'video_queue' # Default base name for the queue
 DEFAULT_REDIS_CONN_ID = 'redis_default'

-# --- Helper Functions ---
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

 # --- Python Callables for Tasks ---
 def add_urls_callable(**context):

View File

@@ -28,22 +28,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_QUEUE_BASE_NAME = 'video_queue'
 DEFAULT_MAX_ITEMS_TO_LIST = 25

-# --- Helper Function ---
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client

 # --- Python Callable for Check and List Task ---

View File

@@ -1,5 +1,10 @@
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
+#
+# Copyright © 2024 rl <rl@rlmbp>
+#
+# Distributed under terms of the MIT license.
 """
 Airflow DAG for manually clearing (deleting) a specific Redis key used by YTDLP queues.
 """
@@ -22,22 +27,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 # Provide a placeholder default, user MUST specify the queue to clear
 DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR'

-# --- Helper Function ---
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client

 # --- Python Callable for Clear Task ---

View File

@@ -29,24 +29,8 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_QUEUE_TO_LIST = 'video_queue_inbox'
 DEFAULT_MAX_ITEMS = 10 # Limit number of items listed by default

-# --- Helper Function ---
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        # decode_responses=True removed as it's not supported by get_conn in some environments
-        # We will decode manually where needed.
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
+# Import utility functions
+from utils.redis_utils import _get_redis_client

 # --- Python Callable for List Contents Task ---

View File

@@ -1,720 +0,0 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
DAG for processing YouTube URLs sequentially from a Redis queue using YTDLP Ops Thrift service.
"""
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator, Variable
from airflow.models.param import Param
from airflow.operators.bash import BashOperator # Import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from thrift.transport.TTransport import TTransportException
import json
import logging
import os
import redis # Import redis exceptions if needed
import socket
import time
import traceback # For logging stack traces in failure handler
# Configure logging
logger = logging.getLogger(__name__)
# Default settings
DEFAULT_QUEUE_NAME = 'video_queue' # Base name for queues
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds
MAX_RETRIES_REDIS_LOOKUP = 3 # Retries for fetching service details from Redis
RETRY_DELAY_REDIS_LOOKUP = 10 # Delay (seconds) for Redis lookup retries
# --- Helper Functions ---
def _get_redis_client(redis_conn_id):
"""Gets a Redis client connection using RedisHook."""
try:
hook = RedisHook(redis_conn_id=redis_conn_id)
client = hook.get_conn()
client.ping()
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
return client
except redis.exceptions.AuthenticationError:
logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
except Exception as e:
logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")
def _extract_video_id(url):
"""Extracts YouTube video ID from URL."""
if not url or not isinstance(url, str):
logger.debug("URL is empty or not a string, cannot extract video ID.")
return None
try:
video_id = None
if 'youtube.com/watch?v=' in url:
video_id = url.split('v=')[1].split('&')[0]
elif 'youtu.be/' in url:
video_id = url.split('youtu.be/')[1].split('?')[0]
if video_id and len(video_id) >= 11:
video_id = video_id[:11] # Standard ID length
logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
return video_id
else:
logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
return None
except Exception as e:
logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}")
return None
# --- Queue Management Callables ---
def pop_url_from_queue(**context):
"""Pops a URL from the inbox queue and pushes to XCom."""
params = context['params']
queue_name = params['queue_name']
inbox_queue = f"{queue_name}_inbox"
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
logger.info(f"Attempting to pop URL from inbox queue: {inbox_queue}")
try:
client = _get_redis_client(redis_conn_id)
# LPOP is non-blocking, returns None if empty
url_bytes = client.lpop(inbox_queue) # Returns bytes if decode_responses=False on hook/client
if url_bytes:
url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes
logger.info(f"Popped URL: {url}")
context['task_instance'].xcom_push(key='current_url', value=url)
return url # Return URL for logging/potential use
else:
logger.info(f"Inbox queue '{inbox_queue}' is empty. Skipping downstream tasks.")
context['task_instance'].xcom_push(key='current_url', value=None)
# Raise AirflowSkipException to signal downstream tasks to skip
raise AirflowSkipException(f"Inbox queue '{inbox_queue}' is empty.")
except AirflowSkipException:
raise # Re-raise skip exception
except Exception as e:
logger.error(f"Error popping URL from Redis queue '{inbox_queue}': {e}", exc_info=True)
raise AirflowException(f"Failed to pop URL from Redis: {e}")
def move_url_to_progress(**context):
"""Moves the current URL from XCom to the progress hash."""
ti = context['task_instance']
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
# This task should be skipped if pop_url_from_queue raised AirflowSkipException
# Adding check for robustness
if not url:
logger.info("No URL found in XCom (or upstream skipped). Skipping move to progress.")
raise AirflowSkipException("No URL to process.")
params = context['params']
queue_name = params['queue_name']
progress_queue = f"{queue_name}_progress"
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
logger.info(f"Moving URL '{url}' to progress hash: {progress_queue}")
progress_data = {
'status': 'processing',
'start_time': time.time(),
'dag_run_id': context['dag_run'].run_id,
'task_instance_key_str': context['task_instance_key_str']
}
try:
client = _get_redis_client(redis_conn_id)
client.hset(progress_queue, url, json.dumps(progress_data))
logger.info(f"Moved URL '{url}' to progress hash '{progress_queue}'.")
except Exception as e:
logger.error(f"Error moving URL to Redis progress hash '{progress_queue}': {e}", exc_info=True)
# If this fails, the URL is popped but not tracked as processing. Fail the task.
raise AirflowException(f"Failed to move URL to progress hash: {e}")
def handle_success(**context):
"""Moves URL from progress to result hash on success."""
ti = context['task_instance']
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
if not url:
logger.warning("handle_success called but no URL found from pop_url_from_queue XCom. This shouldn't happen on success path.")
return # Or raise error
params = context['params']
queue_name = params['queue_name']
progress_queue = f"{queue_name}_progress"
result_queue = f"{queue_name}_result"
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
# Pull results from get_token task
info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') # Original command
downloaded_file_path = ti.xcom_pull(task_ids='download_video') # Pull from download_video task
logger.info(f"Handling success for URL: {url}")
logger.info(f" Info JSON Path: {info_json_path}")
logger.info(f" SOCKS Proxy: {socks_proxy}")
logger.info(f" YTDLP Command: {ytdlp_command[:100] if ytdlp_command else 'None'}...") # Log truncated command
logger.info(f" Downloaded File Path: {downloaded_file_path}")
result_data = {
'status': 'success',
'end_time': time.time(),
'info_json_path': info_json_path,
'socks_proxy': socks_proxy,
'ytdlp_command': ytdlp_command,
'downloaded_file_path': downloaded_file_path,
'url': url,
'dag_run_id': context['dag_run'].run_id,
'task_instance_key_str': context['task_instance_key_str'] # Record which task instance succeeded
}
try:
client = _get_redis_client(redis_conn_id)
# Remove from progress hash
removed_count = client.hdel(progress_queue, url)
if removed_count > 0:
logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
else:
logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during success handling.")
# Add to result hash
client.hset(result_queue, url, json.dumps(result_data))
logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
except Exception as e:
logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
# Even if Redis fails, the task succeeded. Log error but don't fail the task.
# Consider adding retry logic for Redis operations here or marking state differently.
def handle_failure(**context):
"""
Handles failed processing. Depending on the `requeue_on_failure` parameter,
it either moves the URL to the fail hash or re-queues it in the inbox.
If `stop_on_failure` is True, this task will fail, stopping the DAG loop.
"""
ti = context['task_instance']
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
if not url:
logger.error("handle_failure called but no URL found from pop_url_from_queue XCom.")
return
params = context['params']
queue_name = params['queue_name']
progress_queue = f"{queue_name}_progress"
fail_queue = f"{queue_name}_fail"
inbox_queue = f"{queue_name}_inbox"
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
requeue_on_failure = params.get('requeue_on_failure', False)
stop_on_failure = params.get('stop_on_failure', True) # Default to True
exception = context.get('exception')
error_message = str(exception) if exception else "Unknown error"
tb_str = traceback.format_exc() if exception else "No traceback available."
logger.info(f"Handling failure for URL: {url}")
logger.error(f" Failure Reason: {error_message}")
logger.debug(f" Traceback:\n{tb_str}")
try:
client = _get_redis_client(redis_conn_id)
# Always remove from progress hash first
removed_count = client.hdel(progress_queue, url)
if removed_count > 0:
logger.info(f"Removed URL '{url}' from progress hash '{progress_queue}'.")
else:
logger.warning(f"URL '{url}' not found in progress hash '{progress_queue}' during failure handling.")
if requeue_on_failure:
# Re-queue the URL for another attempt
client.rpush(inbox_queue, url)
logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
else:
# Move to the permanent fail hash
fail_data = {
'status': 'failed',
'end_time': time.time(),
'error': error_message,
'traceback': tb_str,
'url': url,
'dag_run_id': context['dag_run'].run_id,
'task_instance_key_str': context['task_instance_key_str']
}
client.hset(fail_queue, url, json.dumps(fail_data))
logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
except Exception as e:
logger.error(f"Error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
# This is a critical error in the failure handling logic itself.
raise AirflowException(f"Could not handle failure in Redis: {e}")
# After handling Redis, decide whether to fail the task to stop the loop
if stop_on_failure:
logger.error("stop_on_failure is True. Failing this task to stop the DAG loop.")
# Re-raise the original exception to fail the task instance.
# This is better than AirflowFailException because it preserves the original error.
if exception:
raise exception
else:
# If for some reason there's no exception, fail explicitly.
raise AirflowFailException("Failing task as per stop_on_failure=True, but original exception was not found.")
# --- YtdlpOpsOperator ---
class YtdlpOpsOperator(BaseOperator):
"""
Custom Airflow operator to interact with YTDLP Thrift service. Handles direct connections
and Redis-based discovery, retrieves tokens, saves info.json, and manages errors.
Modified to pull URL from XCom for sequential processing.
"""
# Removed 'url' from template_fields as it's pulled from XCom
template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir', 'redis_conn_id')
@apply_defaults
def __init__(self,
# url parameter removed - will be pulled from XCom
redis_conn_id=DEFAULT_REDIS_CONN_ID,
max_retries_lookup=MAX_RETRIES_REDIS_LOOKUP,
retry_delay_lookup=RETRY_DELAY_REDIS_LOOKUP,
service_ip=None,
service_port=None,
redis_enabled=False, # Default to direct connection now
account_id=None,
# save_info_json removed, always True
info_json_dir=None,
# get_socks_proxy removed, always True
# store_socks_proxy removed, always True
# get_socks_proxy=True, # Removed
# store_socks_proxy=True, # Store proxy in XCom by default # Removed
timeout=DEFAULT_TIMEOUT,
*args, **kwargs):
super().__init__(*args, **kwargs)
logger.info(f"Initializing YtdlpOpsOperator (Processor Version) with parameters: "
f"redis_conn_id={redis_conn_id}, max_retries_lookup={max_retries_lookup}, retry_delay_lookup={retry_delay_lookup}, "
f"service_ip={service_ip}, service_port={service_port}, redis_enabled={redis_enabled}, "
f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
# save_info_json, get_socks_proxy, store_socks_proxy removed from log
# Validate parameters based on connection mode
if redis_enabled:
# If using Redis, account_id is essential for lookup
if not account_id:
raise ValueError("account_id is required when redis_enabled=True for service lookup.")
else:
# If direct connection, IP and Port are essential
if not service_ip or not service_port:
raise ValueError("Both service_ip and service_port must be specified when redis_enabled=False.")
# Account ID is still needed for the API call itself, but rely on DAG param or operator config
if not account_id:
logger.warning("No account_id provided for direct connection mode. Ensure it's set in DAG params or operator config.")
# We won't assign 'default' here, let the value passed during instantiation be used.
# self.url is no longer needed here
self.redis_conn_id = redis_conn_id
self.max_retries_lookup = max_retries_lookup
self.retry_delay_lookup = int(retry_delay_lookup.total_seconds() if isinstance(retry_delay_lookup, timedelta) else retry_delay_lookup)
self.service_ip = service_ip
self.service_port = service_port
self.redis_enabled = redis_enabled
self.account_id = account_id
# self.save_info_json removed
self.info_json_dir = info_json_dir # Still needed
# self.get_socks_proxy removed
# self.store_socks_proxy removed
self.timeout = timeout
def execute(self, context):
logger.info("Executing YtdlpOpsOperator (Processor Version)")
transport = None
ti = context['task_instance'] # Get task instance for XCom access
try:
# --- Get URL from XCom ---
url = ti.xcom_pull(task_ids='pop_url_from_queue', key='current_url')
if not url:
# This should ideally be caught by upstream skip, but handle defensively
logger.info("No URL found in XCom from pop_url_from_queue. Skipping execution.")
raise AirflowSkipException("Upstream task did not provide a URL.")
logger.info(f"Processing URL from XCom: {url}")
# --- End Get URL ---
logger.info("Getting task parameters and rendering templates")
params = context['params'] # DAG run params
# Render template fields using context
# Use render_template_as_native for better type handling if needed, else render_template
redis_conn_id = self.render_template(self.redis_conn_id, context)
service_ip = self.render_template(self.service_ip, context)
service_port_rendered = self.render_template(self.service_port, context)
account_id = self.render_template(self.account_id, context)
timeout_rendered = self.render_template(self.timeout, context)
info_json_dir = self.render_template(self.info_json_dir, context) # Rendered here for _save_info_json
# Determine effective settings (DAG params override operator defaults)
redis_enabled = params.get('redis_enabled', self.redis_enabled)
account_id = params.get('account_id', account_id) # Use DAG param if provided
redis_conn_id = params.get('redis_conn_id', redis_conn_id) # Use DAG param if provided
logger.info(f"Effective settings: redis_enabled={redis_enabled}, account_id='{account_id}', redis_conn_id='{redis_conn_id}'")
host = None
port = None
if redis_enabled:
# Get Redis connection using the helper for consistency
redis_client = _get_redis_client(redis_conn_id)
logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}' for service discovery.")
# Get service details from Redis with retries
service_key = f"ytdlp:{account_id}"
legacy_key = account_id # For backward compatibility
for attempt in range(self.max_retries_lookup):
try:
logger.info(f"Attempt {attempt + 1}/{self.max_retries_lookup}: Fetching service details from Redis for keys: '{service_key}', '{legacy_key}'")
service_details = redis_client.hgetall(service_key)
if not service_details:
logger.warning(f"Key '{service_key}' not found, trying legacy key '{legacy_key}'")
service_details = redis_client.hgetall(legacy_key)
if not service_details:
raise ValueError(f"No service details found in Redis for keys: {service_key} or {legacy_key}")
# Find IP and port (case-insensitive keys)
ip_key = next((k for k in service_details if k.lower() == 'ip'), None)
port_key = next((k for k in service_details if k.lower() == 'port'), None)
if not ip_key: raise ValueError(f"'ip' key not found in Redis hash for {service_key}/{legacy_key}")
if not port_key: raise ValueError(f"'port' key not found in Redis hash for {service_key}/{legacy_key}")
host = service_details[ip_key] # Assumes decode_responses=True in hook
port_str = service_details[port_key]
try:
port = int(port_str)
except (ValueError, TypeError):
raise ValueError(f"Invalid port value '{port_str}' found in Redis for {service_key}/{legacy_key}")
logger.info(f"Extracted from Redis - Service IP: {host}, Service Port: {port}")
break # Success
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed to get Redis details: {str(e)}")
if attempt == self.max_retries_lookup - 1:
logger.error("Max retries reached for fetching Redis details.")
raise AirflowException(f"Failed to get service details from Redis after {self.max_retries_lookup} attempts: {e}")
logger.info(f"Retrying in {self.retry_delay_lookup} seconds...")
time.sleep(self.retry_delay_lookup)
else:
# Direct connection: Use rendered/param values
host = params.get('service_ip', service_ip) # Use DAG param if provided
port_str = params.get('service_port', service_port_rendered) # Use DAG param if provided
logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
if not host or not port_str:
raise ValueError("Direct connection requires service_ip and service_port (check Operator config and DAG params)")
try:
port = int(port_str)
except (ValueError, TypeError):
raise ValueError(f"Invalid service_port value: {port_str}")
logger.info(f"Connecting directly to Thrift service at {host}:{port} (Redis bypassed)")
# Validate and use timeout
try:
timeout = int(timeout_rendered)
if timeout <= 0: raise ValueError("Timeout must be positive")
logger.info(f"Using timeout: {timeout} seconds")
except (ValueError, TypeError):
logger.warning(f"Invalid timeout value: '{timeout_rendered}'. Using default: {DEFAULT_TIMEOUT}")
timeout = DEFAULT_TIMEOUT
# Create Thrift connection objects
# socket_conn = TSocket.TSocket(host, port) # Original
socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) # Explicitly use AF_INET (IPv4)
socket_conn.setTimeout(timeout * 1000) # Thrift timeout is in milliseconds
transport = TTransport.TFramedTransport(socket_conn) # Use TFramedTransport if server expects it
# transport = TTransport.TBufferedTransport(socket_conn) # Use TBufferedTransport if server expects it
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = YTTokenOpService.Client(protocol)
logger.info(f"Attempting to connect to Thrift server at {host}:{port}...")
try:
transport.open()
logger.info("Successfully connected to Thrift server.")
# Test connection with ping
try:
client.ping()
logger.info("Server ping successful.")
except Exception as e:
logger.error(f"Server ping failed: {e}")
raise AirflowException(f"Server connection test (ping) failed: {e}")
# Get token from service using the URL from XCom
try:
logger.info(f"Requesting token for accountId='{account_id}', url='{url}'")
token_data = client.getOrRefreshToken(
accountId=account_id,
updateType=TokenUpdateMode.AUTO,
url=url # Use the url variable from XCom
)
logger.info("Successfully retrieved token data from service.")
except PBServiceException as e:
# Handle specific service exceptions
error_code = getattr(e, 'errorCode', 'N/A')
error_message = getattr(e, 'message', 'N/A')
error_context = getattr(e, 'context', {})
logger.error(f"PBServiceException occurred: Code={error_code}, Message={error_message}")
if error_context:
logger.error(f" Context: {error_context}") # Log context separately
# Construct a concise error message for AirflowException
error_msg = f"YTDLP service error (Code: {error_code}): {error_message}"
# Add specific error code handling if needed...
logger.error(f"Failing task instance due to PBServiceException: {error_msg}") # Add explicit log before raising
raise AirflowException(error_msg) # Fail task on service error
except TTransportException as e:
logger.error(f"Thrift transport error during getOrRefreshToken: {e}")
logger.error(f"Failing task instance due to TTransportException: {e}") # Add explicit log before raising
raise AirflowException(f"Transport error during API call: {e}")
except Exception as e:
logger.error(f"Unexpected error during getOrRefreshToken: {e}")
logger.error(f"Failing task instance due to unexpected error during API call: {e}") # Add explicit log before raising
raise AirflowException(f"Unexpected error during API call: {e}")
except TTransportException as e:
# Handle connection errors
logger.error(f"Thrift transport error during connection: {str(e)}")
logger.error(f"Failing task instance due to TTransportException during connection: {e}") # Add explicit log before raising
raise AirflowException(f"Transport error connecting to YTDLP service: {str(e)}")
# Removed the overly broad except Exception block here, as inner blocks raise AirflowException
# --- Process Token Data ---
logger.debug(f"Token data received. Attributes: {dir(token_data)}")
info_json_path = None # Initialize
# save_info_json is now always True
logger.info("Proceeding to save info.json (save_info_json=True).")
info_json = self._get_info_json(token_data)
if info_json and self._is_valid_json(info_json):
try:
# Pass rendered info_json_dir to helper
info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
if info_json_path:
ti.xcom_push(key='info_json_path', value=info_json_path)
logger.info(f"Successfully saved info.json and pushed path to XCom: {info_json_path}")
else:
ti.xcom_push(key='info_json_path', value=None)
logger.warning("info.json saving failed (check logs from _save_info_json).")
except Exception as e:
logger.error(f"Unexpected error during info.json saving process: {e}", exc_info=True)
ti.xcom_push(key='info_json_path', value=None)
elif info_json:
logger.warning("Retrieved infoJson is not valid JSON. Skipping save.")
ti.xcom_push(key='info_json_path', value=None)
else:
logger.info("No infoJson found in token data. Skipping save.")
ti.xcom_push(key='info_json_path', value=None)
# Extract and potentially store SOCKS proxy
# get_socks_proxy and store_socks_proxy are now always True
socks_proxy = None
logger.info("Attempting to extract SOCKS proxy (get_socks_proxy=True).")
proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
if proxy_attr:
socks_proxy = getattr(token_data, proxy_attr)
if socks_proxy:
logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
# Always store if found (store_socks_proxy=True)
ti.xcom_push(key='socks_proxy', value=socks_proxy)
logger.info("Pushed 'socks_proxy' to XCom.")
else:
logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
# Store None if attribute found but empty
ti.xcom_push(key='socks_proxy', value=None)
logger.info("Pushed None to XCom for 'socks_proxy' as extracted value was empty.")
else:
logger.info("No SOCKS proxy attribute found in token data.")
# Store None if attribute not found
ti.xcom_push(key='socks_proxy', value=None)
logger.info("Pushed None to XCom for 'socks_proxy' as attribute was not found.")
# --- Removed old logic block ---
# # Extract and potentially store SOCKS proxy
# socks_proxy = None
# get_socks_proxy = params.get('get_socks_proxy', self.get_socks_proxy)
# store_socks_proxy = params.get('store_socks_proxy', self.store_socks_proxy)
#
# if get_socks_proxy:
# proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
# if proxy_attr:
# socks_proxy = getattr(token_data, proxy_attr)
# if socks_proxy:
# logger.info(f"Extracted SOCKS proxy ({proxy_attr}): {socks_proxy}")
# if store_socks_proxy:
# ti.xcom_push(key='socks_proxy', value=socks_proxy)
# logger.info("Pushed 'socks_proxy' to XCom.")
# else:
# logger.info(f"Found proxy attribute '{proxy_attr}' but value is empty.")
# if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
# else:
# logger.info("get_socks_proxy is True, but no SOCKS proxy attribute found.")
# if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
# else:
# logger.info("get_socks_proxy is False. Skipping proxy extraction.")
# if store_socks_proxy: ti.xcom_push(key='socks_proxy', value=None)
# --- End Removed old logic block ---
# Get the original command from the server, or construct a fallback
ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
if ytdlp_cmd:
logger.info(f"Original command received from server: {ytdlp_cmd[:100]}...") # Log truncated
else:
logger.warning("No 'ytdlpCommand' attribute found in token data. Constructing a fallback for logging.")
# Construct a representative command for logging purposes
if socks_proxy:
ytdlp_cmd = f"yt-dlp --dump-json --proxy \"{socks_proxy}\" \"{url}\""
else:
ytdlp_cmd = f"yt-dlp --dump-json \"{url}\""
logger.info(f"Constructed fallback command: {ytdlp_cmd}")
# Push the command to XCom
ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
logger.info("Pushed command to XCom key 'ytdlp_command'.")
# No explicit return needed, success is implicit if no exception raised
except (AirflowSkipException, AirflowFailException) as e:
logger.info(f"Task skipped or failed explicitly: {e}")
raise # Re-raise to let Airflow handle state
except AirflowException as e: # Catch AirflowExceptions raised explicitly
logger.error(f"Operation failed due to AirflowException: {e}", exc_info=True)
raise # Re-raise AirflowExceptions to ensure task failure
except (TTransportException, PBServiceException) as e: # Catch specific Thrift/Service errors not already handled inside inner try
logger.error(f"Unhandled YTDLP Service/Transport error in outer block: {e}", exc_info=True)
logger.error(f"Failing task instance due to unhandled outer Service/Transport error: {e}") # Add explicit log before raising
raise AirflowException(f"Unhandled YTDLP service error: {e}") # Wrap in AirflowException to fail task
except Exception as e: # General catch-all for truly unexpected errors
logger.error(f"Caught unexpected error in YtdlpOpsOperator outer block: {e}", exc_info=True)
logger.error(f"Failing task instance due to unexpected outer error: {e}") # Add explicit log before raising
raise AirflowException(f"Unexpected error caused task failure: {e}") # Wrap to fail task
finally:
if transport and transport.isOpen():
logger.info("Closing Thrift transport.")
transport.close()
# --- Helper Methods ---
def _get_info_json(self, token_data):
"""Safely extracts infoJson from token data."""
return getattr(token_data, 'infoJson', None)
def _is_valid_json(self, json_str):
"""Checks if a string is valid JSON."""
if not json_str or not isinstance(json_str, str): return False
try:
json.loads(json_str)
return True
except json.JSONDecodeError:
return False
def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
"""Saves info_json to a file. Uses pre-rendered directory path."""
try:
video_id = _extract_video_id(url) # Use standalone helper
save_dir = rendered_info_json_dir or "." # Use rendered path
logger.info(f"Target directory for info.json: {save_dir}")
# Ensure directory exists
try:
os.makedirs(save_dir, exist_ok=True)
logger.info(f"Ensured directory exists: {save_dir}")
except OSError as e:
logger.error(f"Could not create directory {save_dir}: {e}. Cannot save info.json.")
return None
# Construct filename
timestamp = int(time.time())
base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
info_json_path = os.path.join(save_dir, base_filename)
latest_json_path = os.path.join(save_dir, "latest.json") # Path for the latest symlink/copy
# Write to timestamped file
try:
logger.info(f"Writing info.json content (received from service) to {info_json_path}...")
with open(info_json_path, 'w', encoding='utf-8') as f:
f.write(info_json)
logger.info(f"Successfully saved info.json to timestamped file: {info_json_path}")
except IOError as e:
logger.error(f"Failed to write info.json to {info_json_path}: {e}")
return None
# Write to latest.json (overwrite) - best effort
try:
with open(latest_json_path, 'w', encoding='utf-8') as f:
f.write(info_json)
logger.info(f"Updated latest.json file: {latest_json_path}")
except IOError as e:
logger.warning(f"Failed to update latest.json at {latest_json_path}: {e}")
return info_json_path
except Exception as e:
logger.error(f"Unexpected error in _save_info_json: {e}", exc_info=True)
return None
# =============================================================================
# DAG Definition
# =============================================================================
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1, # Default retries for tasks like queue management
'retry_delay': timedelta(minutes=1),
'start_date': days_ago(1),
# Add concurrency control if needed for sequential processing
# 'concurrency': 1, # Ensure only one task instance runs at a time per DAG run
# 'max_active_runs': 1, # Ensure only one DAG run is active
}
# Define DAG
#
# --- DAG Block Deactivated on 2025-07-16 ---
# This DAG has been replaced by the Sensor/Worker pattern implemented in:
# - ytdlp_sensor_redis_queue.py (polls the queue)
# - ytdlp_worker_per_url.py (processes a single URL)
# This code is kept for reference but is not active.
#

View File

@@ -21,6 +21,9 @@ from datetime import timedelta
 import logging
 import redis

+# Import utility functions
+from utils.redis_utils import _get_redis_client
+
 # Configure logging
 logger = logging.getLogger(__name__)
@@ -30,23 +33,6 @@ DEFAULT_REDIS_CONN_ID = 'redis_default'
 DEFAULT_TIMEOUT = 30
 DEFAULT_MAX_URLS = '1' # Default number of URLs to process per run

-# --- Helper Functions ---
-def _get_redis_client(redis_conn_id):
-    """Gets a Redis client connection using RedisHook."""
-    try:
-        hook = RedisHook(redis_conn_id=redis_conn_id)
-        client = hook.get_conn()
-        client.ping()
-        logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-        return client
-    except redis.exceptions.AuthenticationError:
-        logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-        raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-    except Exception as e:
-        logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-        raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

 # --- Task Callables ---
 def log_trigger_info_callable(**context):
@@ -57,6 +43,8 @@ def log_trigger_info_callable(**context):
     if trigger_type == 'manual':
         logger.info("Trigger source: Manual execution from Airflow UI or CLI.")
+    elif trigger_type == 'scheduled':
+        logger.info("Trigger source: Scheduled run (periodic check).")
     elif trigger_type == 'dag_run':
         # In Airflow 2.2+ we can get the triggering run object
         try:
@@ -154,10 +142,10 @@ default_args = {
 with DAG(
     dag_id='ytdlp_sensor_redis_queue',
     default_args=default_args,
-    schedule_interval=None,  # This DAG is now only triggered (manually or by a worker)
+    schedule_interval='*/1 * * * *',  # Runs every minute and can also be triggered.
     max_active_runs=1,  # Prevent multiple sensors from running at once
     catchup=False,
-    description='Polls Redis queue for a batch of URLs and triggers parallel worker DAGs.',
+    description='Polls Redis queue every minute (and on trigger) for URLs and starts worker DAGs.',
     tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'],
     params={
         'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."),

View File

@@ -33,6 +33,10 @@ import os
 import redis
 import socket
 import time
+import traceback
+
+# Import utility functions
+from utils.redis_utils import _get_redis_client

 # Configure logging
 logger = logging.getLogger(__name__)
@@ -106,7 +110,7 @@ def handle_success(**context):
     try:
         # In the worker pattern, there's no "progress" hash to remove from.
         # We just add the result to the success hash.
-        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
+        client = _get_redis_client(redis_conn_id)
         client.hset(result_queue, url, json.dumps(result_data))
         logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
     except Exception as e:
@@ -115,8 +119,8 @@ def handle_success(**context):
 def handle_failure(**context):
     """
-    Handles failed processing. Moves the URL to the fail hash and, if stop_on_failure
-    is True, fails the task to make the DAG run failure visible.
+    Handles failed processing. Records detailed error information to the fail hash
+    and, if stop_on_failure is True, fails the task to make the DAG run failure visible.
     """
     ti = context['task_instance']
     params = context['params']
@@ -132,14 +136,31 @@ def handle_failure(**context):
     requeue_on_failure = params.get('requeue_on_failure', False)
     stop_on_failure = params.get('stop_on_failure', True)

+    # --- Extract Detailed Error Information ---
     exception = context.get('exception')
     error_message = str(exception) if exception else "Unknown error"
+    error_type = type(exception).__name__ if exception else "Unknown"
+    tb_str = "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available."
+
+    # Find the specific task that failed
+    dag_run = context['dag_run']
+    failed_task_id = "unknown"
+    # Look at direct upstream tasks of the current task ('handle_failure')
+    upstream_tasks = ti.get_direct_relatives(upstream=True)
+    for task in upstream_tasks:
+        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
+        if upstream_ti and upstream_ti.state == 'failed':
+            failed_task_id = task.task_id
+            break
+
     logger.info(f"Handling failure for URL: {url}")
+    logger.error(f"  Failed Task: {failed_task_id}")
+    logger.error(f"  Failure Type: {error_type}")
     logger.error(f"  Failure Reason: {error_message}")
+    logger.debug(f"  Traceback:\n{tb_str}")

     try:
-        client = YtdlpOpsOperator._get_redis_client(redis_conn_id)
+        client = _get_redis_client(redis_conn_id)
         if requeue_on_failure:
             client.rpush(inbox_queue, url)
             logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
@@ -147,12 +168,15 @@ def handle_failure(**context):
             fail_data = {
                 'status': 'failed',
                 'end_time': time.time(),
-                'error': error_message,
+                'failed_task': failed_task_id,
+                'error_type': error_type,
+                'error_message': error_message,
+                'traceback': tb_str,
                 'url': url,
                 'dag_run_id': context['dag_run'].run_id,
             }
-            client.hset(fail_queue, url, json.dumps(fail_data))
-            logger.info(f"Stored failure details for URL '{url}' in fail hash '{fail_queue}'.")
+            client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
+            logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
     except Exception as e:
         logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
         # This is a critical error in the failure handling logic itself.
@@ -178,22 +202,6 @@ class YtdlpOpsOperator(BaseOperator):
     """
     template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')

-    @staticmethod
-    def _get_redis_client(redis_conn_id):
-        """Gets a Redis client connection using RedisHook."""
-        try:
-            hook = RedisHook(redis_conn_id=redis_conn_id)
-            client = hook.get_conn()
-            client.ping()
-            logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.")
-            return client
-        except redis.exceptions.AuthenticationError:
-            logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.")
-            raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.")
-        except Exception as e:
-            logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}")
-            raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")

     @apply_defaults
     def __init__(self,
                  service_ip=None,
@@ -448,8 +456,8 @@ with DAG(
     trigger_sensor_for_next_batch.doc_md = """
     ### Trigger Sensor for Next Batch
     Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop.
-    This task runs after the main processing tasks are complete (either success or failure),
-    ensuring that the system immediately checks for more URLs to process.
+    This task **only runs on the success path** after a URL has been fully processed.
+    This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
     """

     # Define success and failure handling tasks
@@ -470,10 +478,9 @@ with DAG(
     # The main processing flow
     get_token >> download_video

-    # Branch after download: one path for success, one for failure
-    download_video >> success_task
-    download_video >> failure_task
-
-    # The trigger to continue the loop ONLY runs on the success path.
-    # A failure will be recorded in Redis by `handle_failure` and then the loop will stop.
-    success_task >> trigger_sensor_for_next_batch
+    # The success path: if download_video succeeds, run success_task, then trigger the next sensor run.
+    download_video >> success_task >> trigger_sensor_for_next_batch
+
+    # The failure path: if get_token OR download_video fails, run the failure_task.
+    # This is a "fan-in" dependency.
+    [get_token, download_video] >> failure_task
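The diff above does not show how `failure_task` is declared; for the fan-in edge `[get_token, download_video] >> failure_task` to fire when an upstream task fails, the task presumably uses a non-default trigger rule inside the worker DAG's `with DAG(...)` block, for example:

```python
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

failure_task = PythonOperator(
    task_id="handle_failure",
    python_callable=handle_failure,       # defined earlier in the worker DAG
    trigger_rule=TriggerRule.ONE_FAILED,  # run if any direct upstream task failed
)
```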