From 274bef5370bed09dd62e7d8fed4d53801c0615cd Mon Sep 17 00:00:00 2001 From: aperez Date: Wed, 6 Aug 2025 18:02:44 +0300 Subject: [PATCH] Provide updates on ytdlp dags --- dags/README.ru.md | 86 ++- dags/ytdlp_mgmt_proxy.py | 197 ------ dags/ytdlp_mgmt_proxy_account.py | 405 +++++++++++ dags/ytdlp_mgmt_queues.py | 218 ++++-- dags/ytdlp_ops_orchestrator.py | 194 ++++++ dags/ytdlp_ops_sensor_queue.py | 215 ------ dags/ytdlp_ops_worker_per_url.py | 1081 ++++++++++++++++++++---------- docker-compose-ytdlp-ops.yaml | 74 +- requirements.txt | 9 + 9 files changed, 1617 insertions(+), 862 deletions(-) delete mode 100644 dags/ytdlp_mgmt_proxy.py create mode 100644 dags/ytdlp_mgmt_proxy_account.py create mode 100644 dags/ytdlp_ops_orchestrator.py delete mode 100644 dags/ytdlp_ops_sensor_queue.py create mode 100644 requirements.txt diff --git a/dags/README.ru.md b/dags/README.ru.md index ea623d0..9726b79 100644 --- a/dags/README.ru.md +++ b/dags/README.ru.md @@ -1,46 +1,78 @@ # Архитектура и описание YTDLP Airflow DAGs -Этот документ описывает архитектуру и назначение DAG'ов, используемых для скачивания видео с YouTube. Система построена по паттерну "Сенсор/Воркер" для обеспечения непрерывной и параллельной обработки. +Этот документ описывает архитектуру и назначение DAG'ов, используемых для скачивания видео с YouTube. Система построена на модели непрерывного, самоподдерживающегося цикла для параллельной и отказоустойчивой обработки. ## Основной цикл обработки -### `ytdlp_sensor_redis_queue` (Сенсор) +Обработка выполняется двумя основными DAG'ами, которые работают в паре: оркестратор и воркер. -- **Назначение:** Забирает URL на скачивание из очереди Redis и запускает воркеры для их обработки. -- **Принцип работы (Запуск по триггеру):** - - **По триггеру:** Когда воркер `ytdlp_worker_per_url` успешно завершает работу, он немедленно запускает сенсор. Это обеспечивает непрерывную обработку без задержек. Запуск по расписанию отключен, чтобы избежать повторного запуска задач для заблокированных аккаунтов. - - **Логика:** Извлекает из Redis (`_inbox` лист) пачку URL. Если очередь пуста, DAG успешно завершается до следующего запуска по триггеру. +### `ytdlp_ops_orchestrator` (Система "зажигания") -### `ytdlp_worker_per_url` (Воркер) - -- **Назначение:** Обрабатывает один URL, скачивает видео и продолжает цикл. +- **Назначение:** Этот DAG действует как "система зажигания" для запуска обработки. Он запускается вручную для старта указанного количества параллельных циклов-воркеров. - **Принцип работы:** - - Получает один URL от сенсора. - - Обращается к сервису `ytdlp-ops-auth` для получения `info.json` и `socks5` прокси. - - Скачивает видео, используя полученные данные. (TODO: заменить вызов `yt-dlp` как команды на вызов библиотеки). - - В зависимости от статуса (успех/неуспех), помещает результат в соответствующий хэш Redis (`_result` или `_fail`). - - В случае успеха, повторно запускает сенсор `ytdlp_sensor_redis_queue` для продолжения цикла обработки. В случае ошибки цикл останавливается для ручной диагностики. + - Он **не** обрабатывает URL-адреса самостоятельно. + - Его единственная задача — запустить сконфигурированное количество DAG'ов `ytdlp_ops_worker_per_url`. + - Он передает всю необходимую конфигурацию (пул аккаунтов, подключение к Redis и т.д.) воркерам. + +### `ytdlp_ops_worker_per_url` (Самоподдерживающийся воркер) + +- **Назначение:** Этот DAG обрабатывает один URL и спроектирован для работы в непрерывном цикле. +- **Принцип работы:** + 1. 
**Запуск:** Начальный запуск инициируется `ytdlp_ops_orchestrator`. + 2. **Получение задачи:** Воркер извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, выполнение воркера завершается, и его "линия" обработки останавливается. + 3. **Обработка:** Он взаимодействует с сервисом `ytdlp-ops-server` для получения `info.json` и прокси, после чего скачивает видео. + 4. **Продолжение или остановка:** + - **В случае успеха:** Он запускает новый экземпляр самого себя, создавая непрерывный цикл для обработки следующего URL. + - **В случае сбоя:** Цикл прерывается (если `stop_on_failure` установлено в `True`), останавливая эту "линию" обработки. Это предотвращает остановку всей системы из-за одного проблемного URL или аккаунта. ## Управляющие DAG'и -Эти DAG'и предназначены для ручного управления очередями и не участвуют в автоматическом цикле. +### `ytdlp_mgmt_proxy_account` -- **`ytdlp_mgmt_queue_add_and_verify`**: Добавление URL в очередь задач (`_inbox`) и последующая проверка статуса этой очереди. -- **`ytdlp_mgmt_queues_check_status`**: Просмотр состояния и содержимого всех ключевых очередей (`_inbox`, `_progress`, `_result`, `_fail`). Помогает отслеживать процесс обработки. -- **`ytdlp_mgmt_queue_clear`**: Очистка (полное удаление) указанной очереди Redis. **Использовать с осторожностью**, так как операция необратима. +- **Назначение:** Это основной инструмент для мониторинга и управления состоянием ресурсов, используемых `ytdlp-ops-server`. +- **Функциональность:** + - **Просмотр статусов:** Позволяет увидеть текущий статус всех прокси и аккаунтов (например, `ACTIVE`, `BANNED`, `RESTING`). + - **Управление прокси:** Позволяет вручную банить, разбанивать или сбрасывать статус прокси. + - **Управление аккаунтами:** Позволяет вручную банить или разбанивать аккаунты. + +## Стратегия управления ресурсами (Прокси и Аккаунты) + +Система использует интеллектуальную стратегию для управления жизненным циклом и состоянием аккаунтов и прокси, чтобы максимизировать процент успеха и минимизировать блокировки. + +- **Жизненный цикл аккаунта ("Cooldown"):** + - Чтобы предотвратить "выгорание", аккаунты автоматически переходят в состояние "отдыха" (`RESTING`) после периода интенсивного использования. + - По истечении периода отдыха они автоматически возвращаются в `ACTIVE` и снова становятся доступными для воркеров. + +- **Умная стратегия банов:** + - **Сначала бан аккаунта:** При возникновении серьезной ошибки (например, `BOT_DETECTED`) система наказывает **только аккаунт**, который вызвал сбой. Прокси при этом продолжает работать. + - **Бан прокси по "скользящему окну":** Прокси банится автоматически, только если он демонстрирует **систематические сбои с РАЗНЫМИ аккаунтами** за короткий промежуток времени. Это является надежным индикатором того, что проблема именно в прокси. + +- **Мониторинг:** + - DAG `ytdlp_mgmt_proxy_account` является основным инструментом для мониторинга. Он показывает текущий статус всех ресурсов, включая время, оставшееся до активации забаненных или отдыхающих аккаунтов. + - Граф выполнения DAG `ytdlp_ops_worker_per_url` теперь явно показывает шаги, такие как `assign_account`, `get_token`, `ban_account`, `retry_get_token`, что делает процесс отладки более наглядным. ## Внешние сервисы -### `ytdlp-ops-auth` (Thrift Service) +### `ytdlp-ops-server` (Thrift Service) - **Назначение:** Внешний сервис, который предоставляет аутентификационные данные (токены, cookies, proxy) для скачивания видео. 
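Ниже — минимальный набросок обращения воркера к этому сервису по Thrift. Схема подключения (framed-транспорт, бинарный протокол, таймаут) взята из управляющих DAG'ов в этом же патче; имя метода получения токена здесь условное и приведено только для иллюстрации — реальная сигнатура определяется Thrift-описанием сервиса.

```python
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from pangramia.yt.tokens_ops import YTTokenOpService

def get_auth_data(host: str, port: int, url: str, account_id: str):
    """Набросок: подключение к ytdlp-ops-server и запрос данных для yt-dlp."""
    sock = TSocket.TSocket(host, port)
    sock.setTimeout(30 * 1000)  # 30 секунд, как в управляющих DAG'ах
    transport = TTransport.TFramedTransport(sock)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = YTTokenOpService.Client(protocol)
    transport.open()
    try:
        # Имя метода условное (только для иллюстрации): воркер запрашивает info.json и прокси.
        return client.getToken(url, account_id)
    finally:
        if transport.isOpen():
            transport.close()
```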
-- **Взаимодействие:** Worker DAG (`ytdlp_worker_per_url`) обращается к этому сервису перед началом загрузки для получения необходимых данных для `yt-dlp`. +- **Взаимодействие:** Worker DAG (`ytdlp_ops_worker_per_url`) обращается к этому сервису перед началом загрузки для получения необходимых данных для `yt-dlp`. -## TODO (Планы на доработку) +## Логика работы Worker DAG (`ytdlp_ops_worker_per_url`) -- **Реализовать механизм "Circuit Breaker" (автоматического выключателя):** - - **Проблема:** Если воркер падает с ошибкой (например, из-за бана аккаунта), сенсор, запускаемый по расписанию, продолжает создавать новые задачи для этого же аккаунта, усугубляя проблему. - - **Решение:** - 1. **Воркер (`ytdlp_worker_per_url`):** При сбое задачи, воркер должен устанавливать в Redis флаг временной блокировки для своего `account_id` (например, на 5-10 минут). - 2. **Сенсор (`ytdlp_sensor_redis_queue`):** Перед проверкой очереди, сенсор должен проверять наличие флага блокировки для своего `account_id`. Если аккаунт заблокирован, сенсор должен пропустить выполнение, предотвращая запуск новых воркеров для проблемного аккаунта. - - **Результат:** Это предотвратит многократные повторные запросы к заблокированному аккаунту и даст системе время на восстановление. +Этот DAG является "рабочей лошадкой" системы. Он спроектирован как самоподдерживающийся цикл для обработки одного URL за запуск. + +### Задачи и их назначение: + +- **`pull_url_from_redis`**: Извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, DAG завершается со статусом `skipped`, останавливая эту "линию" обработки. +- **`assign_account`**: Выбирает аккаунт для выполнения задачи. Он будет повторно использовать тот же аккаунт, который был успешно использован в предыдущем запуске в своей "линии" (привязка аккаунта). Если это первый запуск, он выбирает случайный аккаунт. +- **`get_token`**: Основная задача. Она обращается к `ytdlp-ops-server` для получения `info.json`. +- **`handle_bannable_error_branch`**: Если `get_token` завершается с ошибкой, требующей бана, эта задача-развилка решает, что делать дальше, в зависимости от политики `on_bannable_failure`. +- **`ban_account_and_prepare_for_retry`**: Если политика разрешает повтор, эта задача банит сбойный аккаунт и выбирает новый для повторной попытки. +- **`retry_get_token`**: Выполняет вторую попытку получить токен с новым аккаунтом. +- **`ban_second_account_and_proxy`**: Если и вторая попытка неудачна, эта задача банит второй аккаунт и использованный прокси. +- **`download_and_probe`**: Если `get_token` (или `retry_get_token`) завершилась успешно, эта задача использует `yt-dlp` для скачивания медиа и `ffmpeg` для проверки целостности скачанного файла. +- **`mark_url_as_success`**: Если `download_and_probe` завершилась успешно, эта задача записывает результат в хэш `_result` в Redis. +- **`handle_generic_failure`**: Если любая из основных задач завершается с неисправимой ошибкой, эта задача записывает подробную информацию об ошибке в хэш `_fail` в Redis. +- **`decide_what_to_do_next`**: Задача-развилка, которая запускается после успеха или неудачи. Она решает, продолжать ли цикл. +- **`trigger_self_run`**: Задача, которая фактически запускает следующий экземпляр DAG, создавая непрерывный цикл. diff --git a/dags/ytdlp_mgmt_proxy.py b/dags/ytdlp_mgmt_proxy.py deleted file mode 100644 index 811b11e..0000000 --- a/dags/ytdlp_mgmt_proxy.py +++ /dev/null @@ -1,197 +0,0 @@ -""" -DAG to manage the state of proxies used by the ytdlp-ops-server. 
-""" -from __future__ import annotations - -import logging -from datetime import datetime - -from airflow.models.dag import DAG -from airflow.models.param import Param -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago - -# Configure logging -logger = logging.getLogger(__name__) - -# Import and apply Thrift exceptions patch for Airflow compatibility -try: - from thrift_exceptions_patch import patch_thrift_exceptions - patch_thrift_exceptions() - logger.info("Applied Thrift exceptions patch for Airflow compatibility.") -except ImportError: - logger.warning("Could not import thrift_exceptions_patch. Compatibility may be affected.") -except Exception as e: - logger.error(f"Error applying Thrift exceptions patch: {e}") - -# Thrift imports -try: - from thrift.transport import TSocket, TTransport - from thrift.protocol import TBinaryProtocol - from pangramia.yt.tokens_ops import YTTokenOpService - from pangramia.yt.exceptions.ttypes import PBServiceException, PBUserException -except ImportError as e: - logger.critical(f"Could not import Thrift modules: {e}. Ensure ytdlp-ops-auth package is installed.") - # Fail DAG parsing if thrift modules are not available - raise - -def format_timestamp(ts_str: str) -> str: - """Formats a string timestamp into a human-readable date string.""" - if not ts_str: - return "" - try: - ts_float = float(ts_str) - if ts_float <= 0: - return "" - # Use datetime from the imported 'from datetime import datetime' - dt_obj = datetime.fromtimestamp(ts_float) - return dt_obj.strftime('%Y-%m-%d %H:%M:%S') - except (ValueError, TypeError): - return ts_str # Return original string if conversion fails - -def get_thrift_client(host: str, port: int): - """Helper function to create and connect a Thrift client.""" - transport = TSocket.TSocket(host, port) - transport = TTransport.TFramedTransport(transport) - protocol = TBinaryProtocol.TBinaryProtocol(transport) - client = YTTokenOpService.Client(protocol) - transport.open() - logger.info(f"Connected to Thrift server at {host}:{port}") - return client, transport - -def manage_proxies_callable(**context): - """Main callable to interact with the proxy management endpoints.""" - params = context["params"] - action = params["action"] - host = params["host"] - port = params["port"] - server_identity = params.get("server_identity") - proxy_url = params.get("proxy_url") - - if not server_identity and action in ["ban", "unban", "reset_all"]: - raise ValueError(f"A 'server_identity' is required for the '{action}' action.") - - client, transport = None, None - try: - client, transport = get_thrift_client(host, port) - - if action == "list": - logger.info(f"Listing proxy statuses for server: {server_identity or 'ALL'}") - statuses = client.getProxyStatus(server_identity) - if not statuses: - logger.info("No proxy statuses found.") - print("No proxy statuses found.") - else: - from tabulate import tabulate - status_list = [ - { - "Server": s.serverIdentity, - "Proxy URL": s.proxyUrl, - "Status": s.status, - "Success": s.successCount, - "Failures": s.failureCount, - "Last Success": format_timestamp(s.lastSuccessTimestamp), - "Last Failure": format_timestamp(s.lastFailureTimestamp), - } - for s in statuses - ] - print("\n--- Proxy Statuses ---") - print(tabulate(status_list, headers="keys", tablefmt="grid")) - print("----------------------\n") - - elif action == "ban": - if not proxy_url: - raise ValueError("A 'proxy_url' is required to ban a proxy.") - logger.info(f"Banning proxy '{proxy_url}' for 
server '{server_identity}'...") - success = client.banProxy(proxy_url, server_identity) - if success: - logger.info("Successfully banned proxy.") - print(f"Successfully banned proxy '{proxy_url}' for server '{server_identity}'.") - else: - logger.error("Failed to ban proxy.") - raise Exception("Server returned failure for banProxy operation.") - - elif action == "unban": - if not proxy_url: - raise ValueError("A 'proxy_url' is required to unban a proxy.") - logger.info(f"Unbanning proxy '{proxy_url}' for server '{server_identity}'...") - success = client.unbanProxy(proxy_url, server_identity) - if success: - logger.info("Successfully unbanned proxy.") - print(f"Successfully unbanned proxy '{proxy_url}' for server '{server_identity}'.") - else: - logger.error("Failed to unban proxy.") - raise Exception("Server returned failure for unbanProxy operation.") - - elif action == "reset_all": - logger.info(f"Resetting all proxy statuses for server '{server_identity}'...") - success = client.resetAllProxyStatuses(server_identity) - if success: - logger.info("Successfully reset all proxy statuses.") - print(f"Successfully reset all proxy statuses for server '{server_identity}'.") - else: - logger.error("Failed to reset all proxy statuses.") - raise Exception("Server returned failure for resetAllProxyStatuses operation.") - - else: - raise ValueError(f"Invalid action: {action}") - - except (PBServiceException, PBUserException) as e: - logger.error(f"Thrift error performing action '{action}': {e.message}", exc_info=True) - raise - except Exception as e: - logger.error(f"Error performing action '{action}': {e}", exc_info=True) - raise - finally: - if transport and transport.isOpen(): - transport.close() - logger.info("Thrift connection closed.") - -with DAG( - dag_id="ytdlp_mgmt_proxy", - start_date=days_ago(1), - schedule=None, - catchup=False, - tags=["ytdlp", "utility", "proxy"], - doc_md=""" - ### YT-DLP Proxy Manager DAG - - This DAG provides tools to manage the state of proxies used by the `ytdlp-ops-server`. - You can view statuses, and manually ban, unban, or reset proxies for a specific server instance. - - **Parameters:** - - `host`: The hostname or IP of the `ytdlp-ops-server` Thrift service. - - `port`: The port of the Thrift service. - - `action`: The operation to perform. - - `list`: List proxy statuses. Provide a `server_identity` to query a specific server, or leave it blank to query the server instance you are connected to. - - `ban`: Ban a specific proxy. Requires `server_identity` and `proxy_url`. - - `unban`: Un-ban a specific proxy. Requires `server_identity` and `proxy_url`. - - `reset_all`: Reset all proxies for a server to `ACTIVE`. Requires `server_identity`. - - `server_identity`: The unique identifier for the server instance (e.g., `ytdlp-ops-airflow-service`). - - `proxy_url`: The full URL of the proxy to act upon (e.g., `socks5://host:port`). - """, - params={ - "host": Param("89.253.221.173", type="string", description="The hostname of the ytdlp-ops-server service."), - "port": Param(9090, type="integer", description="The port of the ytdlp-ops-server service."), - "action": Param( - "list", - type="string", - enum=["list", "ban", "unban", "reset_all"], - description="The management action to perform.", - ), - "server_identity": Param( - "ytdlp-ops-airflow-service", - type=["null", "string"], - description="The identity of the server to manage. 
Leave blank to query the connected server instance.", - ), - "proxy_url": Param( - None, - type=["null", "string"], - description="The proxy URL to ban/unban (e.g., 'socks5://host:port').", - ), - }, -) as dag: - proxy_management_task = PythonOperator( - task_id="proxy_management_task", - python_callable=manage_proxies_callable, - ) diff --git a/dags/ytdlp_mgmt_proxy_account.py b/dags/ytdlp_mgmt_proxy_account.py new file mode 100644 index 0000000..fff3623 --- /dev/null +++ b/dags/ytdlp_mgmt_proxy_account.py @@ -0,0 +1,405 @@ +""" +DAG to manage the state of proxies and accounts used by the ytdlp-ops-server. +""" +from __future__ import annotations + +import logging +from datetime import datetime +import socket + +from airflow.exceptions import AirflowException +from airflow.models.dag import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from airflow.models.variable import Variable +from airflow.providers.redis.hooks.redis import RedisHook + +# Configure logging +logger = logging.getLogger(__name__) + +# Import and apply Thrift exceptions patch for Airflow compatibility +try: + from thrift_exceptions_patch import patch_thrift_exceptions + patch_thrift_exceptions() + logger.info("Applied Thrift exceptions patch for Airflow compatibility.") +except ImportError: + logger.warning("Could not import thrift_exceptions_patch. Compatibility may be affected.") +except Exception as e: + logger.error(f"Error applying Thrift exceptions patch: {e}") + +# Thrift imports +try: + from thrift.transport import TSocket, TTransport + from thrift.protocol import TBinaryProtocol + from pangramia.yt.tokens_ops import YTTokenOpService + from pangramia.yt.exceptions.ttypes import PBServiceException, PBUserException +except ImportError as e: + logger.critical(f"Could not import Thrift modules: {e}. 
Ensure ytdlp-ops-auth package is installed.") + # Fail DAG parsing if thrift modules are not available + raise + +DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="16.162.82.212") +DEFAULT_YT_AUTH_SERVICE_PORT = Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080) +DEFAULT_REDIS_CONN_ID = "redis_default" + + +# Helper function to connect to Redis, similar to other DAGs +def _get_redis_client(redis_conn_id: str): + """Gets a Redis client from an Airflow connection.""" + try: + # Use the imported RedisHook + redis_hook = RedisHook(redis_conn_id=redis_conn_id) + # get_conn returns a redis.Redis client + return redis_hook.get_conn() + except Exception as e: + logger.error(f"Failed to connect to Redis using connection '{redis_conn_id}': {e}") + # Use the imported AirflowException + raise AirflowException(f"Redis connection failed: {e}") + + +def format_timestamp(ts_str: str) -> str: + """Formats a string timestamp into a human-readable date string.""" + if not ts_str: + return "" + try: + ts_float = float(ts_str) + if ts_float <= 0: + return "" + # Use datetime from the imported 'from datetime import datetime' + dt_obj = datetime.fromtimestamp(ts_float) + return dt_obj.strftime('%Y-%m-%d %H:%M:%S') + except (ValueError, TypeError): + return ts_str # Return original string if conversion fails + +def get_thrift_client(host: str, port: int): + """Helper function to create and connect a Thrift client.""" + transport = TSocket.TSocket(host, port) + transport.setTimeout(30 * 1000) # 30s timeout + transport = TTransport.TFramedTransport(transport) + protocol = TBinaryProtocol.TBinaryProtocol(transport) + client = YTTokenOpService.Client(protocol) + transport.open() + logger.info(f"Connected to Thrift server at {host}:{port}") + return client, transport + +def _list_proxy_statuses(client, server_identity): + """Lists the status of proxies.""" + logger.info(f"Listing proxy statuses for server: {server_identity or 'ALL'}") + statuses = client.getProxyStatus(server_identity) + if not statuses: + logger.info("No proxy statuses found.") + print("No proxy statuses found.") + return + + from tabulate import tabulate + status_list = [] + # This is forward-compatible: it checks for new attributes before using them. + has_extended_info = hasattr(statuses[0], 'recentAccounts') or hasattr(statuses[0], 'recentMachines') + + headers = ["Server", "Proxy URL", "Status", "Success", "Failures", "Last Success", "Last Failure"] + if has_extended_info: + headers.extend(["Recent Accounts", "Recent Machines"]) + + for s in statuses: + status_item = { + "Server": s.serverIdentity, + "Proxy URL": s.proxyUrl, + "Status": s.status, + "Success": s.successCount, + "Failures": s.failureCount, + "Last Success": format_timestamp(s.lastSuccessTimestamp), + "Last Failure": format_timestamp(s.lastFailureTimestamp), + } + if has_extended_info: + recent_accounts = getattr(s, 'recentAccounts', []) + recent_machines = getattr(s, 'recentMachines', []) + status_item["Recent Accounts"] = "\n".join(recent_accounts) if recent_accounts else "N/A" + status_item["Recent Machines"] = "\n".join(recent_machines) if recent_machines else "N/A" + status_list.append(status_item) + + print("\n--- Proxy Statuses ---") + # The f-string with a newline ensures the table starts on a new line in the logs. 
+ print(f"\n{tabulate(status_list, headers='keys', tablefmt='grid')}") + print("----------------------\n") + if not has_extended_info: + logger.warning("Server does not seem to support 'recentAccounts' or 'recentMachines' fields yet.") + print("NOTE: To see Recent Accounts/Machines, the server's `getProxyStatus` method must be updated to return these fields.") + + +def _list_account_statuses(client, account_id): + """Lists the status of accounts.""" + logger.info(f"Listing account statuses for account: {account_id or 'ALL'}") + try: + # The thrift method takes accountId (specific) or accountPrefix. + # If account_id is provided, we use it. If not, we get all by leaving both params as None. + statuses = client.getAccountStatus(accountId=account_id, accountPrefix=None) + if not statuses: + logger.info("No account statuses found.") + print("\n--- Account Statuses ---\nNo account statuses found.\n------------------------\n") + return + + from tabulate import tabulate + status_list = [] + + for s in statuses: + # Determine the last activity timestamp for sorting + last_success = float(s.lastSuccessTimestamp) if s.lastSuccessTimestamp else 0 + last_failure = float(s.lastFailureTimestamp) if s.lastFailureTimestamp else 0 + last_activity = max(last_success, last_failure) + + status_item = { + "Account ID": s.accountId, + "Status": s.status, + "Success": s.successCount, + "Failures": s.failureCount, + "Last Success": format_timestamp(s.lastSuccessTimestamp), + "Last Failure": format_timestamp(s.lastFailureTimestamp), + "Last Proxy": s.lastUsedProxy or "N/A", + "Last Machine": s.lastUsedMachine or "N/A", + "_last_activity": last_activity, # Add a temporary key for sorting + } + status_list.append(status_item) + + # Sort the list by the last activity timestamp in descending order + status_list.sort(key=lambda item: item.get('_last_activity', 0), reverse=True) + + # Remove the temporary sort key before printing + for item in status_list: + del item['_last_activity'] + + print("\n--- Account Statuses ---") + # The f-string with a newline ensures the table starts on a new line in the logs. + print(f"\n{tabulate(status_list, headers='keys', tablefmt='grid')}") + print("------------------------\n") + except (PBServiceException, PBUserException) as e: + logger.error(f"Failed to get account statuses: {e.message}", exc_info=True) + print(f"\nERROR: Could not retrieve account statuses. Server returned: {e.message}\n") + except Exception as e: + logger.error(f"An unexpected error occurred while getting account statuses: {e}", exc_info=True) + print(f"\nERROR: An unexpected error occurred: {e}\n") + + +def manage_system_callable(**context): + """Main callable to interact with the system management endpoints.""" + params = context["params"] + entity = params["entity"] + action = params["action"] + host = params["host"] + port = params["port"] + server_identity = params.get("server_identity") + proxy_url = params.get("proxy_url") + account_id = params.get("account_id") + + if action in ["ban", "unban", "reset_all"] and entity == "proxy" and not server_identity: + raise ValueError(f"A 'server_identity' is required for proxy action '{action}'.") + if action in ["ban", "unban"] and entity == "account" and not account_id: + raise ValueError(f"An 'account_id' is required for account action '{action}'.") + + # Handle direct Redis action separately to avoid creating an unnecessary Thrift connection. 
+ if entity == "account" and action == "remove_all": + confirm = params.get("confirm_remove_all_accounts", False) + if not confirm: + message = "FATAL: 'remove_all' action requires 'confirm_remove_all_accounts' to be set to True. No accounts were removed." + logger.error(message) + print(f"\nERROR: {message}\n") + raise ValueError(message) + + redis_conn_id = params["redis_conn_id"] + account_prefix = params.get("account_id") # Repurpose account_id param as an optional prefix + + redis_client = _get_redis_client(redis_conn_id) + + pattern = f"account_status:{account_prefix}*" if account_prefix else "account_status:*" + logger.warning(f"Searching for account status keys in Redis with pattern: '{pattern}'") + + # scan_iter returns bytes, so we don't need to decode for deletion + keys_to_delete = [key for key in redis_client.scan_iter(pattern)] + + if not keys_to_delete: + logger.info(f"No account keys found matching pattern '{pattern}'. Nothing to do.") + print(f"\nNo accounts found matching pattern '{pattern}'.\n") + return + + logger.warning(f"Found {len(keys_to_delete)} account keys to delete. This is a destructive operation!") + print(f"\nWARNING: Found {len(keys_to_delete)} accounts to remove from Redis.") + # Decode for printing + for key in keys_to_delete[:10]: + print(f" - {key.decode('utf-8')}") + if len(keys_to_delete) > 10: + print(f" ... and {len(keys_to_delete) - 10} more.") + + deleted_count = redis_client.delete(*keys_to_delete) + logger.info(f"Successfully deleted {deleted_count} account keys from Redis.") + print(f"\nSuccessfully removed {deleted_count} accounts from Redis.\n") + return # End execution for this action + + client, transport = None, None + try: + client, transport = get_thrift_client(host, port) + + if entity == "proxy": + if action == "list": + _list_proxy_statuses(client, server_identity) + elif action == "ban": + if not proxy_url: raise ValueError("A 'proxy_url' is required.") + logger.info(f"Banning proxy '{proxy_url}' for server '{server_identity}'...") + client.banProxy(proxy_url, server_identity) + print(f"Successfully sent request to ban proxy '{proxy_url}'.") + elif action == "unban": + if not proxy_url: raise ValueError("A 'proxy_url' is required.") + logger.info(f"Unbanning proxy '{proxy_url}' for server '{server_identity}'...") + client.unbanProxy(proxy_url, server_identity) + print(f"Successfully sent request to unban proxy '{proxy_url}'.") + elif action == "reset_all": + logger.info(f"Resetting all proxy statuses for server '{server_identity}'...") + client.resetAllProxyStatuses(server_identity) + print(f"Successfully sent request to reset all proxy statuses for '{server_identity}'.") + else: + raise ValueError(f"Invalid action '{action}' for entity 'proxy'.") + + elif entity == "account": + if action == "list": + _list_account_statuses(client, account_id) + elif action == "ban": + if not account_id: raise ValueError("An 'account_id' is required.") + reason = f"Manual ban from Airflow mgmt DAG by {socket.gethostname()}" + logger.info(f"Banning account '{account_id}'...") + client.banAccount(accountId=account_id, reason=reason) + print(f"Successfully sent request to ban account '{account_id}'.") + elif action == "unban": + if not account_id: raise ValueError("An 'account_id' is required.") + reason = f"Manual un-ban from Airflow mgmt DAG by {socket.gethostname()}" + logger.info(f"Unbanning account '{account_id}'...") + client.unbanAccount(accountId=account_id, reason=reason) + print(f"Successfully sent request to unban account '{account_id}'.") + 
elif action == "reset_all": + account_prefix = account_id # Repurpose account_id param as an optional prefix + logger.info(f"Resetting all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...") + + all_statuses = client.getAccountStatus(accountId=None, accountPrefix=account_prefix) + if not all_statuses: + print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to reset.") + return + + accounts_to_reset = [s.accountId for s in all_statuses] + logger.info(f"Found {len(accounts_to_reset)} accounts to reset.") + print(f"Found {len(accounts_to_reset)} accounts. Sending unban request for each...") + + reset_count = 0 + fail_count = 0 + for acc_id in accounts_to_reset: + try: + reason = f"Manual reset from Airflow mgmt DAG by {socket.gethostname()}" + client.unbanAccount(accountId=acc_id, reason=reason) + logger.info(f" - Sent reset (unban) for '{acc_id}'.") + reset_count += 1 + except Exception as e: + logger.error(f" - Failed to reset account '{acc_id}': {e}") + fail_count += 1 + + print(f"\nSuccessfully sent reset requests for {reset_count} accounts.") + if fail_count > 0: + print(f"Failed to send reset requests for {fail_count} accounts. See logs for details.") + + # Optionally, list statuses again to confirm + print("\n--- Listing statuses after reset ---") + _list_account_statuses(client, account_prefix) + else: + raise ValueError(f"Invalid action '{action}' for entity 'account'.") + + elif entity == "all": + if action == "list": + print("\nListing all entities...") + _list_proxy_statuses(client, server_identity) + _list_account_statuses(client, account_id) + else: + raise ValueError(f"Action '{action}' is not supported for entity 'all'. Only 'list' is supported.") + + except (PBServiceException, PBUserException) as e: + logger.error(f"Thrift error performing action '{action}': {e.message}", exc_info=True) + raise + except NotImplementedError as e: + logger.error(f"Feature not implemented: {e}", exc_info=True) + raise + except Exception as e: + logger.error(f"Error performing action '{action}': {e}", exc_info=True) + raise + finally: + if transport and transport.isOpen(): + transport.close() + logger.info("Thrift connection closed.") + +with DAG( + dag_id="ytdlp_mgmt_proxy_account", + start_date=days_ago(1), + schedule=None, + catchup=False, + tags=["ytdlp", "utility", "proxy", "account", "management"], + doc_md=""" + ### YT-DLP Proxy and Account Manager DAG + + This DAG provides tools to manage the state of **proxies and accounts** used by the `ytdlp-ops-server`. + + **Parameters:** + - `host`, `port`: Connection details for the `ytdlp-ops-server` Thrift service. + - `entity`: The type of resource to manage (`proxy`, `account`, or `all`). + - `action`: The operation to perform. + - `list`: View statuses. For `entity: all`, lists both proxies and accounts. + - `ban`: Ban a specific proxy or account. + - `unban`: Un-ban a specific proxy or account. + - `reset_all`: Reset all proxies for a server (or all accounts) to `ACTIVE`. + - `remove_all`: **Deletes all account status keys** from Redis for a given prefix. This is a destructive action. + - `server_identity`: Required for most proxy actions. + - `proxy_url`: Required for banning/unbanning a specific proxy. + - `account_id`: Required for managing a specific account. For `action: reset_all` or `remove_all` on `entity: account`, this can be used as an optional prefix to filter which accounts to act on. + - `confirm_remove_all_accounts`: **Required for `remove_all` action.** Must be set to `True` to confirm deletion. 
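    Before running the destructive `remove_all` action, it can be useful to preview which keys would be deleted. A minimal sketch, assuming direct access to the same Redis instance (connection details below are placeholders); the key pattern mirrors the one this DAG uses:

    ```python
    import redis

    r = redis.Redis(host="localhost", port=6379, db=0)  # placeholder connection details

    prefix = ""  # e.g. "ytdlp_account" to narrow the match, same as the account_id prefix param
    pattern = f"account_status:{prefix}*" if prefix else "account_status:*"
    for key in r.scan_iter(pattern):
        print(key.decode("utf-8"))
    ```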
+ """, + params={ + "host": Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="The hostname of the ytdlp-ops-server service. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."), + "port": Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="The port of the ytdlp-ops-server service (Envoy load balancer). Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."), + "entity": Param( + "all", + type="string", + enum=["proxy", "account", "all"], + description="The type of entity to manage. Use 'all' with action 'list' to see both.", + ), + "action": Param( + "list", + type="string", + enum=["list", "ban", "unban", "reset_all", "remove_all"], + description="The management action to perform. `reset_all` for proxies/accounts. `remove_all` for accounts only.", + ), + "server_identity": Param( + "ytdlp-ops-airflow-service", + type=["null", "string"], + description="The identity of the server instance (for proxy management).", + ), + "proxy_url": Param( + None, + type=["null", "string"], + description="The proxy URL to act upon (e.g., 'socks5://host:port').", + ), + "account_id": Param( + None, + type=["null", "string"], + description="The account ID to act upon. For `reset_all` or `remove_all` on accounts, this can be an optional prefix.", + ), + "confirm_remove_all_accounts": Param( + False, + type="boolean", + title="[remove_all] Confirm Deletion", + description="Must be set to True to execute the 'remove_all' action for accounts. This is a destructive operation.", + ), + "redis_conn_id": Param( + DEFAULT_REDIS_CONN_ID, + type="string", + title="Redis Connection ID", + description="The Airflow connection ID for the Redis server (used for 'remove_all').", + ), + }, +) as dag: + system_management_task = PythonOperator( + task_id="system_management_task", + python_callable=manage_system_callable, + ) diff --git a/dags/ytdlp_mgmt_queues.py b/dags/ytdlp_mgmt_queues.py index 489297f..9e18279 100644 --- a/dags/ytdlp_mgmt_queues.py +++ b/dags/ytdlp_mgmt_queues.py @@ -164,7 +164,7 @@ def clear_queue_callable(**context): redis_conn_id = params['redis_conn_id'] queue_to_clear = params['queue_to_clear'] dump_queues = params['dump_queues'] - # Get the rendered dump_dir from the templates_dict passed to the operator + # The value from templates_dict is already rendered by Airflow. dump_dir = context['templates_dict']['dump_dir'] dump_patterns = params['dump_patterns'].split(',') if params.get('dump_patterns') else [] @@ -191,72 +191,80 @@ def clear_queue_callable(**context): def list_contents_callable(**context): - """Lists the contents of the specified Redis key (list or hash).""" + """Lists the contents of the specified Redis key(s) (list or hash).""" params = context['params'] redis_conn_id = params['redis_conn_id'] - queue_to_list = params['queue_to_list'] + queues_to_list_str = params.get('queue_to_list') max_items = params.get('max_items', 10) - if not queue_to_list: + if not queues_to_list_str: raise ValueError("Parameter 'queue_to_list' cannot be empty.") - logger.info(f"Attempting to list contents of Redis key '{queue_to_list}' (max: {max_items}) using connection '{redis_conn_id}'.") - try: - redis_client = _get_redis_client(redis_conn_id) - key_type_bytes = redis_client.type(queue_to_list) - key_type = key_type_bytes.decode('utf-8') # Decode type + queues_to_list = [q.strip() for q in queues_to_list_str.split(',') if q.strip()] + + if not queues_to_list: + logger.info("No valid queue names provided in 'queue_to_list'. 
Nothing to do.") + return - if key_type == 'list': - list_length = redis_client.llen(queue_to_list) - # Get the last N items, which are the most recently added with rpush - items_to_fetch = min(max_items, list_length) - # lrange with negative indices gets items from the end of the list. - # -N to -1 gets the last N items. - contents_bytes = redis_client.lrange(queue_to_list, -items_to_fetch, -1) - contents = [item.decode('utf-8') for item in contents_bytes] - # Reverse the list so the absolute most recent item is printed first - contents.reverse() - logger.info(f"--- Contents of Redis List '{queue_to_list}' (showing most recent {len(contents)} of {list_length}) ---") - for i, item in enumerate(contents): - # The index here is just for display, 0 is the most recent - logger.info(f" [recent_{i}]: {item}") - if list_length > len(contents): - logger.info(f" ... ({list_length - len(contents)} older items not shown)") - logger.info(f"--- End of List Contents ---") + logger.info(f"Attempting to list contents for {len(queues_to_list)} Redis key(s): {queues_to_list}") + + redis_client = _get_redis_client(redis_conn_id) - elif key_type == 'hash': - hash_size = redis_client.hlen(queue_to_list) - # HGETALL can be risky for large hashes. Consider HSCAN for production. - # For manual inspection, HGETALL is often acceptable. - if hash_size > max_items * 2: # Heuristic: avoid huge HGETALL - logger.warning(f"Hash '{queue_to_list}' has {hash_size} fields, which is large. Listing might be slow or incomplete. Consider using redis-cli HSCAN.") - # hgetall returns dict of bytes keys and bytes values, decode them - contents_bytes = redis_client.hgetall(queue_to_list) - contents = {k.decode('utf-8'): v.decode('utf-8') for k, v in contents_bytes.items()} - logger.info(f"--- Contents of Redis Hash '{queue_to_list}' ({len(contents)} fields) ---") - item_count = 0 - for key, value in contents.items(): # key and value are now strings - if item_count >= max_items: - logger.info(f" ... (stopped listing after {max_items} items of {hash_size})") - break - # Attempt to pretty-print if value is JSON - try: - parsed_value = json.loads(value) - pretty_value = json.dumps(parsed_value, indent=2) - logger.info(f" '{key}':\n{pretty_value}") - except json.JSONDecodeError: - logger.info(f" '{key}': {value}") # Print as string if not JSON - item_count += 1 - logger.info(f"--- End of Hash Contents ---") + for queue_to_list in queues_to_list: + # Add a newline for better separation in logs + logger.info(f"\n--- Listing contents of Redis key '{queue_to_list}' (max: {max_items}) ---") + try: + key_type_bytes = redis_client.type(queue_to_list) + key_type = key_type_bytes.decode('utf-8') # Decode type - elif key_type == 'none': - logger.info(f"Redis key '{queue_to_list}' does not exist.") - else: - logger.info(f"Redis key '{queue_to_list}' is of type '{key_type}'. Listing contents for this type is not implemented.") + if key_type == 'list': + list_length = redis_client.llen(queue_to_list) + items_to_fetch = min(max_items, list_length) + contents_bytes = redis_client.lrange(queue_to_list, -items_to_fetch, -1) + contents = [item.decode('utf-8') for item in contents_bytes] + contents.reverse() + logger.info(f"--- Contents of Redis List '{queue_to_list}' ---") + logger.info(f"Total items in list: {list_length}") + if contents: + logger.info(f"Showing most recent {len(contents)} item(s):") + for i, item in enumerate(contents): + logger.info(f" [recent_{i}]: {item}") + if list_length > len(contents): + logger.info(f" ... 
({list_length - len(contents)} older items not shown)") + logger.info(f"--- End of List Contents ---") - except Exception as e: - logger.error(f"Failed to list contents of Redis key '{queue_to_list}': {e}", exc_info=True) - raise AirflowException(f"Failed to list Redis key contents: {e}") + elif key_type == 'hash': + hash_size = redis_client.hlen(queue_to_list) + if hash_size > max_items * 2: + logger.warning(f"Hash '{queue_to_list}' has {hash_size} fields, which is large. Listing might be slow or incomplete. Consider using redis-cli HSCAN.") + contents_bytes = redis_client.hgetall(queue_to_list) + contents = {k.decode('utf-8'): v.decode('utf-8') for k, v in contents_bytes.items()} + logger.info(f"--- Contents of Redis Hash '{queue_to_list}' ---") + logger.info(f"Total fields in hash: {hash_size}") + if contents: + logger.info(f"Showing up to {max_items} item(s):") + item_count = 0 + for key, value in contents.items(): + if item_count >= max_items: + logger.info(f" ... (stopped listing after {max_items} items of {hash_size})") + break + try: + parsed_value = json.loads(value) + pretty_value = json.dumps(parsed_value, indent=2) + logger.info(f" '{key}':\n{pretty_value}") + except json.JSONDecodeError: + logger.info(f" '{key}': {value}") + item_count += 1 + logger.info(f"--- End of Hash Contents ---") + + elif key_type == 'none': + logger.info(f"Redis key '{queue_to_list}' does not exist.") + else: + logger.info(f"Redis key '{queue_to_list}' is of type '{key_type}'. Listing contents for this type is not implemented.") + + except Exception as e: + logger.error(f"Failed to list contents of Redis key '{queue_to_list}': {e}", exc_info=True) + # Continue to the next key in the list instead of failing the whole task def check_status_callable(**context): @@ -292,6 +300,63 @@ def check_status_callable(**context): raise AirflowException(f"Failed to check queue status: {e}") +def requeue_failed_callable(**context): + """ + Copies all URLs from the fail hash to the inbox list and optionally clears the fail hash. + """ + params = context['params'] + redis_conn_id = params['redis_conn_id'] + queue_name = params['queue_name_for_requeue'] + clear_fail_queue = params['clear_fail_queue_after_requeue'] + + fail_queue_name = f"{queue_name}_fail" + inbox_queue_name = f"{queue_name}_inbox" + + logger.info(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}'.") + print(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}'.") + + redis_client = _get_redis_client(redis_conn_id) + + try: + # The fail queue is a hash. The keys are the URLs. + failed_urls_bytes = redis_client.hkeys(fail_queue_name) + if not failed_urls_bytes: + logger.info(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.") + print(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.") + return + + failed_urls = [url.decode('utf-8') for url in failed_urls_bytes] + logger.info(f"Found {len(failed_urls)} URLs to requeue.") + print(f"Found {len(failed_urls)} URLs to requeue:") + for url in failed_urls: + print(f" - {url}") + + # Add URLs to the inbox list + if failed_urls: + with redis_client.pipeline() as pipe: + pipe.rpush(inbox_queue_name, *failed_urls) + if clear_fail_queue: + pipe.delete(fail_queue_name) + pipe.execute() + + final_list_length = redis_client.llen(inbox_queue_name) + success_message = ( + f"Successfully requeued {len(failed_urls)} URLs to '{inbox_queue_name}'. " + f"The list now contains {final_list_length} items." 
+ ) + logger.info(success_message) + print(f"\n{success_message}") + + if clear_fail_queue: + logger.info(f"Successfully cleared fail queue '{fail_queue_name}'.") + else: + logger.info(f"Fail queue '{fail_queue_name}' was not cleared as per configuration.") + + except Exception as e: + logger.error(f"Failed to requeue failed URLs: {e}", exc_info=True) + raise AirflowException(f"Failed to requeue failed URLs: {e}") + + def add_videos_to_queue_callable(**context): """ Parses video inputs, normalizes them to URLs, and adds them to a Redis queue. @@ -381,13 +446,14 @@ with DAG( - `add_videos`: Add one or more YouTube videos to a queue. - `clear_queue`: Dump and/or delete a specific Redis key. - `list_contents`: View the contents of a Redis key (list or hash). - - `check_status`: (Placeholder) Check the overall status of the queues. + - `check_status`: Check the overall status of the queues. + - `requeue_failed`: Copy all URLs from the `_fail` hash to the `_inbox` list and clear the `_fail` hash. """, params={ "action": Param( "add_videos", type="string", - enum=["add_videos", "clear_queue", "list_contents", "check_status"], + enum=["add_videos", "clear_queue", "list_contents", "check_status", "requeue_failed"], title="Action", description="The management action to perform.", ), @@ -437,10 +503,10 @@ with DAG( ), # --- Params for 'list_contents' --- "queue_to_list": Param( - 'video_queue_inbox', + 'video_queue_inbox,video_queue_fail', type="string", - title="[list_contents] Queue to List", - description="Exact name of the Redis key to list.", + title="[list_contents] Queues to List", + description="Comma-separated list of exact Redis key names to list.", ), "max_items": Param( 10, @@ -455,6 +521,19 @@ with DAG( title="[check_status] Base Queue Name", description="Base name of the queues to check (e.g., 'video_queue').", ), + # --- Params for 'requeue_failed' --- + "queue_name_for_requeue": Param( + DEFAULT_QUEUE_NAME, + type="string", + title="[requeue_failed] Base Queue Name", + description="Base name of the queues to requeue from (e.g., 'video_queue' will use 'video_queue_fail').", + ), + "clear_fail_queue_after_requeue": Param( + True, + type="boolean", + title="[requeue_failed] Clear Fail Queue", + description="If True, deletes the `_fail` hash after requeueing items.", + ), # --- Common Params --- "redis_conn_id": Param( DEFAULT_REDIS_CONN_ID, @@ -489,5 +568,16 @@ with DAG( python_callable=check_status_callable, ) - # --- Placeholder Tasks --- - branch_on_action >> [action_add_videos, action_clear_queue, action_list_contents, action_check_status] + action_requeue_failed = PythonOperator( + task_id="action_requeue_failed", + python_callable=requeue_failed_callable, + ) + + # --- Wire up tasks --- + branch_on_action >> [ + action_add_videos, + action_clear_queue, + action_list_contents, + action_check_status, + action_requeue_failed, + ] diff --git a/dags/ytdlp_ops_orchestrator.py b/dags/ytdlp_ops_orchestrator.py new file mode 100644 index 0000000..fc8cd7c --- /dev/null +++ b/dags/ytdlp_ops_orchestrator.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2024 rl +# +# Distributed under terms of the MIT license. + +""" +DAG to orchestrate ytdlp_ops_worker_per_url DAG runs based on a defined policy. +It fetches URLs from a Redis queue and launches workers in controlled bunches. 
+""" + +from airflow import DAG +from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.operators.python import PythonOperator +from airflow.models.param import Param +from airflow.models.variable import Variable +from airflow.utils.dates import days_ago +from airflow.api.common.trigger_dag import trigger_dag +from airflow.models.dagrun import DagRun +from airflow.models.dag import DagModel +from datetime import timedelta +import logging +import random +import time + +# Import utility functions +from utils.redis_utils import _get_redis_client + +# Import Thrift modules for proxy status check +from pangramia.yt.tokens_ops import YTTokenOpService +from thrift.protocol import TBinaryProtocol +from thrift.transport import TSocket, TTransport + +# Configure logging +logger = logging.getLogger(__name__) + +# Default settings +DEFAULT_QUEUE_NAME = 'video_queue' +DEFAULT_REDIS_CONN_ID = 'redis_default' +DEFAULT_TOTAL_WORKERS = 3 +DEFAULT_WORKERS_PER_BUNCH = 1 +DEFAULT_WORKER_DELAY_S = 5 +DEFAULT_BUNCH_DELAY_S = 20 + +DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="16.162.82.212") +DEFAULT_YT_AUTH_SERVICE_PORT = Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080) + +# --- Helper Functions --- + + +# --- Main Orchestration Callable --- + +def orchestrate_workers_ignition_callable(**context): + """ + Main orchestration logic. Triggers a specified number of worker DAGs + to initiate self-sustaining processing loops. + """ + params = context['params'] + logger.info("Starting worker ignition sequence.") + + worker_dag_id = 'ytdlp_ops_worker_per_url' + dag_model = DagModel.get_dagmodel(worker_dag_id) + if dag_model and dag_model.is_paused: + raise AirflowException(f"Worker DAG '{worker_dag_id}' is paused. Cannot start worker loops.") + + total_workers = int(params['total_workers']) + workers_per_bunch = int(params['workers_per_bunch']) + worker_delay = int(params['delay_between_workers_s']) + bunch_delay = int(params['delay_between_bunches_s']) + + # Create a list of worker numbers to trigger + worker_indices = list(range(total_workers)) + bunches = [worker_indices[i:i + workers_per_bunch] for i in range(0, len(worker_indices), workers_per_bunch)] + + logger.info(f"Plan: Starting {total_workers} total workers in {len(bunches)} bunches.") + + dag_run_id = context['dag_run'].run_id + total_triggered = 0 + + # Pass all orchestrator params to the worker so it has the full context for its loop. + conf_to_pass = {p: params[p] for p in params} + # The worker pulls its own URL, so we don't pass one. + if 'url' in conf_to_pass: + del conf_to_pass['url'] + + for i, bunch in enumerate(bunches): + logger.info(f"--- Igniting Bunch {i+1}/{len(bunches)} (contains {len(bunch)} worker(s)) ---") + for j, _ in enumerate(bunch): + # Create a unique run_id for each worker loop starter + run_id = f"ignited_{dag_run_id}_{total_triggered}" + + logger.info(f"Igniting worker {j+1}/{len(bunch)} in bunch {i+1} (loop {total_triggered + 1}/{total_workers}) (Run ID: {run_id})") + logger.debug(f"Full conf for worker loop {run_id}: {conf_to_pass}") + + trigger_dag( + dag_id=worker_dag_id, + run_id=run_id, + conf=conf_to_pass, + replace_microseconds=False + ) + total_triggered += 1 + + # Delay between workers in a bunch + if j < len(bunch) - 1: + logger.info(f"Waiting {worker_delay}s before next worker in bunch...") + time.sleep(worker_delay) + + # Delay between bunches + if i < len(bunches) - 1: + logger.info(f"--- Bunch {i+1} ignited. 
Waiting {bunch_delay}s before next bunch... ---") + time.sleep(bunch_delay) + + logger.info(f"--- Ignition sequence complete. Total worker loops started: {total_triggered}. ---") + + + + +# ============================================================================= +# DAG Definition +# ============================================================================= + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=1), + 'start_date': days_ago(1), +} + +with DAG( + dag_id='ytdlp_ops_orchestrator', + default_args=default_args, + schedule_interval=None, # This DAG runs only when triggered. + max_active_runs=1, # Only one ignition process should run at a time. + catchup=False, + description='Ignition system for ytdlp_ops_worker_per_url DAGs. Starts self-sustaining worker loops.', + doc_md=""" + ### YT-DLP Worker Ignition System + + This DAG acts as an "ignition system" to start one or more self-sustaining worker loops. + It does **not** process URLs itself. Its only job is to trigger a specified number of `ytdlp_ops_worker_per_url` DAGs. + + #### How it Works: + + 1. **Manual Trigger:** You manually trigger this DAG with parameters defining how many worker loops to start (`total_workers`), in what configuration (`workers_per_bunch`, delays). + 2. **Ignition:** The orchestrator triggers the initial set of worker DAGs in a "fire-and-forget" manner, passing all its configuration parameters to them. + 3. **Completion:** Once all initial workers have been triggered, the orchestrator's job is complete. + + The workers then take over, each running its own continuous processing loop. + """, + tags=['ytdlp', 'orchestrator', 'ignition'], + params={ + # --- Ignition Control Parameters --- + 'total_workers': Param(DEFAULT_TOTAL_WORKERS, type="integer", description="Total number of worker loops to start."), + 'workers_per_bunch': Param(DEFAULT_WORKERS_PER_BUNCH, type="integer", description="Number of workers to start in each bunch."), + 'delay_between_workers_s': Param(DEFAULT_WORKER_DELAY_S, type="integer", description="Delay in seconds between starting each worker within a bunch."), + 'delay_between_bunches_s': Param(DEFAULT_BUNCH_DELAY_S, type="integer", description="Delay in seconds between starting each bunch."), + + # --- Worker Passthrough Parameters --- + 'on_bannable_failure': Param( + 'retry_with_new_account', + type="string", + enum=['stop_loop', 'retry_with_new_account'], + title="[Worker Param] On Bannable Failure Policy", + description="Policy for a worker when a bannable error occurs. " + "'stop_loop': Ban the account, mark URL as failed, and stop the worker's loop. " + "'retry_with_new_account': Ban the failed account, retry ONCE with a new account. If retry fails, ban the second account and proxy, then stop." 
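# A minimal, illustrative sketch of starting this ignition DAG programmatically with a
# custom conf, using the same trigger_dag helper this module imports. The run_id and
# conf values are examples only; parameters omitted from conf fall back to the Param
# defaults defined in this DAG (assuming the default dag_run_conf_overrides_params behaviour).
from airflow.api.common.trigger_dag import trigger_dag

trigger_dag(
    dag_id="ytdlp_ops_orchestrator",
    run_id="manual_ignition_example",
    conf={
        "total_workers": 2,
        "workers_per_bunch": 1,
        "on_bannable_failure": "retry_with_new_account",
    },
    replace_microseconds=False,
)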
+ ), + 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="[Worker Param] Base name for Redis queues."), + 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="[Worker Param] Airflow Redis connection ID."), + 'clients': Param('mweb,ios,android', type="string", description="[Worker Param] Comma-separated list of clients for token generation."), + 'account_pool': Param('ytdlp_account', type="string", description="[Worker Param] Account pool prefix or comma-separated list."), + 'account_pool_size': Param(10, type=["integer", "null"], description="[Worker Param] If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."), + 'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="[Worker Param] IP of the ytdlp-ops-server. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."), + 'service_port': Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="[Worker Param] Port of the Envoy load balancer. Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."), + 'machine_id': Param("ytdlp-ops-airflow-service", type="string", description="[Worker Param] Identifier for the client machine."), + 'auto_create_new_accounts_on_exhaustion': Param(True, type="boolean", description="[Worker Param] If True and all accounts in a prefix-based pool are exhausted, create a new one automatically."), + 'retrigger_delay_on_empty_s': Param(60, type="integer", description="[Worker Param] Delay in seconds before a worker re-triggers itself if the queue is empty. Set to -1 to stop the loop."), + } +) as dag: + + orchestrate_task = PythonOperator( + task_id='start_worker_loops', + python_callable=orchestrate_workers_ignition_callable, + ) + orchestrate_task.doc_md = """ + ### Start Worker Loops + This is the main task that executes the ignition policy. + - It triggers `ytdlp_ops_worker_per_url` DAGs according to the batch settings. + - It passes all its parameters down to the workers, which will use them to run their continuous loops. + """ diff --git a/dags/ytdlp_ops_sensor_queue.py b/dags/ytdlp_ops_sensor_queue.py deleted file mode 100644 index 302fcd6..0000000 --- a/dags/ytdlp_ops_sensor_queue.py +++ /dev/null @@ -1,215 +0,0 @@ -# -*- coding: utf-8 -*- -# vim:fenc=utf-8 -# -# Copyright © 2024 rl -# -# Distributed under terms of the MIT license. - -""" -DAG to sense a Redis queue for new URLs and trigger the ytdlp_worker_per_url DAG. -This is the "Sensor" part of a Sensor/Worker pattern. -""" - -from airflow import DAG -from airflow.exceptions import AirflowException, AirflowSkipException -from airflow.operators.python import PythonOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.providers.redis.hooks.redis import RedisHook -from airflow.models.param import Param -from airflow.utils.dates import days_ago -from datetime import timedelta -import logging -import redis - -# Import utility functions -from utils.redis_utils import _get_redis_client - -# Configure logging -logger = logging.getLogger(__name__) - -# Default settings -DEFAULT_QUEUE_NAME = 'video_queue' -DEFAULT_REDIS_CONN_ID = 'redis_default' -DEFAULT_TIMEOUT = 30 -DEFAULT_MAX_URLS = '1' # Default number of URLs to process per run - -# --- Task Callables --- - -def select_account_callable(**context): - """ - Placeholder task for future logic to dynamically select an account. 
- For now, it just passes through the account_id from the DAG params. - """ - params = context['params'] - account_id = params.get('account_id', 'default_account') - logger.info(f"Selected account for this run: {account_id}") - # This task could push the selected account_id to XComs in the future. - # For now, the next task will just read it from params. - return account_id - - -def log_trigger_info_callable(**context): - """Logs information about how the DAG run was triggered.""" - dag_run = context['dag_run'] - trigger_type = dag_run.run_type - logger.info(f"Sensor DAG triggered. Run ID: {dag_run.run_id}, Type: {trigger_type}") - - if trigger_type == 'manual': - logger.info("Trigger source: Manual execution from Airflow UI or CLI.") - elif trigger_type == 'scheduled': - logger.info("Trigger source: Scheduled run (periodic check).") - elif trigger_type == 'dag_run': - # In Airflow 2.2+ we can get the triggering run object - try: - triggering_dag_run = dag_run.get_triggering_dagrun() - if triggering_dag_run: - triggering_dag_id = triggering_dag_run.dag_id - triggering_run_id = triggering_dag_run.run_id - logger.info(f"Trigger source: DAG Run from '{triggering_dag_id}' (Run ID: {triggering_run_id}).") - # Check if it's a worker by looking at the conf keys - conf = dag_run.conf or {} - if all(k in conf for k in ['queue_name', 'redis_conn_id', 'max_urls_per_run']): - logger.info("This appears to be a standard trigger from a worker DAG continuing the loop.") - else: - logger.warning(f"Triggered by another DAG but conf does not match worker pattern. Conf: {conf}") - else: - logger.warning("Trigger type is 'dag_run' but could not retrieve triggering DAG run details.") - except Exception as e: - logger.error(f"Could not get triggering DAG run details: {e}") - else: - logger.info(f"Trigger source: {trigger_type}") - - -def check_queue_for_urls_batch(**context): - """ - Pops a batch of URLs from the inbox queue. - Returns a list of configuration dictionaries for the TriggerDagRunOperator. - If the queue is empty, it raises AirflowSkipException. - """ - params = context['params'] - queue_name = params['queue_name'] - inbox_queue = f"{queue_name}_inbox" - redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) - max_urls_raw = params.get('max_urls_per_run', DEFAULT_MAX_URLS) - try: - max_urls = int(max_urls_raw) - except (ValueError, TypeError): - logger.warning(f"Invalid value for max_urls_per_run: '{max_urls_raw}'. Using default: {DEFAULT_MAX_URLS}") - max_urls = DEFAULT_MAX_URLS - - urls_to_process = [] - try: - client = _get_redis_client(redis_conn_id) - current_queue_size = client.llen(inbox_queue) - logger.info(f"Queue '{inbox_queue}' has {current_queue_size} URLs. Attempting to pop up to {max_urls}.") - - for _ in range(max_urls): - url_bytes = client.lpop(inbox_queue) - if url_bytes: - url = url_bytes.decode('utf-8') if isinstance(url_bytes, bytes) else url_bytes - logger.info(f" - Popped URL: {url}") - urls_to_process.append(url) - else: - # Queue is empty, stop trying to pop - break - - if urls_to_process: - logger.info(f"Found {len(urls_to_process)} URLs in queue. Generating trigger configurations.") - # Create a list of 'conf' objects for the trigger operator to expand - trigger_configs = [] - for url in urls_to_process: - # The worker DAG will use its own default params for its operations. - # We only need to provide the URL for processing, and the sensor's own - # params so the worker can trigger the sensor again to continue the loop. 
- worker_conf = { - 'url': url, - 'queue_name': queue_name, - 'redis_conn_id': redis_conn_id, - 'max_urls_per_run': int(max_urls), - 'stop_on_failure': params.get('stop_on_failure', True), - 'account_id': params.get('account_id', 'default_account') - } - trigger_configs.append(worker_conf) - return trigger_configs - else: - logger.info(f"Queue '{inbox_queue}' is empty. Skipping trigger.") - raise AirflowSkipException(f"Redis queue '{inbox_queue}' is empty.") - except AirflowSkipException: - raise - except Exception as e: - logger.error(f"Error popping URLs from Redis queue '{inbox_queue}': {e}", exc_info=True) - raise AirflowException(f"Failed to pop URLs from Redis: {e}") - -# ============================================================================= -# DAG Definition -# ============================================================================= - -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 0, # The sensor itself should not retry on failure, it will run again on schedule - 'start_date': days_ago(1), -} - -with DAG( - dag_id='ytdlp_ops_sensor_queue', - default_args=default_args, - schedule_interval=None, # Runs only on trigger, not on a schedule. - max_active_runs=1, # Prevent multiple sensors from running at once - catchup=False, - description='Polls Redis queue on trigger for URLs and starts worker DAGs.', - tags=['ytdlp', 'sensor', 'queue', 'redis', 'batch'], - params={ - 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."), - 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), - 'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="string", description="Maximum number of URLs to process in one batch."), - 'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."), - 'account_id': Param('default_account', type="string", description="The account ID to use for processing the batch."), - } -) as dag: - - log_trigger_info_task = PythonOperator( - task_id='log_trigger_info', - python_callable=log_trigger_info_callable, - ) - log_trigger_info_task.doc_md = """ - ### Log Trigger Information - Logs details about how this DAG run was initiated (e.g., manually or by a worker DAG). - This provides visibility into the processing loop. - """ - - poll_redis_task = PythonOperator( - task_id='check_queue_for_urls_batch', - python_callable=check_queue_for_urls_batch, - ) - poll_redis_task.doc_md = """ - ### Poll Redis Queue for Batch - Checks the Redis inbox queue for a batch of new URLs (up to `max_urls_per_run`). - - **On Success (URLs found):** Returns a list of configuration objects for the trigger task. - - **On Skip (Queue empty):** Skips this task and the trigger task. The DAG run succeeds. - """ - - # This operator will be dynamically expanded based on the output of poll_redis_task - trigger_worker_dags = TriggerDagRunOperator.partial( - task_id='trigger_worker_dags', - trigger_dag_id='ytdlp_ops_worker_per_url', - wait_for_completion=False, # Fire and forget - doc_md=""" -### Trigger Worker DAGs (Dynamically Mapped) -Triggers one `ytdlp_worker_per_url` DAG run for each URL found by the polling task. -Each triggered DAG receives its own specific configuration (including the URL). -This task is skipped if the polling task finds no URLs. 
-""" - ).expand( - conf=poll_redis_task.output - ) - - select_account_task = PythonOperator( - task_id='select_account', - python_callable=select_account_callable, - ) - select_account_task.doc_md = "### Select Account\n(Placeholder for future dynamic account selection logic)" - - log_trigger_info_task >> select_account_task >> poll_redis_task >> trigger_worker_dags diff --git a/dags/ytdlp_ops_worker_per_url.py b/dags/ytdlp_ops_worker_per_url.py index 09c04bc..244d1ae 100644 --- a/dags/ytdlp_ops_worker_per_url.py +++ b/dags/ytdlp_ops_worker_per_url.py @@ -15,14 +15,16 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models import BaseOperator, Variable from airflow.models.param import Param from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import PythonOperator, BranchPythonOperator +from airflow.operators.dummy import DummyOperator from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago from airflow.utils.decorators import apply_defaults from datetime import datetime, timedelta +from airflow.api.common.trigger_dag import trigger_dag from pangramia.yt.common.ttypes import TokenUpdateMode -from pangramia.yt.exceptions.ttypes import PBServiceException +from pangramia.yt.exceptions.ttypes import PBServiceException, PBUserException from pangramia.yt.tokens_ops import YTTokenOpService from thrift.protocol import TBinaryProtocol from thrift.transport import TSocket, TTransport @@ -30,10 +32,14 @@ from thrift.transport.TTransport import TTransportException import json import logging import os +import random import redis import socket import time import traceback +import inspect +import uuid +import uuid # Import utility functions from utils.redis_utils import _get_redis_client @@ -45,10 +51,60 @@ logger = logging.getLogger(__name__) DEFAULT_QUEUE_NAME = 'video_queue' DEFAULT_REDIS_CONN_ID = 'redis_default' DEFAULT_MAX_URLS = 1 -DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds +DEFAULT_TIMEOUT = 180 # Default Thrift timeout in seconds + +DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="16.162.82.212") +DEFAULT_YT_AUTH_SERVICE_PORT = Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080) # --- Helper Functions --- +def _get_thrift_client(host, port, timeout): + """Helper to create and connect a Thrift client.""" + transport = TSocket.TSocket(host, port) + transport.setTimeout(timeout * 1000) + transport = TTransport.TFramedTransport(transport) + protocol = TBinaryProtocol.TBinaryProtocol(transport) + client = YTTokenOpService.Client(protocol) + transport.open() + logger.info(f"Connected to Thrift server at {host}:{port}") + return client, transport + + +def _ban_resource_task(resource_type, resource_id_xcom_key, host, port, timeout, **kwargs): + """ + A callable function to ban a resource (account or proxy). + Designed to be used in a PythonOperator. + """ + ti = kwargs['ti'] + resource_id = ti.xcom_pull(key=resource_id_xcom_key) + if not resource_id: + logger.warning(f"Could not find resource ID in XCom key '{resource_id_xcom_key}' to ban. 
Skipping.") + return + + client, transport = None, None + try: + client, transport = _get_thrift_client(host, port, timeout) + if resource_type == 'account': + reason = f"Banned by Airflow worker due to {kwargs.get('reason', 'failure')}" + logger.warning(f"Banning account '{resource_id}'. Reason: {reason}") + client.banAccount(accountId=resource_id, reason=reason) + logger.info(f"Successfully sent request to ban account '{resource_id}'.") + elif resource_type == 'proxy': + server_identity = kwargs.get('server_identity') + if not server_identity: + logger.error("Cannot ban proxy without server_identity.") + return + logger.warning(f"Banning proxy '{resource_id}' for server '{server_identity}'.") + client.banProxy(proxyUrl=resource_id, serverIdentity=server_identity) + logger.info(f"Successfully sent request to ban proxy '{resource_id}'.") + except Exception as e: + # Log the error but don't fail the task, as this is a best-effort cleanup action. + logger.error(f"Failed to issue ban for {resource_type} '{resource_id}': {e}", exc_info=True) + finally: + if transport and transport.isOpen(): + transport.close() + + def _extract_video_id(url): """Extracts YouTube video ID from URL.""" if not url or not isinstance(url, str): @@ -74,49 +130,13 @@ def _extract_video_id(url): # --- Queue Management Callables (for success/failure reporting) --- -def mark_proxy_banned_callable(**context): - """Makes a Thrift call to ban a proxy if the get_token task failed with a bannable error.""" - ti = context['task_instance'] - proxy_to_ban = ti.xcom_pull(task_ids='get_token', key='proxy_to_ban') - - if not proxy_to_ban: - logger.info("No proxy to ban was pushed to XCom. Skipping task.") - raise AirflowSkipException("No proxy to ban was identified in the upstream failure.") - - server_identity = ti.xcom_pull(task_ids='get_token', key='server_identity_for_ban') - host = ti.xcom_pull(task_ids='get_token', key='service_host_for_ban') - port = ti.xcom_pull(task_ids='get_token', key='service_port_for_ban') - - if not all([server_identity, host, port]): - logger.error("Missing connection details (identity, host, or port) from XCom. Cannot ban proxy.") - raise AirflowException("Missing connection details to ban proxy.") - - logger.warning(f"Attempting to ban proxy '{proxy_to_ban}' for server '{server_identity}' at {host}:{port}.") - - transport = None - try: - socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) - socket_conn.setTimeout(15 * 1000) # 15s timeout for ban call - transport = TTransport.TFramedTransport(socket_conn) - protocol = TBinaryProtocol.TBinaryProtocol(transport) - client = YTTokenOpService.Client(protocol) - transport.open() - client.banProxy(proxyUrl=proxy_to_ban, serverIdentity=server_identity) - logger.info(f"Successfully sent request to ban proxy '{proxy_to_ban}'.") - except Exception as ban_exc: - logger.error(f"Failed to send ban request for proxy '{proxy_to_ban}': {ban_exc}", exc_info=True) - # We should fail the task if the ban call fails, as it's an important side-effect. 
- raise AirflowException(f"Failed to ban proxy: {ban_exc}") - finally: - if transport and transport.isOpen(): - transport.close() def mark_url_as_success(**context): """Moves URL from progress to result hash on success.""" ti = context['task_instance'] params = context['params'] - url = params.get('url') # Get URL from params, not XCom + url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') if not url: logger.warning("mark_url_as_success called but no URL found in DAG run parameters.") return @@ -156,34 +176,22 @@ def mark_url_as_success(**context): # Log error but don't fail the task, as the main work succeeded. -def mark_url_as_failed(**context): +def handle_failure_callable(**context): """ - Handles failed processing. Records detailed error information to the fail hash - and, if stop_on_failure is True, fails the task to make the DAG run failure visible. + Handles a failed processing run by recording the error details to Redis. + The decision to stop or continue the loop is handled by `decide_what_to_do_next`. """ ti = context['task_instance'] params = context['params'] - url = params.get('url') # Get URL from params + dag_run = context['dag_run'] + url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') + if not url: - logger.error("mark_url_as_failed called but no URL found in DAG run parameters.") + logger.error("handle_failure_callable called but no URL found in XCom.") return - queue_name = params['queue_name'] - fail_queue = f"{queue_name}_fail" - inbox_queue = f"{queue_name}_inbox" - redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) - stop_on_failure = params.get('stop_on_failure', True) - - # Determine if we should requeue based on various parameters - should_requeue = params.get('requeue_on_failure', False) - requeue_on_bannable_error = params.get('requeue_on_bannable_error', False) - requeue_on_ffprobe_failure = params.get('requeue_on_ffprobe_failure', False) - - # --- Extract Detailed Error Information --- + # --- Determine the source and type of failure --- exception = context.get('exception') - - # Find the specific task that failed to pull its XComs - dag_run = context['dag_run'] failed_task_id = "unknown" upstream_tasks = ti.task.get_direct_relatives(upstream=True) for task in upstream_tasks: @@ -192,14 +200,15 @@ def mark_url_as_failed(**context): failed_task_id = task.task_id break - error_details = None + # --- Extract Detailed Error Information --- + error_details_from_xcom = None if failed_task_id != "unknown": - error_details = ti.xcom_pull(task_ids=failed_task_id, key='error_details') + error_details_from_xcom = ti.xcom_pull(task_ids=failed_task_id, key='error_details') - if error_details: - error_message = error_details.get('error_message', 'Unknown error from XCom') - error_type = error_details.get('error_type', 'Unknown type from XCom') - tb_str = error_details.get('traceback', 'No traceback in XCom.') + if error_details_from_xcom: + error_message = error_details_from_xcom.get('error_message', 'Unknown error from XCom') + error_type = error_details_from_xcom.get('error_type', 'Unknown type from XCom') + tb_str = error_details_from_xcom.get('traceback', 'No traceback in XCom.') else: error_message = str(exception) if exception else "Unknown error" error_type = type(exception).__name__ if exception else "Unknown" @@ -211,247 +220,511 @@ def mark_url_as_failed(**context): logger.error(f" Failure Reason: {error_message}") logger.debug(f" Traceback:\n{tb_str}") - # --- Check for specific requeue conditions --- - if 
not should_requeue: # Only check specific conditions if the general one is false - if requeue_on_bannable_error and isinstance(exception, PBServiceException): - bannable_error_codes = [ - "BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", - "SOCKS5_CONNECTION_FAILED", "CLIENT_TIMEOUT", "GLOBAL_TIMEOUT" - ] - if hasattr(exception, 'errorCode') and exception.errorCode in bannable_error_codes: - should_requeue = True - logger.info(f"Bannable error '{exception.errorCode}' detected. Re-queuing URL as per 'requeue_on_bannable_error' param.") - - if requeue_on_ffprobe_failure and isinstance(exception, AirflowException) and "Bash command failed" in str(exception): - # Check for the specific exit code for probe failure - if "exit code 2" in str(exception): - should_requeue = True - logger.info("Probe failure detected (exit code 2). Re-queuing URL as per 'requeue_on_ffprobe_failure' param.") + final_error_details = { + 'failed_task': failed_task_id, + 'error_type': error_type, + 'error_message': error_message, + 'traceback': tb_str, + 'url': url, + 'dag_run_id': context['dag_run'].run_id, + } + # For all failures, mark the URL as failed in Redis. + redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) + queue_name = params['queue_name'] + fail_queue = f"{queue_name}_fail" try: client = _get_redis_client(redis_conn_id) - if should_requeue: - client.rpush(inbox_queue, url) - logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.") - else: - fail_data = { - 'status': 'failed', - 'end_time': time.time(), - 'failed_task': failed_task_id, - 'error_type': error_type, - 'error_message': error_message, - 'traceback': tb_str, - 'url': url, - 'dag_run_id': context['dag_run'].run_id, - } - client.hset(fail_queue, url, json.dumps(fail_data, indent=2)) - logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.") + client.hset(fail_queue, url, json.dumps(final_error_details, indent=2)) + logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.") except Exception as e: logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True) - # This is a critical error in the failure handling logic itself. raise AirflowException(f"Could not handle failure in Redis: {e}") - # If stop_on_failure is True, we should fail this task to make the DAG run fail. - # The loop is already stopped by the DAG structure, but this makes the failure visible. - if stop_on_failure: - logger.error("stop_on_failure is True. Failing this task to mark the DAG run as failed.") - # Re-raise the original exception to fail the task instance. - if exception: - raise exception - else: - # If we got details from XCom, we don't have the original exception object. - # So, we raise a new AirflowException with the details we have. - raise AirflowException(f"Failing task as per stop_on_failure=True. Upstream error: [{error_type}] {error_message}") - # --- YtdlpOpsOperator --- -class YtdlpOpsOperator(BaseOperator): +def _get_account_pool(params: dict) -> list: """ - Custom Airflow operator to interact with YTDLP Thrift service. - Processes a single URL passed via DAG run configuration. + Gets the list of accounts to use for processing, filtering out banned/resting accounts. + Supports three modes for the 'account_pool' parameter: + 1. Explicit List: If 'account_pool' contains a comma, it's treated as a comma-separated list. + 2. 
Prefix-based Generation: If 'account_pool_size' is provided, 'account_pool' is treated as a prefix + to generate numbered accounts (e.g., prefix_01, prefix_02). + 3. Single Account: If 'account_pool' has no comma and 'account_pool_size' is not provided, it's treated as a single account name. + If the pool is exhausted and auto-creation is enabled, it will generate a new account ID. """ - template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir') + account_pool_str = params.get('account_pool', 'default_account') + accounts = [] + is_prefix_mode = False - @apply_defaults - def __init__(self, - service_ip=None, - service_port=None, - account_id=None, - info_json_dir=None, - timeout=DEFAULT_TIMEOUT, - *args, **kwargs): - super().__init__(*args, **kwargs) + if ',' in account_pool_str: + # Mode 1: Explicit comma-separated list + logger.info("Detected comma in 'account_pool', treating as an explicit list.") + accounts = [acc.strip() for acc in account_pool_str.split(',') if acc.strip()] + else: + # Mode 2 or 3: Prefix-based generation OR single account + prefix = account_pool_str + pool_size_param = params.get('account_pool_size') - logger.info(f"Initializing YtdlpOpsOperator (Worker Version) with parameters: " - f"service_ip={service_ip}, service_port={service_port}, " - f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}") - - if not service_ip or not service_port: - raise ValueError("Both service_ip and service_port must be specified.") - if not account_id: - logger.warning("No account_id provided. Ensure it's set in DAG params or operator config.") - - self.service_ip = service_ip - self.service_port = service_port - self.account_id = account_id - self.info_json_dir = info_json_dir - self.timeout = timeout - - def execute(self, context): - logger.info("Executing YtdlpOpsOperator (Worker Version)") - transport = None - ti = context['task_instance'] - - # Define connection parameters outside the try block to be available in except blocks - params = context['params'] - url = params.get('url') - if not url: - raise AirflowException("DAG was triggered without a 'url' in its configuration.") - - service_ip = self.render_template(self.service_ip, context) - service_port_rendered = self.render_template(self.service_port, context) - account_id = self.render_template(self.account_id, context) - timeout_rendered = self.render_template(self.timeout, context) - info_json_dir = self.render_template(self.info_json_dir, context) - - host = params.get('service_ip', service_ip) - port_str = params.get('service_port', service_port_rendered) - account_id = params.get('account_id', account_id) - clients = params.get('clients') - - logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}") - - if not host or not port_str: - raise ValueError("Direct connection requires service_ip and service_port") - try: - port = int(port_str) - except (ValueError, TypeError): - raise ValueError(f"Invalid service_port value: {port_str}") - - try: - timeout = int(timeout_rendered) - if timeout <= 0: raise ValueError("Timeout must be positive") - except (ValueError, TypeError): - timeout = DEFAULT_TIMEOUT - - try: - logger.info(f"Processing URL from DAG run config: {url}") - socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET) - socket_conn.setTimeout(timeout * 1000) - transport = TTransport.TFramedTransport(socket_conn) - protocol = TBinaryProtocol.TBinaryProtocol(transport) - client = YTTokenOpService.Client(protocol) - - 
transport.open() - logger.info("Successfully connected to Thrift server.") - client.ping() - logger.info("Server ping successful.") - - token_data = client.getOrRefreshToken( - accountId=account_id, - updateType=TokenUpdateMode.AUTO, - url=url, - clients=clients - ) - logger.info("Successfully retrieved token data from service.") - - info_json_path = None - info_json = self._get_info_json(token_data) - if info_json and self._is_valid_json(info_json): - info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir) - if info_json_path: - ti.xcom_push(key='info_json_path', value=info_json_path) - else: - ti.xcom_push(key='info_json_path', value=None) - else: - ti.xcom_push(key='info_json_path', value=None) - - socks_proxy = None - proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None) - if proxy_attr: - socks_proxy = getattr(token_data, proxy_attr) - ti.xcom_push(key='socks_proxy', value=socks_proxy) - - ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None) - ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd) - - except (PBServiceException, TTransportException) as e: - # Enhanced logging to make failures clear in Airflow logs. - logger.error(f"Thrift call failed for URL '{url}' with account '{account_id}'.") - logger.error(f"Exception Type: {type(e).__name__}") - logger.error(f"Exception Message: {getattr(e, 'message', str(e))}") - if isinstance(e, PBServiceException): - logger.error(f"Service Error Code: {getattr(e, 'errorCode', 'N/A')}") - if hasattr(e, 'context') and e.context: - logger.error(f"Service Context: {e.context}") + if pool_size_param is not None: + # Mode 2: Prefix mode + is_prefix_mode = True + logger.info("Detected 'account_pool_size', treating 'account_pool' as a prefix.") - # Use exc_info=True to get the full traceback in the logs - logger.error("Full exception traceback:", exc_info=True) + try: + pool_size = int(pool_size_param) + if pool_size <= 0: + raise AirflowException("'account_pool_size' must be a positive integer for prefix-based generation.") + except (ValueError, TypeError): + raise AirflowException(f"'account_pool_size' must be an integer, but got: {pool_size_param}") - # Push exception details to XCom for the failure handler - error_details = { - 'error_message': getattr(e, 'message', str(e)), - 'error_type': type(e).__name__, - 'traceback': traceback.format_exc() - } - ti.xcom_push(key='error_details', value=error_details) + logger.info(f"Account pool size is set to: {pool_size}") + + # Generate accounts like 'prefix_01', 'prefix_02', ..., 'prefix_10' + for i in range(1, pool_size + 1): + accounts.append(f"{prefix}_{i:02d}") + else: + # Mode 3: Single account mode + logger.info("No 'account_pool_size' provided. Treating 'account_pool' as a single account name.") + accounts = [prefix] - proxy_to_ban = None - if isinstance(e, PBServiceException) and hasattr(e, 'context') and e.context: - # Assuming server adds 'proxy_url' to context on failure - proxy_to_ban = e.context.get('proxy_url') - bannable_error_codes = [ - "BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", - "SOCKS5_CONNECTION_FAILED", "CLIENT_TIMEOUT", "GLOBAL_TIMEOUT" - ] - if e.errorCode not in bannable_error_codes: - proxy_to_ban = None + if not accounts: + raise AirflowException("Initial account pool is empty. 
Please check 'account_pool' and 'account_pool_size' parameters.") + + logger.info(f"Generated initial account pool with {len(accounts)} accounts: {accounts}") - if proxy_to_ban: - logger.info(f"Found proxy to ban: {proxy_to_ban}. Pushing to XCom for 'mark_proxy_banned' task.") - ti.xcom_push(key='proxy_to_ban', value=proxy_to_ban) - ti.xcom_push(key='server_identity_for_ban', value=account_id) - ti.xcom_push(key='service_host_for_ban', value=host) - ti.xcom_push(key='service_port_for_ban', value=port) + # --- Filter out banned/resting accounts by checking Redis --- + redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) + try: + redis_client = _get_redis_client(redis_conn_id) + active_accounts = [] + for account in accounts: + status_key = f"account_status:{account}" + status_bytes = redis_client.hget(status_key, "status") + status = status_bytes.decode('utf-8') if status_bytes else "ACTIVE" + + if status == 'BANNED': + logger.warning(f"Account '{account}' is BANNED. Skipping.") + continue + if 'RESTING' in status: # Check for 'RESTING' or 'RESTING (active in...)' + logger.info(f"Account '{account}' is RESTING. Skipping.") + continue + + active_accounts.append(account) + + if not active_accounts and accounts: + logger.error(f"All {len(accounts)} accounts in the pool are banned or resting.") + + auto_create = params.get('auto_create_new_accounts_on_exhaustion', False) + if auto_create and is_prefix_mode: + prefix = account_pool_str + new_account_id = f"{prefix}-auto-{str(uuid.uuid4())[:8]}" + logger.warning(f"Account pool exhausted. Auto-creating new account: '{new_account_id}'") + active_accounts.append(new_account_id) else: - logger.info("No specific proxy to ban based on the error context.") - # Push None explicitly so the downstream task knows not to run - ti.xcom_push(key='proxy_to_ban', value=None) + if not auto_create: + logger.error("Auto-creation is disabled. No workers can be scheduled.") + if not is_prefix_mode: + logger.error("Auto-creation is only supported for prefix-based account pools.") + raise AirflowException("All accounts in the configured pool are currently exhausted (banned or resting).") - # Re-raise the original exception to fail the Airflow task - raise e - except Exception as e: - logger.error(f"YtdlpOpsOperator (Worker) failed with an unexpected exception: {e}", exc_info=True) - raise AirflowException(f"Task failed with unexpected error: {e}") - finally: - if transport and transport.isOpen(): - transport.close() + if len(active_accounts) < len(accounts): + logger.info(f"Filtered account pool. Using {len(active_accounts)} of {len(accounts)} available accounts.") + + accounts = active_accounts - def _get_info_json(self, token_data): - return getattr(token_data, 'infoJson', None) + except Exception as e: + logger.error(f"Could not filter accounts by status from Redis. Using unfiltered pool. Error: {e}", exc_info=True) - def _is_valid_json(self, json_str): - if not json_str or not isinstance(json_str, str): return False - try: - json.loads(json_str) - return True - except json.JSONDecodeError: - return False + if not accounts: + raise AirflowException("Account pool is empty after filtering. 
Please check account statuses in Redis or enable auto-creation.") + + logger.info(f"Final active account pool with {len(accounts)} accounts: {accounts}") + return accounts - def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir): - try: + + +def pull_url_from_redis_callable(**context): + """ + Pulls a single URL from the Redis inbox queue. + If the queue is empty, it skips the DAG run. + Otherwise, it pushes the URL to XCom for downstream tasks. + """ + params = context['params'] + ti = context['task_instance'] + queue_name = params['queue_name'] + redis_conn_id = params['redis_conn_id'] + inbox_queue = f"{queue_name}_inbox" + + logger.info(f"Attempting to pull one URL from Redis queue '{inbox_queue}'...") + client = _get_redis_client(redis_conn_id) + url_bytes = client.lpop(inbox_queue) + + if not url_bytes: + logger.info("Queue is empty. Stopping this worker loop.") + raise AirflowSkipException("Redis queue is empty.") + + url_to_process = url_bytes.decode('utf-8') + logger.info(f"Pulled URL '{url_to_process}' from the queue.") + ti.xcom_push(key='url_to_process', value=url_to_process) + +def decide_what_to_do_next_callable(**context): + """ + Decides whether to continue the processing loop by triggering the next worker + or to stop the loop, based on task success, failure, or an empty queue. + """ + params = context['params'] + dag_run = context['dag_run'] + + # Check if a failure was handled. If the 'handle_generic_failure' task was not skipped, + # it means a failure occurred somewhere in the pipeline. + handle_generic_failure_ti = dag_run.get_task_instance(task_id='handle_generic_failure') + if handle_generic_failure_ti and handle_generic_failure_ti.state != 'skipped': + logger.error(f"Failure handler task 'handle_generic_failure' was in state '{handle_generic_failure_ti.state}'. Stopping this processing lane.") + return 'fail_loop' + + # Check if the worker was skipped because the Redis queue was empty. + pull_task_instance = dag_run.get_task_instance(task_id='pull_url_from_redis') + if pull_task_instance and pull_task_instance.state == 'skipped': + logger.info("Worker was skipped because Redis queue was empty.") + retrigger_delay_on_empty_s = params.get('retrigger_delay_on_empty_s', 60) + + if retrigger_delay_on_empty_s < 0: + logger.info(f"retrigger_delay_on_empty_s is {retrigger_delay_on_empty_s}. Stopping this worker loop.") + return 'stop_loop' + else: + logger.info(f"Queue is empty. Will re-trigger this worker loop after a delay of {retrigger_delay_on_empty_s}s.") + return 'trigger_self_run' + + # If no failure was handled and the queue wasn't empty, it must be a success. + logger.info("All preceding tasks succeeded. Continuing the processing loop by triggering the next worker.") + return 'trigger_self_run' + +def assign_account_callable(**context): + """ + Selects an account for the run. + It uses the account from the previous run if available (affinity), + otherwise it picks a random one from the active pool. + """ + ti = context['task_instance'] + params = context['params'] + + # Affinity logic: check if an account was passed from a previous run + account_id = params.get('current_account_id') + if account_id: + logger.info(f"Using account '{account_id}' passed from previous run (affinity).") + else: + logger.info("No account passed from previous run. 
Selecting a new one from the pool.") + account_pool = _get_account_pool(params) + account_id = random.choice(account_pool) + logger.info(f"Selected initial account '{account_id}'.") + + ti.xcom_push(key='account_id', value=account_id) + ti.xcom_push(key='accounts_tried', value=[account_id]) + + +def get_token_callable(**context): + """Makes a single attempt to get a token from the Thrift service.""" + ti = context['task_instance'] + params = context['params'] + + # Determine which account to use (initial or retry) + # Pull from all upstreams, which might return a LazySelectSequence + xcom_results = ti.xcom_pull(task_ids=context['task'].upstream_task_ids, key='account_id') + + # The result can be a single value or an iterable. We need to find the first valid item. + account_id = None + if hasattr(xcom_results, '__iter__') and not isinstance(xcom_results, str): + # It's a list, tuple, or LazySelectSequence. Find the first real value. + account_id = next((item for item in xcom_results if item is not None), None) + else: + # It's a single value + account_id = xcom_results + + if not account_id: + raise AirflowException("Could not find a valid account_id in XCom from any upstream task.") + + url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') + if not url: + logger.info("No URL pulled from XCom. Assuming upstream task was skipped. Ending task.") + return + + host = params['service_ip'] + port = int(params['service_port']) + timeout = int(params.get('timeout', DEFAULT_TIMEOUT)) + # The value from templates_dict is already rendered by Airflow. + info_json_dir = context['templates_dict']['info_json_dir'] + machine_id = params.get('machine_id') or socket.gethostname() + clients = params.get('clients') + + logger.info(f"--- Attempting to get token for URL '{url}' with account '{account_id}' ---") + client, transport = None, None + try: + client, transport = _get_thrift_client(host, port, timeout) + client.ping() + + call_kwargs = {'accountId': account_id, 'updateType': TokenUpdateMode.AUTO, 'url': url, 'clients': clients, 'machineId': machine_id} + token_data = client.getOrRefreshToken(**call_kwargs) + logger.info("Successfully retrieved token data from service.") + + # --- Success Case --- + info_json = getattr(token_data, 'infoJson', None) + if info_json and json.loads(info_json): video_id = _extract_video_id(url) - save_dir = rendered_info_json_dir or "." + save_dir = info_json_dir or "." 
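+            # Example (illustrative values): with the default DOWNLOADS_TEMP this yields a path like
+            #   /opt/airflow/downloadfiles/info_dQw4w9WgXcQ_ytdlp_account_01_1722950000.json
+            # (video id, account id and a unix timestamp are embedded in the filename built below).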
os.makedirs(save_dir, exist_ok=True) timestamp = int(time.time()) base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json" info_json_path = os.path.join(save_dir, base_filename) with open(info_json_path, 'w', encoding='utf-8') as f: f.write(info_json) - return info_json_path - except Exception as e: - logger.error(f"Failed to save info.json: {e}", exc_info=True) - return None + ti.xcom_push(key='info_json_path', value=info_json_path) + + # Log key details from the info.json to confirm success + try: + info_data = json.loads(info_json) + if isinstance(info_data, dict): + title = info_data.get('title', 'N/A') + uploader = info_data.get('uploader', 'N/A') + duration = info_data.get('duration_string', 'N/A') + logger.info(f"Successfully got info.json for video: '{title}' by '{uploader}' ({duration})") + except Exception as log_e: + logger.warning(f"Could not log info.json details: {log_e}") + + proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None) + ti.xcom_push(key='socks_proxy', value=getattr(token_data, proxy_attr) if proxy_attr else None) + ti.xcom_push(key='ytdlp_command', value=getattr(token_data, 'ytdlpCommand', None)) + ti.xcom_push(key='successful_account_id', value=account_id) # For affinity + ti.xcom_push(key='get_token_succeeded', value=True) + + except (PBServiceException, PBUserException, TTransportException) as e: + logger.error(f"Thrift call failed for account '{account_id}'. Exception: {getattr(e, 'message', str(e))}") + error_context = getattr(e, 'context', None) + if isinstance(error_context, str): + try: + error_context = json.loads(error_context.replace("'", "\"")) + except: pass + + error_details = { + 'error_message': getattr(e, 'message', str(e)), + 'error_code': getattr(e, 'errorCode', 'TRANSPORT_ERROR'), + 'error_type': type(e).__name__, + 'traceback': traceback.format_exc(), + 'proxy_url': error_context.get('proxy_url') if isinstance(error_context, dict) else None + } + ti.xcom_push(key='error_details', value=error_details) + ti.xcom_push(key='get_token_succeeded', value=False) + + # For non-bannable errors like Connection Refused, fail the task immediately to stop the loop. + # Bannable errors will let the task succeed, allowing the branch operator to decide on a retry. + error_code = error_details.get('error_code', '').strip() + error_message = error_details.get('error_message', '').lower() + bannable_codes = ["BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", "SOCKS5_CONNECTION_FAILED"] + is_bannable = error_code in bannable_codes + + # Override bannable status for age-restricted content, which is not a true bot detection. + if is_bannable and ('confirm your age' in error_message or 'age-restricted' in error_message): + logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.") + is_bannable = False + + if not is_bannable: + logger.error(f"Non-bannable error '{error_code}' detected. Failing task to stop the loop.") + raise AirflowException(f"Non-bannable Thrift call failed: {error_details['error_message']}") + else: + logger.warning(f"Bannable error '{error_code}' detected. Passing to branch operator for handling.") + # Do not raise exception here; let the branch operator handle it. + finally: + if transport and transport.isOpen(): + transport.close() + + +def handle_bannable_error_branch_callable(**context): + """ + Checks if a `get_token` failure is bannable and if a retry is allowed. 
+ """ + ti = context['task_instance'] + params = context['params'] + + # Check the result of the first get_token attempt + get_token_succeeded = ti.xcom_pull(task_ids='get_token', key='get_token_succeeded') + if get_token_succeeded: + return 'setup_download_and_probe' + + # It failed, so check the error details + error_details = ti.xcom_pull(task_ids='get_token', key='error_details') + if not error_details: + logger.error("get_token failed but no error details were found in XCom. Stopping loop.") + return 'handle_generic_failure' + + error_code = error_details.get('error_code', '').strip() + error_message = error_details.get('error_message', '').lower() + policy = params.get('on_bannable_failure', 'retry_with_new_account') + bannable_codes = ["BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", "SOCKS5_CONNECTION_FAILED"] + is_bannable = error_code in bannable_codes + + # Override bannable status for age-restricted content, which is not a true bot detection. + if is_bannable and ('confirm your age' in error_message or 'age-restricted' in error_message): + logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.") + is_bannable = False + + logger.info(f"Handling failure from 'get_token'. Error code: '{error_code}', Policy: '{policy}'") + + if is_bannable and policy == 'retry_with_new_account': + logger.info("Error is bannable and policy allows retry. Proceeding to ban first account and retry.") + return 'ban_account_and_prepare_for_retry' + elif is_bannable: # and policy is 'stop_loop' + logger.warning("Error is bannable and policy is 'stop_loop'. Banning account and stopping.") + return 'ban_account_and_fail' + else: + logger.warning("Error is not considered bannable. Proceeding to generic failure handling.") + return 'handle_generic_failure' + + +def assign_new_account_for_retry_callable(**context): + """Selects a new, unused account for the retry attempt.""" + ti = context['task_instance'] + params = context['params'] + + accounts_tried = ti.xcom_pull(task_ids='assign_account', key='accounts_tried') + if not accounts_tried: + raise AirflowException("Cannot retry, list of previously tried accounts not found.") + + logger.info(f"Policy is 'retry_with_new_account'. Selecting a new account. Already tried: {accounts_tried}") + try: + account_pool = _get_account_pool(params) + available_for_retry = [acc for acc in account_pool if acc not in accounts_tried] + + new_account_id = None + if available_for_retry: + new_account_id = random.choice(available_for_retry) + else: + # No unused accounts left in the pool. Check if we can auto-create one. + logger.warning("No unused accounts available in the pool for a retry. 
Checking for auto-creation.") + auto_create = params.get('auto_create_new_accounts_on_exhaustion', False) + account_pool_str = params.get('account_pool', 'default_account') + pool_size_param = params.get('account_pool_size') + is_prefix_mode = pool_size_param is not None and ',' not in account_pool_str + + if auto_create and is_prefix_mode: + prefix = account_pool_str + new_account_id = f"{prefix}-auto-{str(uuid.uuid4())[:8]}" + logger.warning(f"Auto-creating new account for retry: '{new_account_id}'") + else: + if not auto_create: + logger.error("Auto-creation is disabled.") + if not is_prefix_mode: + logger.error("Auto-creation is only supported for prefix-based account pools (requires 'account_pool_size').") + raise AirflowException("No other accounts available in the pool for a retry.") + + accounts_tried.append(new_account_id) + + logger.info(f"Selected new account for retry: '{new_account_id}'") + ti.xcom_push(key='account_id', value=new_account_id) + ti.xcom_push(key='accounts_tried', value=accounts_tried) + + except Exception as e: + logger.error(f"Could not get a new account for retry: {e}") + raise AirflowException(f"Failed to assign new account for retry: {e}") + + +def handle_retry_failure_branch_callable(**context): + """Checks the result of the retry_get_token task.""" + ti = context['task_instance'] + retry_succeeded = ti.xcom_pull(task_ids='retry_get_token', key='get_token_succeeded') + + if retry_succeeded: + logger.info("Retry attempt was successful.") + return 'setup_download_and_probe' + else: + logger.error("Retry attempt also failed. Banning second account and proxy.") + return 'ban_second_account_and_proxy' + + +def ban_second_account_and_proxy_callable(**context): + """Bans the second account and the proxy used in the failed retry.""" + ti = context['task_instance'] + params = context['params'] + + # Ban the second account + account_to_ban = ti.xcom_pull(task_ids='assign_new_account_for_retry', key='account_id') + if account_to_ban: + ti.xcom_push(key='account_to_ban', value=account_to_ban) + _ban_resource_task( + 'account', 'account_to_ban', + params['service_ip'], int(params['service_port']), int(params.get('timeout', DEFAULT_TIMEOUT)), + ti=ti, reason="Failed on retry attempt" + ) + + # Ban the proxy + error_details = ti.xcom_pull(task_ids='retry_get_token', key='error_details') + proxy_to_ban = error_details.get('proxy_url') if error_details else None + if proxy_to_ban: + ti.xcom_push(key='proxy_to_ban', value=proxy_to_ban) + _ban_resource_task( + 'proxy', 'proxy_to_ban', + params['service_ip'], int(params['service_port']), int(params.get('timeout', DEFAULT_TIMEOUT)), + ti=ti, server_identity=(params.get('machine_id') or socket.gethostname()) + ) + + +def trigger_self_run_callable(**context): + """Triggers a new run of this same DAG to continue the processing loop, with an optional delay.""" + ti = context['task_instance'] + params = context['params'] + dag_run = context['dag_run'] + + # Check if this was triggered due to an empty queue to apply the specific delay. + pull_task_instance = dag_run.get_task_instance(task_id='pull_url_from_redis') + is_empty_queue_scenario = pull_task_instance and pull_task_instance.state == 'skipped' + + delay = 0 + if is_empty_queue_scenario: + # Use the specific delay for empty queues. Default to 60s. + delay = params.get('retrigger_delay_on_empty_s', 60) + logger.info(f"Queue was empty. Applying delay of {delay}s before re-triggering.") + else: + # For successful runs, re-trigger immediately by default. 
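+        # Example (illustrative): on the success path the next run receives essentially the same
+        # params this run was started with, plus e.g. {'current_account_id': 'ytdlp_account_03'},
+        # so the lane keeps its configuration and, when possible, its account (affinity); the
+        # conf is assembled below.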
+ logger.info("Worker finished successfully. Triggering next run of itself to continue the loop.") + delay = 0 # Immediate re-trigger on success. + + if delay > 0: + logger.info(f"Waiting for {delay}s before triggering next run.") + time.sleep(delay) + + # Generate a unique run_id for the new worker run + run_id = f"self_triggered_{datetime.utcnow().isoformat()}" + + # Pass through all original parameters to the new run. + conf_to_pass = {k: v for k, v in params.items() if v is not None} + + # The new run will pull its own URL, so we ensure 'url' is not passed. + if 'url' in conf_to_pass: + del conf_to_pass['url'] + + # Pass the successful account ID to the next run for affinity. + # It could come from the first attempt or the retry. + successful_account_ids = ti.xcom_pull(task_ids=['get_token', 'retry_get_token'], key='successful_account_id') + successful_account_id = next((acc for acc in successful_account_ids if acc), None) + + if successful_account_id: + conf_to_pass['current_account_id'] = successful_account_id + logger.info(f"Passing successful account '{successful_account_id}' to the next worker run for affinity.") + else: + # If no account was successful (e.g., empty queue scenario), don't pass one. + # The next run will pick a new one. + conf_to_pass['current_account_id'] = None + logger.info("No successful account ID found. Next worker will select a new account from the pool.") + + logger.info(f"Triggering 'ytdlp_ops_worker_per_url' with run_id '{run_id}' and conf: {conf_to_pass}") + + trigger_dag( + dag_id='ytdlp_ops_worker_per_url', # Trigger itself + run_id=run_id, + conf=conf_to_pass, + replace_microseconds=False + ) + logger.info("Successfully triggered the next worker run.") + # ============================================================================= # DAG Definition @@ -472,38 +745,132 @@ with DAG( default_args=default_args, schedule_interval=None, catchup=False, - description='Processes a single YouTube URL passed via configuration.', - tags=['ytdlp', 'thrift', 'client', 'worker'], + description='Self-sustaining worker DAG that processes URLs from a Redis queue in a continuous loop.', + doc_md=""" + ### YT-DLP Self-Sustaining Worker + + This DAG is a self-sustaining worker that processes URLs in a continuous loop. + It is started by the `ytdlp_ops_orchestrator` (the "ignition system"). + + #### How it Works: + + 1. **Ignition:** An initial run is triggered by the orchestrator. + 2. **Pull & Assign:** It pulls a URL from Redis and assigns an account for the job, reusing the last successful account if available (affinity). + 3. **Get Token:** It calls the `ytdlp-ops-server` to get tokens and `info.json`. + 4. **Failure Handling:** If `get_token` fails with a "bannable" error (like bot detection), it follows the `on_bannable_failure` policy: + - `retry_with_new_account` (default): It bans the failing account, picks a new one, and retries the `get_token` call once. If the retry also fails, it bans the second account and the proxy, then stops the loop. + - `stop_loop`: It bans the account and stops the loop immediately. + 5. **Download:** If tokens are retrieved successfully, it downloads the media. + 6. **Continue or Stop:** After success, or a non-recoverable failure, it decides whether to continue the loop by re-triggering itself or to stop. + + This creates a "processing lane" that runs independently until the queue is empty or a failure occurs. 
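+
+    #### Example trigger configuration
+
+    Values below are illustrative; the keys mirror the `params` declared for this DAG:
+
+    ```python
+    {
+        "queue_name": "video_queue",
+        "redis_conn_id": "redis_default",
+        "account_pool": "ytdlp_account",
+        "account_pool_size": 10,
+        "clients": "mweb,ios,android",
+        "on_bannable_failure": "retry_with_new_account",
+        "retrigger_delay_on_empty_s": 60,
+    }
+    ```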
+ """, + tags=['ytdlp', 'thrift', 'client', 'worker', 'loop'], params={ - 'url': Param(None, type=["string", "null"], description="The YouTube URL to process. This is set by the triggering DAG."), - # Sensor params (passed through to re-trigger the sensor, with defaults for standalone runs) - 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Sensor param: Base name for Redis queues."), - 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Sensor param: Airflow Redis connection ID."), - 'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="integer", description="Sensor param: Maximum number of URLs to process in one batch."), + # Worker loop control params (passed from orchestrator) + 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."), + 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."), # Worker-specific params - 'service_ip': Param('89.253.221.173', type="string", description="Service IP."), - 'service_port': Param(9090, type="integer", description="Service port."), - 'account_id': Param('default_account', type="string", description="Account ID for the API call."), - 'clients': Param('ios', type="string", description="Comma-separated list of clients to use for token generation (e.g., 'ios,android,mweb')."), + 'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="Service IP. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."), + 'service_port': Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="Port of the Envoy load balancer. Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."), + 'account_pool': Param('default_account', type="string", description="Account pool prefix or comma-separated list."), + 'account_pool_size': Param(None, type=["integer", "null"], description="If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."), + 'machine_id': Param(None, type=["string", "null"], description="Identifier for the client machine, used for proxy usage tracking. If not set, worker hostname will be used."), + 'clients': Param('mweb', type="string", description="Comma-separated list of clients to use for token generation (e.g., 'ios,android,mweb')."), 'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."), 'download_format': Param('ba[ext=m4a]/bestaudio/best', type="string", description="yt-dlp format selection string."), 'output_path_template': Param("%(title)s [%(id)s].%(ext)s", type="string", description="yt-dlp output filename template."), - 'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json."), - 'requeue_on_failure': Param(False, type="boolean", description="If True, re-adds the URL to the inbox on failure instead of moving to the fail hash."), - 'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."), + 'on_bannable_failure': Param( + 'retry_with_new_account', + type="string", + enum=['stop_loop', 'retry_with_new_account'], + title="On Bannable Failure Policy", + description="Policy for when a bannable error occurs. 'stop_loop' or 'retry_with_new_account'." 
+ ), 'retry_on_probe_failure': Param(False, type="boolean", description="If True, attempts to re-download and probe a file if the initial probe fails."), - 'requeue_on_bannable_error': Param(False, type="boolean", description="If True, re-queues the URL if a bannable error (proxy, bot detection) occurs."), - 'requeue_on_ffprobe_failure': Param(False, type="boolean", description="If True, re-queues the URL if the ffmpeg/ffprobe check fails."), + 'auto_create_new_accounts_on_exhaustion': Param(True, type="boolean", description="If True and all accounts in a prefix-based pool are exhausted, create a new one automatically."), + 'retrigger_delay_on_empty_s': Param(60, type="integer", description="Delay in seconds before re-triggering a worker if the queue is empty. Set to -1 to stop the loop."), + # --- Internal Worker Parameters (for self-triggering loop) --- + 'current_account_id': Param(None, type=["string", "null"], description="[Internal] The account ID used by the previous run in this worker lane. Used to maintain account affinity."), } ) as dag: - get_token = YtdlpOpsOperator( + pull_url_from_redis = PythonOperator( + task_id='pull_url_from_redis', + python_callable=pull_url_from_redis_callable, + ) + + assign_account = PythonOperator( + task_id='assign_account', + python_callable=assign_account_callable, + ) + + get_token = PythonOperator( task_id='get_token', - service_ip="{{ params.service_ip }}", - service_port="{{ params.service_port }}", - account_id="{{ params.account_id }}", - timeout="{{ params.timeout }}", - info_json_dir="{{ params.info_json_dir }}", + python_callable=get_token_callable, + templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"}, + ) + + handle_bannable_error_branch = BranchPythonOperator( + task_id='handle_bannable_error_branch', + python_callable=handle_bannable_error_branch_callable, + trigger_rule='all_done', # Run even if get_token succeeds + ) + + # --- Retry Path --- + ban_account_and_prepare_for_retry = PythonOperator( + task_id='ban_account_and_prepare_for_retry', + python_callable=_ban_resource_task, + op_kwargs={ + 'resource_type': 'account', + 'resource_id_xcom_key': 'account_id', + 'host': "{{ params.service_ip }}", + 'port': "{{ params.service_port }}", + 'timeout': "{{ params.timeout }}", + 'reason': "Bannable error detected, preparing for retry." 
+ }, + ) + + assign_new_account_for_retry = PythonOperator( + task_id='assign_new_account_for_retry', + python_callable=assign_new_account_for_retry_callable, + ) + + retry_get_token = PythonOperator( + task_id='retry_get_token', + python_callable=get_token_callable, + templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"}, + ) + + handle_retry_failure_branch = BranchPythonOperator( + task_id='handle_retry_failure_branch', + python_callable=handle_retry_failure_branch_callable, + trigger_rule='none_skipped', + ) + + ban_second_account_and_proxy = PythonOperator( + task_id='ban_second_account_and_proxy', + python_callable=ban_second_account_and_proxy_callable, + ) + + # --- Stop Path --- + ban_account_and_fail = PythonOperator( + task_id='ban_account_and_fail', + python_callable=_ban_resource_task, + op_kwargs={ + 'resource_type': 'account', + 'resource_id_xcom_key': 'account_id', + 'host': "{{ params.service_ip }}", + 'port': "{{ params.service_port }}", + 'timeout': "{{ params.timeout }}", + 'reason': "Bannable error detected, policy is stop_loop." + }, + ) + + # --- Main Execution Path --- + setup_download_and_probe = DummyOperator( + task_id='setup_download_and_probe', + trigger_rule='one_success', ) download_and_probe = BashOperator( @@ -511,8 +878,14 @@ with DAG( bash_command=""" set -e - INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}" - PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}" + INFO_JSON_PATH_1="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}" + INFO_JSON_PATH_2="{{ ti.xcom_pull(task_ids='retry_get_token', key='info_json_path') }}" + INFO_JSON_PATH="${INFO_JSON_PATH_1:-$INFO_JSON_PATH_2}" + + PROXY_1="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}" + PROXY_2="{{ ti.xcom_pull(task_ids='retry_get_token', key='socks_proxy') }}" + PROXY="${PROXY_1:-$PROXY_2}" + FORMAT="{{ params.download_format }}" DOWNLOAD_DIR="{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles/video') }}" FILENAME_TEMPLATE="{{ params.output_path_template }}" @@ -525,13 +898,13 @@ with DAG( echo "Download Directory: $DOWNLOAD_DIR" echo "Full Output Path: $FULL_OUTPUT_PATH" - if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then + if [ -z "$INFO_JSON_PATH" ] || [ "$INFO_JSON_PATH" == "None" ] || [ ! -f "$INFO_JSON_PATH" ]; then echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)." exit 1 fi CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH") - if [ -n "$PROXY" ]; then + if [ -n "$PROXY" ] && [ "$PROXY" != "None" ]; then CMD_ARRAY+=(--proxy "$PROXY") fi CMD_ARRAY+=(-f "$FORMAT" -o "$FULL_OUTPUT_PATH" --print filename) @@ -596,60 +969,70 @@ with DAG( # Push the final filename for the success_task echo "$FINAL_FILENAME" """, - retries=0, # Retries are now handled inside the script based on a DAG param + retries=0, retry_delay=timedelta(minutes=1), ) - mark_proxy_banned = PythonOperator( - task_id='mark_proxy_banned', - python_callable=mark_proxy_banned_callable, - trigger_rule='one_failed', # Run only if get_token fails - ) - - # This task triggers the sensor DAG to check for more work as soon as this worker is done. - trigger_sensor_for_next_batch = TriggerDagRunOperator( - task_id='trigger_sensor_for_next_batch', - trigger_dag_id='ytdlp_ops_sensor_queue', - # Pass only the sensor's needed parameters back to it. - # These values were originally passed from the sensor to this worker. 
- # The values are templated and will be passed as strings to the triggered DAG. - conf={ - "queue_name": "{{ params.queue_name }}", - "redis_conn_id": "{{ params.redis_conn_id }}", - "max_urls_per_run": "{{ params.max_urls_per_run }}", - }, - # This task will only run on the success path, so it inherits the default - # trigger_rule='all_success'. - wait_for_completion=False, - ) - trigger_sensor_for_next_batch.doc_md = """ - ### Trigger Sensor for Next Batch - Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop. - This task **only runs on the success path** after a URL has been fully processed. - This ensures that the system immediately checks for more URLs to process, but stops the loop on failure. - """ - - # Define success and failure handling tasks - success_task = PythonOperator( + # --- Finalization Tasks --- + mark_url_as_success = PythonOperator( task_id='mark_url_as_success', python_callable=mark_url_as_success, - trigger_rule='all_success', # Run only if upstream tasks succeeded ) - failure_task = PythonOperator( - task_id='mark_url_as_failed', - python_callable=mark_url_as_failed, - trigger_rule='one_failed', # Run if any upstream task failed + handle_generic_failure = PythonOperator( + task_id='handle_generic_failure', + python_callable=handle_failure_callable, + trigger_rule='one_failed', # Trigger if any upstream in the failure path fails ) + decide_next_step = BranchPythonOperator( + task_id='decide_what_to_do_next', + python_callable=decide_what_to_do_next_callable, + trigger_rule='all_done', + ) + + trigger_self_run = PythonOperator( + task_id='trigger_self_run', + python_callable=trigger_self_run_callable, + ) + + stop_loop = DummyOperator(task_id='stop_loop') + fail_loop = BashOperator(task_id='fail_loop', bash_command='exit 1') + # --- Define Task Dependencies --- + pull_url_from_redis >> assign_account >> get_token >> handle_bannable_error_branch - # The main success flow - get_token >> download_and_probe >> success_task >> trigger_sensor_for_next_batch + # The branch operator decides the path after the first token attempt. + # It can go to the success path (setup_download_and_probe), the retry path (ban_account_and_prepare_for_retry), + # the stop-on-failure path (ban_account_and_fail), or the generic failure path. + handle_bannable_error_branch >> [ + setup_download_and_probe, + ban_account_and_prepare_for_retry, + ban_account_and_fail, + handle_generic_failure, + ] - # The failure path for get_token, which includes the explicit ban task - get_token >> mark_proxy_banned + # The retry path itself + ban_account_and_prepare_for_retry >> assign_new_account_for_retry >> retry_get_token >> handle_retry_failure_branch - # The main failure handler, which listens to the primary tasks. - # If get_token or download_and_probe fails, it will trigger failure_task. - [get_token, download_and_probe] >> failure_task + # The branch operator after the retry attempt. + # It can go to the success path (setup_download_and_probe) or the final failure path (ban_second_account_and_proxy). + handle_retry_failure_branch >> [ + setup_download_and_probe, + ban_second_account_and_proxy, + ] + + # The main success path, which can be reached from either the first attempt or the retry. + setup_download_and_probe >> download_and_probe >> mark_url_as_success + + # Define all paths that lead to the generic failure handler. + # This task runs if any of its direct upstreams fail. 
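(The branch and trigger callables wired here — `decide_what_to_do_next_callable`, `trigger_self_run_callable` — are defined earlier in the DAG file and are not part of this hunk. For orientation, a minimal sketch of what the final branch decision could look like is given below; the XCom keys and the `stop_on_failure` parameter are assumptions inferred from the task ids and params visible in this diff, not the actual implementation.)

```python
# Illustrative sketch only: the real decide_what_to_do_next_callable lives
# elsewhere in this DAG file. XCom keys and the 'stop_on_failure' param are
# assumptions; the returned task_ids match the branch targets wired above.
def decide_what_to_do_next_callable(**context):
    ti = context["ti"]
    params = context["params"]

    # Nothing was pulled from the _inbox queue: either park this lane or
    # re-trigger it after a delay, depending on the configured policy.
    url = ti.xcom_pull(task_ids="pull_url_from_redis", key="url")
    if not url:
        if int(params.get("retrigger_delay_on_empty_s", 60)) < 0:
            return "stop_loop"
        return "trigger_self_run"

    # mark_url_as_success only runs on the success path, so a pushed value
    # here means the URL was fully processed.
    if ti.xcom_pull(task_ids="mark_url_as_success", key="return_value") is not None:
        return "trigger_self_run"

    # A failure was already recorded by handle_generic_failure; stop or
    # continue the lane according to the (assumed) stop_on_failure policy.
    return "fail_loop" if params.get("stop_on_failure", True) else "trigger_self_run"
```

The real callable may also carry `current_account_id` forward into the conf of the next run, matching the "[Internal]" parameter above that keeps account affinity across iterations of a worker lane.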
+ download_and_probe >> handle_generic_failure + ban_account_and_fail >> handle_generic_failure + ban_second_account_and_proxy >> handle_generic_failure + # A non-bannable failure in get_token will fail the task, which is handled by the branch operator + # which then correctly routes to handle_generic_failure. A direct link is not needed. + + # Final decision point. It runs after success or after failure has been handled. + [mark_url_as_success, handle_generic_failure] >> decide_next_step + decide_next_step >> [trigger_self_run, stop_loop, fail_loop] diff --git a/docker-compose-ytdlp-ops.yaml b/docker-compose-ytdlp-ops.yaml index 68cba6e..9ccda2d 100644 --- a/docker-compose-ytdlp-ops.yaml +++ b/docker-compose-ytdlp-ops.yaml @@ -1,4 +1,42 @@ services: + config-generator: + image: python:3.9-slim + container_name: ytdlp-ops-config-generator + working_dir: /app + volumes: + # Mount the current directory to access the template, .env, and script + - .:/app + env_file: + - ./.env + environment: + ENVOY_CLUSTER_TYPE: STRICT_DNS + # Pass worker count and base port to ensure Envoy config matches the workers + YTDLP_WORKERS: ${YTDLP_WORKERS:-3} + YTDLP_BASE_PORT: ${YTDLP_BASE_PORT:-9090} + # This command cleans up old runs, installs jinja2, and generates the config. + command: > + sh -c "rm -rf ./envoy.yaml && + pip install --no-cache-dir -q jinja2 && + python3 ./generate_envoy_config.py" + + envoy: + image: envoyproxy/envoy:v1.29-latest + container_name: envoy-thrift-lb + restart: unless-stopped + volumes: + # Mount the generated config file from the host + - ./envoy.yaml:/etc/envoy/envoy.yaml:ro + ports: + # This is the single public port for all Thrift traffic + - "${ENVOY_PORT:-9080}:${ENVOY_PORT:-9080}" + networks: + - airflow_prod_proxynet + depends_on: + config-generator: + condition: service_completed_successfully + ytdlp-ops: + condition: service_started + camoufox: build: context: ./camoufox # Path relative to the docker-compose file @@ -15,9 +53,8 @@ services: "--ws-host", "0.0.0.0", "--port", "12345", "--ws-path", "mypath", - "--proxy-url", "socks5://sslocal-rust-1084:1084", + "--proxy-url", "socks5://${SOCKS5_SOCK_SERVER_IP:-89.253.221.173}:1084", "--locale", "en-US", - "--geoip", "--extensions", "/app/extensions/google_sign_in_popup_blocker-1.0.2.xpi,/app/extensions/spoof_timezone-0.3.4.xpi,/app/extensions/youtube_ad_auto_skipper-0.6.0.xpi" ] restart: unless-stopped @@ -25,25 +62,36 @@ services: ytdlp-ops: image: pangramia/ytdlp-ops-server:latest # Don't comment out or remove, build is performed externally + container_name: ytdlp-ops-workers # Renamed for clarity depends_on: - camoufox # Ensure camoufox starts first - ports: - - "9090:9090" # Main RPC port - - "9091:9091" # Health check port + # Ports are no longer exposed directly. Envoy will connect to them on the internal network. + env_file: + - ./.env # Path is relative to the compose file volumes: - context-data:/app/context-data + # Mount the plugin source code for live updates without rebuilding the image. + # Assumes the plugin source is in a 'bgutil-ytdlp-pot-provider' directory + # next to your docker-compose.yaml file. 
+ #- ./bgutil-ytdlp-pot-provider:/app/bgutil-ytdlp-pot-provider networks: - airflow_prod_proxynet command: + - "--script-dir" + - "/app" - "--context-dir" - "/app/context-data" + # Use environment variables for port and worker count - "--port" - - "9090" + - "${YTDLP_BASE_PORT:-9090}" + - "--workers" + - "${YTDLP_WORKERS:-3}" - "--clients" - # Add 'web' client since we now have camoufox, test firstly - "web,ios,android,mweb" - "--proxies" - - "socks5://sslocal-rust-1081:1081,socks5://sslocal-rust-1082:1082,socks5://sslocal-rust-1083:1083,socks5://sslocal-rust-1084:1084,socks5://sslocal-rust-1085:1085" + #- "socks5://sslocal-rust-1081:1081,socks5://sslocal-rust-1082:1082,socks5://sslocal-rust-1083:1083,socks5://sslocal-rust-1084:1084,socks5://sslocal-rust-1085:1085" + - "socks5://${SOCKS5_SOCK_SERVER_IP:-89.253.221.173}:1084" + # # Add the endpoint argument pointing to the camoufox service - "--endpoint" - "ws://camoufox:12345/mypath" @@ -61,6 +109,13 @@ services: - "${REDIS_PORT:-6379}" - "--redis-password" - "${REDIS_PASSWORD}" + # Add account cooldown parameters (values are in minutes) + - "--account-active-duration-min" + - "${ACCOUNT_ACTIVE_DURATION_MIN:-30}" + - "--account-cooldown-duration-min" + - "${ACCOUNT_COOLDOWN_DURATION_MIN:-60}" + # Add flag to clean context directory on start + - "--clean-context-dir" restart: unless-stopped pull_policy: always @@ -69,5 +124,4 @@ volumes: name: context-data networks: - airflow_prod_proxynet: - external: true + airflow_prod_proxynet: {} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0d0b044 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +thrift>=0.16.0,<=0.20.0 +backoff>=2.2.1 +python-dotenv==1.0.1 +psutil>=5.9.0 +docker>=6.0.0 +apache-airflow-providers-docker +redis +ffprobe3 +ffmpeg-python \ No newline at end of file
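For reference, the new `config-generator` one-shot container installs `jinja2` and runs `generate_envoy_config.py` (not included in this patch) to render `envoy.yaml` before Envoy starts. Below is a minimal sketch of such a generator, assuming the workers listen on consecutive ports starting at `YTDLP_BASE_PORT`; the template name `envoy.yaml.j2`, the upstream host name, and the template variables are assumptions, since the real script and template are outside this diff.

```python
# Hypothetical sketch of generate_envoy_config.py: render an Envoy config with
# one Thrift upstream per worker, matching YTDLP_WORKERS / YTDLP_BASE_PORT.
# The template file name and its variables are assumptions.
import os

from jinja2 import Environment, FileSystemLoader


def main() -> None:
    workers = int(os.environ.get("YTDLP_WORKERS", "3"))
    base_port = int(os.environ.get("YTDLP_BASE_PORT", "9090"))

    context = {
        "envoy_port": int(os.environ.get("ENVOY_PORT", "9080")),
        "cluster_type": os.environ.get("ENVOY_CLUSTER_TYPE", "STRICT_DNS"),
        # One endpoint per worker process; 'ytdlp-ops' is the compose service
        # name Envoy resolves on the shared docker network (assumed here).
        "endpoints": [
            {"host": "ytdlp-ops", "port": base_port + i} for i in range(workers)
        ],
    }

    template = Environment(loader=FileSystemLoader(".")).get_template("envoy.yaml.j2")
    with open("envoy.yaml", "w", encoding="utf-8") as fh:
        fh.write(template.render(**context))


if __name__ == "__main__":
    main()
```

With `STRICT_DNS` the Envoy cluster keeps re-resolving the service name, which fits the new topology in this compose file: the worker ports are no longer published on the host, and all Thrift traffic enters through the single `ENVOY_PORT`.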