From 4455a107260d1f552d30243e42eaf3f000c29fba Mon Sep 17 00:00:00 2001 From: aperez Date: Thu, 7 Aug 2025 18:01:23 +0300 Subject: [PATCH] Updated dags with queue and hosts --- dags/README.ru.md | 10 + dags/ytdlp_mgmt_proxy_account.py | 251 +++++++---- dags/ytdlp_mgmt_queues.py | 247 ++++++++--- dags/ytdlp_ops_orchestrator.py | 29 +- dags/ytdlp_ops_worker_per_url.py | 731 ++++++++++++++++++------------- 5 files changed, 798 insertions(+), 470 deletions(-) diff --git a/dags/README.ru.md b/dags/README.ru.md index 9726b79..92df83c 100644 --- a/dags/README.ru.md +++ b/dags/README.ru.md @@ -35,6 +35,16 @@ - **Управление прокси:** Позволяет вручную банить, разбанивать или сбрасывать статус прокси. - **Управление аккаунтами:** Позволяет вручную банить или разбанивать аккаунты. +### `ytdlp_mgmt_queues` + +- **Назначение:** Предоставляет набор инструментов для управления очередями Redis, используемыми в конвейере обработки. +- **Функциональность (через параметр `action`):** + - `add_videos`: Добавление одного или нескольких URL-адресов YouTube в очередь. + - `clear_queue`: Очистка (удаление) указанного ключа Redis. + - `list_contents`: Просмотр содержимого ключа Redis (списка или хэша). + - `check_status`: Проверка общего состояния очередей (тип, размер). + - `requeue_failed`: Перемещение всех URL-адресов из очереди сбоев `_fail` обратно в очередь `_inbox` для повторной обработки. + ## Стратегия управления ресурсами (Прокси и Аккаунты) Система использует интеллектуальную стратегию для управления жизненным циклом и состоянием аккаунтов и прокси, чтобы максимизировать процент успеха и минимизировать блокировки. diff --git a/dags/ytdlp_mgmt_proxy_account.py b/dags/ytdlp_mgmt_proxy_account.py index fff3623..0944ed7 100644 --- a/dags/ytdlp_mgmt_proxy_account.py +++ b/dags/ytdlp_mgmt_proxy_account.py @@ -89,7 +89,6 @@ def _list_proxy_statuses(client, server_identity): statuses = client.getProxyStatus(server_identity) if not statuses: logger.info("No proxy statuses found.") - print("No proxy statuses found.") return from tabulate import tabulate @@ -127,15 +126,23 @@ def _list_proxy_statuses(client, server_identity): print("NOTE: To see Recent Accounts/Machines, the server's `getProxyStatus` method must be updated to return these fields.") -def _list_account_statuses(client, account_id): - """Lists the status of accounts.""" +def _list_account_statuses(client, account_id, redis_conn_id): + """Lists the status of accounts, enriching with live data from Redis.""" logger.info(f"Listing account statuses for account: {account_id or 'ALL'}") + + redis_client = None + try: + redis_client = _get_redis_client(redis_conn_id) + logger.info("Successfully connected to Redis to fetch detailed account status.") + except Exception as e: + logger.warning(f"Could not connect to Redis to get detailed status. Will show basic status. Error: {e}") + redis_client = None + try: # The thrift method takes accountId (specific) or accountPrefix. # If account_id is provided, we use it. If not, we get all by leaving both params as None. statuses = client.getAccountStatus(accountId=account_id, accountPrefix=None) if not statuses: - logger.info("No account statuses found.") print("\n--- Account Statuses ---\nNo account statuses found.\n------------------------\n") return @@ -143,6 +150,29 @@ def _list_account_statuses(client, account_id): status_list = [] for s in statuses: + status_str = s.status + # If an account is resting, get the live countdown from Redis for accuracy. 
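+ # The server writes 'resting_until' as a Unix epoch timestamp; the remaining rest time is
+ # computed client-side against the current time, so the countdown shown in the table is live
+ # rather than the value cached in the server's status response.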
+ if redis_client and 'RESTING' in status_str: + try: + status_key = f"account_status:{s.accountId}" + # The server stores resting expiry time in 'resting_until'. + expiry_ts_bytes = redis_client.hget(status_key, "resting_until") + if expiry_ts_bytes: + expiry_ts = float(expiry_ts_bytes) + now = datetime.now().timestamp() + if now >= expiry_ts: + status_str = "ACTIVE (was RESTING)" + else: + remaining_seconds = int(expiry_ts - now) + if remaining_seconds > 3600: + status_str = f"RESTING (active in {remaining_seconds // 3600}h {remaining_seconds % 3600 // 60}m)" + elif remaining_seconds > 60: + status_str = f"RESTING (active in {remaining_seconds // 60}m {remaining_seconds % 60}s)" + else: + status_str = f"RESTING (active in {remaining_seconds}s)" + except Exception as e: + logger.warning(f"Could not parse resting time for {s.accountId} from Redis: {e}. Using server status.") + # Determine the last activity timestamp for sorting last_success = float(s.lastSuccessTimestamp) if s.lastSuccessTimestamp else 0 last_failure = float(s.lastFailureTimestamp) if s.lastFailureTimestamp else 0 @@ -150,7 +180,7 @@ def _list_account_statuses(client, account_id): status_item = { "Account ID": s.accountId, - "Status": s.status, + "Status": status_str, "Success": s.successCount, "Failures": s.failureCount, "Last Success": format_timestamp(s.lastSuccessTimestamp), @@ -191,47 +221,77 @@ def manage_system_callable(**context): proxy_url = params.get("proxy_url") account_id = params.get("account_id") - if action in ["ban", "unban", "reset_all"] and entity == "proxy" and not server_identity: - raise ValueError(f"A 'server_identity' is required for proxy action '{action}'.") - if action in ["ban", "unban"] and entity == "account" and not account_id: - raise ValueError(f"An 'account_id' is required for account action '{action}'.") + # --- Validate Action/Entity Combination and Parameters --- + valid_actions = { + "proxy": ["list_statuses", "ban", "unban", "unban_all", "delete_from_redis"], + "account": ["list_statuses", "ban", "unban", "unban_all", "delete_from_redis"], + "all": ["list_statuses"] + } - # Handle direct Redis action separately to avoid creating an unnecessary Thrift connection. - if entity == "account" and action == "remove_all": - confirm = params.get("confirm_remove_all_accounts", False) - if not confirm: - message = "FATAL: 'remove_all' action requires 'confirm_remove_all_accounts' to be set to True. No accounts were removed." - logger.error(message) - print(f"\nERROR: {message}\n") - raise ValueError(message) + if action not in valid_actions.get(entity, []): + raise ValueError( + f"The action '{action}' is not valid for entity '{entity}'.\n" + f"Valid actions for '{entity}' are: {', '.join(valid_actions.get(entity, ['None']))}." + ) + + # Validate required parameters for the chosen action + if entity == "proxy": + if action in ["ban", "unban", "unban_all"] and not server_identity: + raise ValueError(f"A 'server_identity' is required for proxy action '{action}'.") + if action in ["ban", "unban"] and not proxy_url: + raise ValueError(f"A 'proxy_url' is required for proxy action '{action}'.") + + if entity == "account": + if action in ["ban", "unban"] and not account_id: + raise ValueError(f"An 'account_id' is required for account action '{action}'.") + # Handle direct Redis actions separately to avoid creating an unnecessary Thrift connection. 
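+ # Account state is stored under 'account_status:<accountId>' keys; proxy state lives in the
+ # 'proxies:<server_identity>' hash plus a 'proxy_failures:<proxy_url>' key, so this branch
+ # manipulates those keys directly instead of going through the Thrift API.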
+ if action == "delete_from_redis": redis_conn_id = params["redis_conn_id"] - account_prefix = params.get("account_id") # Repurpose account_id param as an optional prefix - redis_client = _get_redis_client(redis_conn_id) - pattern = f"account_status:{account_prefix}*" if account_prefix else "account_status:*" - logger.warning(f"Searching for account status keys in Redis with pattern: '{pattern}'") + if entity == "account": + account_prefix = params.get("account_id") # Repurpose account_id param as an optional prefix + pattern = f"account_status:{account_prefix}*" if account_prefix else "account_status:*" + logger.warning(f"Searching for account status keys in Redis with pattern: '{pattern}'") - # scan_iter returns bytes, so we don't need to decode for deletion - keys_to_delete = [key for key in redis_client.scan_iter(pattern)] + keys_to_delete = [key for key in redis_client.scan_iter(pattern)] - if not keys_to_delete: - logger.info(f"No account keys found matching pattern '{pattern}'. Nothing to do.") - print(f"\nNo accounts found matching pattern '{pattern}'.\n") - return + if not keys_to_delete: + print(f"\nNo accounts found matching pattern '{pattern}'.\n") + return - logger.warning(f"Found {len(keys_to_delete)} account keys to delete. This is a destructive operation!") - print(f"\nWARNING: Found {len(keys_to_delete)} accounts to remove from Redis.") - # Decode for printing - for key in keys_to_delete[:10]: - print(f" - {key.decode('utf-8')}") - if len(keys_to_delete) > 10: - print(f" ... and {len(keys_to_delete) - 10} more.") + print(f"\nWARNING: Found {len(keys_to_delete)} accounts to remove from Redis.") + for key in keys_to_delete[:10]: + print(f" - {key.decode('utf-8')}") + if len(keys_to_delete) > 10: + print(f" ... and {len(keys_to_delete) - 10} more.") + + deleted_count = redis_client.delete(*keys_to_delete) + print(f"\nSuccessfully removed {deleted_count} accounts from Redis.\n") + + elif entity == "proxy": + proxy_url = params.get("proxy_url") + server_identity = params.get("server_identity") + if not proxy_url: + raise ValueError("A 'proxy_url' is required for proxy action 'delete_from_redis'.") + if not server_identity: + raise ValueError("A 'server_identity' is required for proxy action 'delete_from_redis'.") + + proxy_state_key = f"proxies:{server_identity}" + proxy_failure_key = f"proxy_failures:{proxy_url}" + + logger.warning(f"Deleting proxy '{proxy_url}' state from hash '{proxy_state_key}' and failure key '{proxy_failure_key}' from Redis.") + + with redis_client.pipeline() as pipe: + pipe.hdel(proxy_state_key, proxy_url) + pipe.delete(proxy_failure_key) + results = pipe.execute() + + hdel_result = results[0] + del_result = results[1] + print(f"\nSuccessfully removed proxy '{proxy_url}' from state hash (result: {hdel_result}) and deleted failure key (result: {del_result}).\n") - deleted_count = redis_client.delete(*keys_to_delete) - logger.info(f"Successfully deleted {deleted_count} account keys from Redis.") - print(f"\nSuccessfully removed {deleted_count} accounts from Redis.\n") return # End execution for this action client, transport = None, None @@ -239,7 +299,7 @@ def manage_system_callable(**context): client, transport = get_thrift_client(host, port) if entity == "proxy": - if action == "list": + if action == "list_statuses": _list_proxy_statuses(client, server_identity) elif action == "ban": if not proxy_url: raise ValueError("A 'proxy_url' is required.") @@ -251,16 +311,14 @@ def manage_system_callable(**context): logger.info(f"Unbanning proxy '{proxy_url}' for 
server '{server_identity}'...") client.unbanProxy(proxy_url, server_identity) print(f"Successfully sent request to unban proxy '{proxy_url}'.") - elif action == "reset_all": - logger.info(f"Resetting all proxy statuses for server '{server_identity}'...") + elif action == "unban_all": + logger.info(f"Unbanning all proxy statuses for server '{server_identity}'...") client.resetAllProxyStatuses(server_identity) - print(f"Successfully sent request to reset all proxy statuses for '{server_identity}'.") - else: - raise ValueError(f"Invalid action '{action}' for entity 'proxy'.") + print(f"Successfully sent request to unban all proxy statuses for '{server_identity}'.") elif entity == "account": - if action == "list": - _list_account_statuses(client, account_id) + if action == "list_statuses": + _list_account_statuses(client, account_id, params["redis_conn_id"]) elif action == "ban": if not account_id: raise ValueError("An 'account_id' is required.") reason = f"Manual ban from Airflow mgmt DAG by {socket.gethostname()}" @@ -273,48 +331,44 @@ def manage_system_callable(**context): logger.info(f"Unbanning account '{account_id}'...") client.unbanAccount(accountId=account_id, reason=reason) print(f"Successfully sent request to unban account '{account_id}'.") - elif action == "reset_all": + elif action == "unban_all": account_prefix = account_id # Repurpose account_id param as an optional prefix - logger.info(f"Resetting all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...") + logger.info(f"Unbanning all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...") all_statuses = client.getAccountStatus(accountId=None, accountPrefix=account_prefix) if not all_statuses: - print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to reset.") + print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to unban.") return - accounts_to_reset = [s.accountId for s in all_statuses] - logger.info(f"Found {len(accounts_to_reset)} accounts to reset.") - print(f"Found {len(accounts_to_reset)} accounts. Sending unban request for each...") + accounts_to_unban = [s.accountId for s in all_statuses] + logger.info(f"Found {len(accounts_to_unban)} accounts to unban.") + print(f"Found {len(accounts_to_unban)} accounts. Sending unban request for each...") - reset_count = 0 + unban_count = 0 fail_count = 0 - for acc_id in accounts_to_reset: + for acc_id in accounts_to_unban: try: - reason = f"Manual reset from Airflow mgmt DAG by {socket.gethostname()}" + reason = f"Manual unban_all from Airflow mgmt DAG by {socket.gethostname()}" client.unbanAccount(accountId=acc_id, reason=reason) - logger.info(f" - Sent reset (unban) for '{acc_id}'.") - reset_count += 1 + logger.info(f" - Sent unban for '{acc_id}'.") + unban_count += 1 except Exception as e: - logger.error(f" - Failed to reset account '{acc_id}': {e}") + logger.error(f" - Failed to unban account '{acc_id}': {e}") fail_count += 1 - print(f"\nSuccessfully sent reset requests for {reset_count} accounts.") + print(f"\nSuccessfully sent unban requests for {unban_count} accounts.") if fail_count > 0: - print(f"Failed to send reset requests for {fail_count} accounts. See logs for details.") + print(f"Failed to send unban requests for {fail_count} accounts. 
See logs for details.") # Optionally, list statuses again to confirm - print("\n--- Listing statuses after reset ---") - _list_account_statuses(client, account_prefix) - else: - raise ValueError(f"Invalid action '{action}' for entity 'account'.") + print("\n--- Listing statuses after unban_all ---") + _list_account_statuses(client, account_prefix, params["redis_conn_id"]) elif entity == "all": - if action == "list": + if action == "list_statuses": print("\nListing all entities...") _list_proxy_statuses(client, server_identity) - _list_account_statuses(client, account_id) - else: - raise ValueError(f"Action '{action}' is not supported for entity 'all'. Only 'list' is supported.") + _list_account_statuses(client, account_id, params["redis_conn_id"]) except (PBServiceException, PBUserException) as e: logger.error(f"Thrift error performing action '{action}': {e.message}", exc_info=True) @@ -335,40 +389,53 @@ with DAG( start_date=days_ago(1), schedule=None, catchup=False, - tags=["ytdlp", "utility", "proxy", "account", "management"], + tags=["ytdlp", "mgmt", "master"], doc_md=""" ### YT-DLP Proxy and Account Manager DAG This DAG provides tools to manage the state of **proxies and accounts** used by the `ytdlp-ops-server`. + Select an `entity` and an `action` to perform. Note that not all actions are available for all entities. - **Parameters:** - - `host`, `port`: Connection details for the `ytdlp-ops-server` Thrift service. - - `entity`: The type of resource to manage (`proxy`, `account`, or `all`). - - `action`: The operation to perform. - - `list`: View statuses. For `entity: all`, lists both proxies and accounts. - - `ban`: Ban a specific proxy or account. - - `unban`: Un-ban a specific proxy or account. - - `reset_all`: Reset all proxies for a server (or all accounts) to `ACTIVE`. - - `remove_all`: **Deletes all account status keys** from Redis for a given prefix. This is a destructive action. - - `server_identity`: Required for most proxy actions. - - `proxy_url`: Required for banning/unbanning a specific proxy. - - `account_id`: Required for managing a specific account. For `action: reset_all` or `remove_all` on `entity: account`, this can be used as an optional prefix to filter which accounts to act on. - - `confirm_remove_all_accounts`: **Required for `remove_all` action.** Must be set to `True` to confirm deletion. + --- + + #### Actions for `entity: proxy` + - `list_statuses`: View status of all proxies, optionally filtered by `server_identity`. + - `ban`: Ban a specific proxy for a given `server_identity`. Requires `proxy_url`. + - `unban`: Un-ban a specific proxy. Requires `proxy_url`. + - `unban_all`: Resets the status of all proxies for a given `server_identity` to `ACTIVE`. + - `delete_from_redis`: **(Destructive)** Deletes a proxy's state from Redis for a specific `server_identity`. This removes its state (ACTIVE/BANNED) and its failure history. The server will re-create it with a default `ACTIVE` state on its next refresh if the proxy is still in the server's configuration. Use this to reset a single proxy's state completely. Requires `proxy_url` and `server_identity`. + + #### Actions for `entity: account` + - `list_statuses`: View status of all accounts, optionally filtered by `account_id` (as a prefix). + - `ban`: Ban a specific account. Requires `account_id`. + - `unban`: Un-ban a specific account. Requires `account_id`. + - `unban_all`: Sets the status of all accounts (or those matching a prefix in `account_id`) to `ACTIVE`. 
+ - `delete_from_redis`: **(Destructive)** Deletes account status keys from Redis. This permanently removes the account from being tracked by the system. This is different from `unban`. Use with caution. + + #### Actions for `entity: all` + - `list_statuses`: A convenience to view statuses for both proxies and accounts in one run. + + --- + + **When to use `delete_from_redis`?** + + - **For Accounts:** Account state is managed entirely within Redis. Deleting an account's key is a permanent removal from the system's tracking. This is different from `unban`, which just resets the status. Use this when you want to completely remove an account. + - **For Proxies:** Proxies are defined in the server's startup configuration. Redis only stores their *state* (e.g., `BANNED` or `ACTIVE`) and failure history. Deleting a proxy's state from Redis will cause the server to re-create it with a default `ACTIVE` state on its next refresh cycle. This action is useful for completely resetting a single proxy that may be stuck or has a long failure history, without having to reset all proxies for that server. """, params={ "host": Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="The hostname of the ytdlp-ops-server service. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."), "port": Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="The port of the ytdlp-ops-server service (Envoy load balancer). Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."), "entity": Param( - "all", + "account", type="string", - enum=["proxy", "account", "all"], - description="The type of entity to manage. Use 'all' with action 'list' to see both.", + enum=["account", "proxy", "all"], + description="The type of entity to manage.", ), "action": Param( - "list", + "list_statuses", type="string", - enum=["list", "ban", "unban", "reset_all", "remove_all"], - description="The management action to perform. `reset_all` for proxies/accounts. `remove_all` for accounts only.", + enum=["list_statuses", "ban", "unban", "unban_all", "delete_from_redis"], + description="The management action to perform. See the DAG documentation for which actions are valid for each entity.", ), "server_identity": Param( "ytdlp-ops-airflow-service", @@ -383,19 +450,13 @@ with DAG( "account_id": Param( None, type=["null", "string"], - description="The account ID to act upon. For `reset_all` or `remove_all` on accounts, this can be an optional prefix.", - ), - "confirm_remove_all_accounts": Param( - False, - type="boolean", - title="[remove_all] Confirm Deletion", - description="Must be set to True to execute the 'remove_all' action for accounts. This is a destructive operation.", + description="The account ID to act upon. 
For `unban_all` or `delete_from_redis` on accounts, this can be an optional prefix.", ), "redis_conn_id": Param( DEFAULT_REDIS_CONN_ID, type="string", title="Redis Connection ID", - description="The Airflow connection ID for the Redis server (used for 'remove_all').", + description="The Airflow connection ID for the Redis server (used for 'delete_from_redis' and for fetching detailed account status).", ), }, ) as dag: diff --git a/dags/ytdlp_mgmt_queues.py b/dags/ytdlp_mgmt_queues.py index 9e18279..2c116c4 100644 --- a/dags/ytdlp_mgmt_queues.py +++ b/dags/ytdlp_mgmt_queues.py @@ -20,6 +20,8 @@ from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.empty import EmptyOperator from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago +from airflow.models.variable import Variable +import requests # Configure logging logger = logging.getLogger(__name__) @@ -28,6 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_REDIS_CONN_ID = "redis_default" DEFAULT_QUEUE_NAME = "video_queue" DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR' +DEFAULT_URL_LISTS_DIR = '/opt/airflow/inputfiles' # --- Helper Functions --- @@ -42,6 +45,92 @@ def _get_redis_client(redis_conn_id: str): raise AirflowException(f"Redis connection failed: {e}") +def _get_predefined_url_lists(): + """Returns a static list of predefined URL list files.""" + # This is a static list to ensure options are always visible in the UI, + # even if the files don't exist on the filesystem at parse time. + # The DAG will check for the file's existence at runtime. + predefined_files = [ + 'urls.dh128.json', + 'urls.rt100.json', + 'urls.rt25.json', + 'urls.sky28.json', + 'urls.sky3.json', + 'urls.tq46.json', + ] + return ['None'] + sorted(predefined_files) + + +def _get_urls_from_source(**params) -> List[str]: + """ + Determines the source of video inputs based on the 'input_source' param and returns a list of raw items. 
+ """ + input_source = params.get("input_source", "manual") + predefined_list = params.get("predefined_url_list") + file_path_or_url = params.get("url_list_file_path") + manual_inputs = params.get("video_inputs") + + # Source 1: Predefined file + if input_source == 'predefined_file': + if not predefined_list or predefined_list == 'None': + raise AirflowException("Input source is 'predefined_file', but no file was selected from the list.") + + default_path = DEFAULT_URL_LISTS_DIR + url_lists_dir = Variable.get('YTDLP_URL_LISTS_DIR', default_var=default_path) + file_path = os.path.join(url_lists_dir, predefined_list) + logger.info(f"Loading URLs from predefined file: {file_path}") + if not os.path.exists(file_path): + raise AirflowException(f"Selected predefined file does not exist: {file_path}") + with open(file_path, 'r', encoding='utf-8') as f: + try: + data = json.load(f) + if not isinstance(data, list): + raise AirflowException(f"JSON file '{predefined_list}' must contain a list of strings.") + return [str(item) for item in data] + except json.JSONDecodeError: + raise AirflowException(f"Failed to parse JSON from file: {predefined_list}") + + # Source 2: File path or URL + elif input_source == 'file_path_or_url': + if not file_path_or_url: + raise AirflowException("Input source is 'file_path_or_url', but no path/URL was provided.") + + logger.info(f"Loading URLs from provided path/URL: {file_path_or_url}") + content = "" + if file_path_or_url.startswith(('http://', 'https://')): + try: + response = requests.get(file_path_or_url, timeout=30) + response.raise_for_status() + content = response.text + except requests.RequestException as e: + raise AirflowException(f"Failed to fetch URL list from '{file_path_or_url}': {e}") + else: # Assume local file path + if not os.path.exists(file_path_or_url): + raise AirflowException(f"Provided file path does not exist: {file_path_or_url}") + with open(file_path_or_url, 'r', encoding='utf-8') as f: + content = f.read() + + try: + data = json.loads(content) + if not isinstance(data, list): + raise AirflowException("JSON content from path/URL must contain a list of strings.") + return [str(item) for item in data] + except json.JSONDecodeError: + raise AirflowException(f"Failed to parse JSON from path/URL: {file_path_or_url}") + + # Source 3: Manual input + elif input_source == 'manual': + if not manual_inputs: + logger.info("Input source is 'manual', but no inputs were provided. Nothing to do.") + return [] + logger.info("Loading URLs from manual input.") + return parse_video_inputs(manual_inputs) + + else: + logger.warning(f"No valid input source selected or no data provided for the selected source. 
Nothing to do.") + return [] + + def parse_video_inputs(input_str: str) -> List[str]: """Parses a flexible string of video inputs into a list of individual items.""" if not input_str or not isinstance(input_str, str): @@ -159,35 +248,54 @@ def dump_redis_data_to_csv(redis_client, dump_dir, patterns): def clear_queue_callable(**context): - """Dumps Redis data to CSV and/or clears a specified Redis key.""" + """Dumps Redis data to CSV and/or clears specified Redis keys based on selection.""" params = context['params'] redis_conn_id = params['redis_conn_id'] - queue_to_clear = params['queue_to_clear'] + queue_base_name = params['queue_base_name'] + queues_to_clear_options = params.get('queues_to_clear_options', []) + confirm_clear = params.get('confirm_clear', False) dump_queues = params['dump_queues'] - # The value from templates_dict is already rendered by Airflow. dump_dir = context['templates_dict']['dump_dir'] dump_patterns = params['dump_patterns'].split(',') if params.get('dump_patterns') else [] + if not confirm_clear: + message = "Action is 'clear_queue', but 'Confirm Deletion' was not checked. Aborting to prevent accidental data loss." + logger.error(message) + raise AirflowException(message) + + # If no queues are selected, default to clearing all of them. + if not queues_to_clear_options: + logger.warning("No specific queues selected to clear. Defaulting to '_all'.") + queues_to_clear_options = ['_all'] + redis_client = _get_redis_client(redis_conn_id) if dump_queues and dump_patterns: + logger.info("Dumping is enabled. Performing dump before clearing.") dump_redis_data_to_csv(redis_client, dump_dir, dump_patterns) - if not queue_to_clear or queue_to_clear == DEFAULT_QUEUE_TO_CLEAR: - logger.info("Parameter 'queue_to_clear' is not specified or is the default placeholder. Skipping key deletion.") - # If we only wanted to dump, this is a success. + all_suffixes = ['_inbox', '_fail', '_result', '_progress'] + keys_to_delete = set() + if '_all' in queues_to_clear_options: + logger.info("'_all' option selected. Clearing all standard queues.") + for suffix in all_suffixes: + keys_to_delete.add(f"{queue_base_name}{suffix}") + else: + for suffix in queues_to_clear_options: + if suffix in all_suffixes: + keys_to_delete.add(f"{queue_base_name}{suffix}") + + if not keys_to_delete: + logger.warning("No valid queue suffixes were selected. Nothing to delete.") return - logger.info(f"Attempting to clear Redis key '{queue_to_clear}' using connection '{redis_conn_id}'.") + logger.info(f"Attempting to clear {len(keys_to_delete)} Redis key(s): {sorted(list(keys_to_delete))}") try: - deleted_count = redis_client.delete(queue_to_clear) - if deleted_count > 0: - logger.info(f"Successfully cleared Redis key '{queue_to_clear}'.") - else: - logger.info(f"Redis key '{queue_to_clear}' did not exist or was already empty.") + deleted_count = redis_client.delete(*keys_to_delete) + logger.info(f"Successfully sent delete command for {len(keys_to_delete)} key(s). 
Redis reported {deleted_count} deleted.") except Exception as e: - logger.error(f"Failed to clear Redis key '{queue_to_clear}': {e}", exc_info=True) - raise AirflowException(f"Failed to clear Redis key: {e}") + logger.error(f"Failed to clear Redis keys: {e}", exc_info=True) + raise AirflowException(f"Failed to clear Redis keys: {e}") def list_contents_callable(**context): @@ -271,7 +379,7 @@ def check_status_callable(**context): """Checks the status (type and size) of all standard Redis queues for a given base name.""" params = context['params'] redis_conn_id = params['redis_conn_id'] - queue_name = params.get('queue_name_for_status', DEFAULT_QUEUE_NAME) + queue_name = params.get('queue_base_name', DEFAULT_QUEUE_NAME) queue_suffixes = ['_inbox', '_progress', '_result', '_fail'] logger.info(f"--- Checking Status for Queues with Base Name: '{queue_name}' ---") @@ -306,14 +414,13 @@ def requeue_failed_callable(**context): """ params = context['params'] redis_conn_id = params['redis_conn_id'] - queue_name = params['queue_name_for_requeue'] + queue_name = params['queue_base_name'] clear_fail_queue = params['clear_fail_queue_after_requeue'] fail_queue_name = f"{queue_name}_fail" inbox_queue_name = f"{queue_name}_inbox" logger.info(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}'.") - print(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}'.") redis_client = _get_redis_client(redis_conn_id) @@ -322,14 +429,12 @@ def requeue_failed_callable(**context): failed_urls_bytes = redis_client.hkeys(fail_queue_name) if not failed_urls_bytes: logger.info(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.") - print(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.") return failed_urls = [url.decode('utf-8') for url in failed_urls_bytes] - logger.info(f"Found {len(failed_urls)} URLs to requeue.") - print(f"Found {len(failed_urls)} URLs to requeue:") + logger.info(f"Found {len(failed_urls)} URLs to requeue:") for url in failed_urls: - print(f" - {url}") + logger.info(f" - {url}") # Add URLs to the inbox list if failed_urls: @@ -345,7 +450,6 @@ def requeue_failed_callable(**context): f"The list now contains {final_list_length} items." ) logger.info(success_message) - print(f"\n{success_message}") if clear_fail_queue: logger.info(f"Successfully cleared fail queue '{fail_queue_name}'.") @@ -359,23 +463,19 @@ def requeue_failed_callable(**context): def add_videos_to_queue_callable(**context): """ - Parses video inputs, normalizes them to URLs, and adds them to a Redis queue. + Parses video inputs from manual text, a predefined file, or a file path/URL, + normalizes them to URLs, and adds them to a Redis queue. """ params = context["params"] - video_inputs = params["video_inputs"] - queue_name = params["queue_name"] + queue_name = params["queue_base_name"] redis_conn_id = params["redis_conn_id"] dry_run = params["dry_run"] - if not video_inputs: - logger.info("No video inputs provided. Nothing to do.") - print("No video inputs provided. Nothing to do.") - return + # This function will get the list of strings from the correct source based on precedence + raw_items = _get_urls_from_source(**params) - raw_items = parse_video_inputs(video_inputs) if not raw_items: - logger.info("Input string was empty or contained no items after parsing.") - print("Input string was empty or contained no items after parsing.") + logger.info("No video inputs found from any source. 
Nothing to do.") return valid_urls = [] @@ -390,10 +490,8 @@ def add_videos_to_queue_callable(**context): raise AirflowException("No valid YouTube URLs or IDs were found in the provided input.") logger.info(f"Found {len(valid_urls)} valid and unique URLs to add to the queue:") - print(f"Found {len(valid_urls)} valid and unique URLs to add to the queue:") for url in valid_urls: logger.info(f" - {url}") - print(f" - {url}") if dry_run: logger.info("Dry run is enabled. Skipping Redis operation.") @@ -418,7 +516,6 @@ def add_videos_to_queue_callable(**context): f"The list now contains {final_list_length} items." ) logger.info(success_message) - print(f"\n{success_message}") except Exception as e: logger.error(f"Failed to add URLs to Redis queue '{inbox_queue}': {e}", exc_info=True) @@ -435,7 +532,7 @@ with DAG( }, schedule=None, catchup=False, - tags=["ytdlp", "queue", "management", "redis", "manual"], + tags=["ytdlp", "mgmt", "master"], doc_md=""" ### YT-DLP Queue Management @@ -443,7 +540,7 @@ with DAG( Select an `action` to perform when triggering the DAG. **Actions:** - - `add_videos`: Add one or more YouTube videos to a queue. + - `add_videos`: Add one or more YouTube videos to a queue. You can provide input manually, select a predefined file from the server, or provide a path/URL to a file. - `clear_queue`: Dump and/or delete a specific Redis key. - `list_contents`: View the contents of a Redis key (list or hash). - `check_status`: Check the overall status of the queues. @@ -457,18 +554,41 @@ with DAG( title="Action", description="The management action to perform.", ), + "queue_base_name": Param( + DEFAULT_QUEUE_NAME, + type="string", + title="Queue Base Name", + description="Base name for queues used in actions like 'add_videos', 'check_status', 'clear_queue', 'requeue_failed'.", + ), # --- Params for 'add_videos' --- + "input_source": Param( + "predefined_file", + type="string", + enum=["manual", "predefined_file", "file_path_or_url"], + title="[add_videos] Video Input Source", + description="Choose how to provide the video URLs. This choice determines which of the following parameters is used.", + ), "video_inputs": Param( None, type=["null", "string"], - title="[add_videos] Video URLs or IDs", - description="A single item, comma-separated list, or JSON array of YouTube URLs or Video IDs.", + title="[add_videos] 1. Manual Input", + description="Used if 'Input Source' is 'manual'. Paste a single item, a comma-separated list, or a JSON array of YouTube URLs or Video IDs.", ), - "queue_name": Param( - DEFAULT_QUEUE_NAME, + "predefined_url_list": Param( + "None", type="string", - title="[add_videos] Queue Name", - description="The base name of the Redis queue to add videos to (e.g., 'video_queue').", + enum=_get_predefined_url_lists(), + title="[add_videos] 2. Predefined File", + description=( + "Used if 'Input Source' is 'predefined_file'. Select a JSON file from the server's URL list directory " + f"(defined by Airflow Variable 'YTDLP_URL_LISTS_DIR', defaults to '{DEFAULT_URL_LISTS_DIR}')." + ), + ), + "url_list_file_path": Param( + None, + type=["null", "string"], + title="[add_videos] 3. File Path or URL", + description="Used if 'Input Source' is 'file_path_or_url'. 
Enter a local file path (on the Airflow worker) or a remote URL to a JSON file containing a list of URLs/IDs.", ), "dry_run": Param( False, @@ -477,11 +597,21 @@ with DAG( description="If True, validate inputs without adding them to the queue.", ), # --- Params for 'clear_queue' --- - "queue_to_clear": Param( - DEFAULT_QUEUE_TO_CLEAR, - type="string", - title="[clear_queue] Queue to Clear", - description="Exact name of the Redis key to delete.", + "queues_to_clear_options": Param( + None, + type=["null", "array"], + title="[clear_queue] Queues to Clear", + description="Select which standard queues to clear. '_all' clears all four. If left empty, it defaults to '_all'.", + items={ + "type": "string", + "enum": ["_inbox", "_fail", "_result", "_progress", "_all"], + } + ), + "confirm_clear": Param( + False, + type="boolean", + title="[clear_queue] Confirm Deletion", + description="Must be set to True to execute the 'clear_queue' action. This is a destructive operation.", ), "dump_queues": Param( True, @@ -490,10 +620,10 @@ with DAG( description="If True, dump data before clearing.", ), "dump_dir": Param( - "{{ var.value.get('YTDLP_REDIS_DUMP_DIR', '/opt/airflow/dumps') }}", - type="string", + None, + type=["null", "string"], title="[clear_queue] Dump Directory", - description="Base directory to save CSV dump files.", + description="Base directory to save CSV dump files. Supports Jinja. If empty, defaults to Airflow variable 'YTDLP_REDIS_DUMP_DIR' or '/opt/airflow/dumps'.", ), "dump_patterns": Param( 'ytdlp:*,video_queue_*', @@ -514,20 +644,7 @@ with DAG( title="[list_contents] Max Items to List", description="Maximum number of items to show.", ), - # --- Params for 'check_status' --- - "queue_name_for_status": Param( - DEFAULT_QUEUE_NAME, - type="string", - title="[check_status] Base Queue Name", - description="Base name of the queues to check (e.g., 'video_queue').", - ), # --- Params for 'requeue_failed' --- - "queue_name_for_requeue": Param( - DEFAULT_QUEUE_NAME, - type="string", - title="[requeue_failed] Base Queue Name", - description="Base name of the queues to requeue from (e.g., 'video_queue' will use 'video_queue_fail').", - ), "clear_fail_queue_after_requeue": Param( True, type="boolean", @@ -555,7 +672,7 @@ with DAG( action_clear_queue = PythonOperator( task_id="action_clear_queue", python_callable=clear_queue_callable, - templates_dict={'dump_dir': "{{ params.dump_dir }}"}, + templates_dict={'dump_dir': "{{ params.dump_dir or var.value.get('YTDLP_REDIS_DUMP_DIR', '/opt/airflow/dumps') }}"}, ) action_list_contents = PythonOperator( diff --git a/dags/ytdlp_ops_orchestrator.py b/dags/ytdlp_ops_orchestrator.py index fc8cd7c..490fdeb 100644 --- a/dags/ytdlp_ops_orchestrator.py +++ b/dags/ytdlp_ops_orchestrator.py @@ -73,24 +73,34 @@ def orchestrate_workers_ignition_callable(**context): worker_indices = list(range(total_workers)) bunches = [worker_indices[i:i + workers_per_bunch] for i in range(0, len(worker_indices), workers_per_bunch)] - logger.info(f"Plan: Starting {total_workers} total workers in {len(bunches)} bunches.") + # Get and parse worker hosts (which are used as queue names) + worker_hosts_str = params.get('worker_hosts', 'celery@dl002') + worker_hosts = [h.strip() for h in worker_hosts_str.split(',') if h.strip()] + if not worker_hosts: + raise AirflowException("The 'worker_hosts' parameter cannot be empty.") + + logger.info(f"Plan: Starting {total_workers} total workers in {len(bunches)} bunches, distributing across hosts (queues): {worker_hosts}") dag_run_id = 
context['dag_run'].run_id total_triggered = 0 - # Pass all orchestrator params to the worker so it has the full context for its loop. - conf_to_pass = {p: params[p] for p in params} - # The worker pulls its own URL, so we don't pass one. - if 'url' in conf_to_pass: - del conf_to_pass['url'] - for i, bunch in enumerate(bunches): logger.info(f"--- Igniting Bunch {i+1}/{len(bunches)} (contains {len(bunch)} worker(s)) ---") for j, _ in enumerate(bunch): # Create a unique run_id for each worker loop starter run_id = f"ignited_{dag_run_id}_{total_triggered}" - logger.info(f"Igniting worker {j+1}/{len(bunch)} in bunch {i+1} (loop {total_triggered + 1}/{total_workers}) (Run ID: {run_id})") + # Pass all orchestrator params to the worker so it has the full context for its loop. + conf_to_pass = {p: params[p] for p in params} + # The worker pulls its own URL, so we don't pass one. + if 'url' in conf_to_pass: + del conf_to_pass['url'] + + # Assign host/queue in a round-robin fashion + queue_for_worker = worker_hosts[total_triggered % len(worker_hosts)] + conf_to_pass['queue'] = queue_for_worker + + logger.info(f"Igniting worker {j+1}/{len(bunch)} in bunch {i+1} (loop {total_triggered + 1}/{total_workers}) on host (queue) '{queue_for_worker}' (Run ID: {run_id})") logger.debug(f"Full conf for worker loop {run_id}: {conf_to_pass}") trigger_dag( @@ -151,7 +161,7 @@ with DAG( The workers then take over, each running its own continuous processing loop. """, - tags=['ytdlp', 'orchestrator', 'ignition'], + tags=['ytdlp', 'mgmt', 'master'], params={ # --- Ignition Control Parameters --- 'total_workers': Param(DEFAULT_TOTAL_WORKERS, type="integer", description="Total number of worker loops to start."), @@ -160,6 +170,7 @@ with DAG( 'delay_between_bunches_s': Param(DEFAULT_BUNCH_DELAY_S, type="integer", description="Delay in seconds between starting each bunch."), # --- Worker Passthrough Parameters --- + 'worker_hosts': Param('celery@dl002', type="string", title="[Worker Param] Worker Hosts", description="Comma-separated list of Celery worker hostnames (e.g., 'celery@dl002') to distribute workers to. These are used as queue names. Workers will be assigned to these queues in a round-robin fashion."), 'on_bannable_failure': Param( 'retry_with_new_account', type="string", diff --git a/dags/ytdlp_ops_worker_per_url.py b/dags/ytdlp_ops_worker_per_url.py index 244d1ae..81f1697 100644 --- a/dags/ytdlp_ops_worker_per_url.py +++ b/dags/ytdlp_ops_worker_per_url.py @@ -21,6 +21,7 @@ from airflow.operators.dummy import DummyOperator from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago from airflow.utils.decorators import apply_defaults +from airflow.utils.task_group import TaskGroup from datetime import datetime, timedelta from airflow.api.common.trigger_dag import trigger_dag from pangramia.yt.common.ttypes import TokenUpdateMode @@ -70,39 +71,6 @@ def _get_thrift_client(host, port, timeout): return client, transport -def _ban_resource_task(resource_type, resource_id_xcom_key, host, port, timeout, **kwargs): - """ - A callable function to ban a resource (account or proxy). - Designed to be used in a PythonOperator. - """ - ti = kwargs['ti'] - resource_id = ti.xcom_pull(key=resource_id_xcom_key) - if not resource_id: - logger.warning(f"Could not find resource ID in XCom key '{resource_id_xcom_key}' to ban. 
Skipping.") - return - - client, transport = None, None - try: - client, transport = _get_thrift_client(host, port, timeout) - if resource_type == 'account': - reason = f"Banned by Airflow worker due to {kwargs.get('reason', 'failure')}" - logger.warning(f"Banning account '{resource_id}'. Reason: {reason}") - client.banAccount(accountId=resource_id, reason=reason) - logger.info(f"Successfully sent request to ban account '{resource_id}'.") - elif resource_type == 'proxy': - server_identity = kwargs.get('server_identity') - if not server_identity: - logger.error("Cannot ban proxy without server_identity.") - return - logger.warning(f"Banning proxy '{resource_id}' for server '{server_identity}'.") - client.banProxy(proxyUrl=resource_id, serverIdentity=server_identity) - logger.info(f"Successfully sent request to ban proxy '{resource_id}'.") - except Exception as e: - # Log the error but don't fail the task, as this is a best-effort cleanup action. - logger.error(f"Failed to issue ban for {resource_type} '{resource_id}': {e}", exc_info=True) - finally: - if transport and transport.isOpen(): - transport.close() def _extract_video_id(url): @@ -136,7 +104,7 @@ def mark_url_as_success(**context): """Moves URL from progress to result hash on success.""" ti = context['task_instance'] params = context['params'] - url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') + url = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='url_to_process') if not url: logger.warning("mark_url_as_success called but no URL found in DAG run parameters.") return @@ -146,9 +114,12 @@ def mark_url_as_success(**context): redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) # Pull results from previous tasks - info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path') - socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy') - ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command') + info_json_path = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='info_json_path') or \ + ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='info_json_path') + socks_proxy = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='socks_proxy') or \ + ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='socks_proxy') + ytdlp_command = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='ytdlp_command') or \ + ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='ytdlp_command') downloaded_file_path = ti.xcom_pull(task_ids='download_and_probe') logger.info(f"Handling success for URL: {url}") @@ -178,56 +149,120 @@ def mark_url_as_success(**context): def handle_failure_callable(**context): """ - Handles a failed processing run by recording the error details to Redis. - The decision to stop or continue the loop is handled by `decide_what_to_do_next`. + Handles a failed processing run by recording rich, detailed error information to Redis. """ ti = context['task_instance'] params = context['params'] dag_run = context['dag_run'] - url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') + url = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='url_to_process') if not url: - logger.error("handle_failure_callable called but no URL found in XCom.") + # This can happen if pull_url_and_assign_account itself fails. + # We can't record a URL-specific failure, but we should log it. 
+ failed_tis = [ti for ti in dag_run.get_task_instances() if ti.state == 'failed'] + failed_task_ids = [ti.task_id for ti in failed_tis] + logger.error(f"handle_failure_callable was triggered for run {dag_run.run_id}, but no URL was found in XCom. " + f"This likely means an early task failed. Failed tasks in run: {failed_task_ids}") return - # --- Determine the source and type of failure --- + # --- Start building the rich error report --- + failure_report = { + 'url': url, + 'dag_run_id': dag_run.run_id, + 'failure_timestamp': datetime.now().isoformat(), + 'failed_task': 'unknown', + 'failure_summary': 'An unknown error occurred.', + 'failure_history': [], + 'download_error': None, + 'generic_error': None + } + + # --- Gather data from token acquisition attempts --- + # Attempt 1: get_token + get_token_ti = dag_run.get_task_instance('acquire_token_with_retry.get_token') + if get_token_ti: + error_details_1 = ti.xcom_pull(task_ids=get_token_ti.task_id, key='error_details') + account_1 = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='account_id') + + attempt_1_report = { + 'task_id': get_token_ti.task_id, + 'account_id': account_1, + 'status': get_token_ti.state, + 'start_date': get_token_ti.start_date.isoformat() if get_token_ti.start_date else None, + 'end_date': get_token_ti.end_date.isoformat() if get_token_ti.end_date else None, + } + if error_details_1: + attempt_1_report.update({ + 'proxy_url': error_details_1.get('proxy_url'), + 'error_code': error_details_1.get('error_code'), + 'error_message': error_details_1.get('error_message'), + }) + failure_report['failure_history'].append(attempt_1_report) + + # Attempt 2: retry_get_token + retry_get_token_ti = dag_run.get_task_instance('acquire_token_with_retry.retry_get_token') + # Only report on retry if it actually ran + if retry_get_token_ti and retry_get_token_ti.state: + error_details_2 = ti.xcom_pull(task_ids=retry_get_token_ti.task_id, key='error_details') + account_2 = ti.xcom_pull(task_ids='acquire_token_with_retry.assign_new_account_for_retry', key='account_id') + + attempt_2_report = { + 'task_id': retry_get_token_ti.task_id, + 'account_id': account_2, + 'status': retry_get_token_ti.state, + 'start_date': retry_get_token_ti.start_date.isoformat() if retry_get_token_ti.start_date else None, + 'end_date': retry_get_token_ti.end_date.isoformat() if retry_get_token_ti.end_date else None, + } + if error_details_2: + attempt_2_report.update({ + 'proxy_url': error_details_2.get('proxy_url'), + 'error_code': error_details_2.get('error_code'), + 'error_message': error_details_2.get('error_message'), + }) + failure_report['failure_history'].append(attempt_2_report) + + # --- Identify the primary failure point --- exception = context.get('exception') - failed_task_id = "unknown" - upstream_tasks = ti.task.get_direct_relatives(upstream=True) - for task in upstream_tasks: - upstream_ti = dag_run.get_task_instance(task_id=task.task_id) - if upstream_ti and upstream_ti.state == 'failed': - failed_task_id = task.task_id - break - # --- Extract Detailed Error Information --- - error_details_from_xcom = None - if failed_task_id != "unknown": - error_details_from_xcom = ti.xcom_pull(task_ids=failed_task_id, key='error_details') + # Case 1: Download & Probe failure + download_probe_ti = dag_run.get_task_instance('download_and_probe') + if download_probe_ti and download_probe_ti.state == 'failed': + failure_report['failed_task'] = download_probe_ti.task_id + failure_report['failure_summary'] = 'Download or probe failed after successful 
token acquisition.' + failure_report['download_error'] = { + 'error_message': str(exception) if exception else "BashOperator failed. Check task logs for yt-dlp/ffmpeg output.", + 'error_type': type(exception).__name__ if exception else "Unknown", + } - if error_details_from_xcom: - error_message = error_details_from_xcom.get('error_message', 'Unknown error from XCom') - error_type = error_details_from_xcom.get('error_type', 'Unknown type from XCom') - tb_str = error_details_from_xcom.get('traceback', 'No traceback in XCom.') + # Case 2: Token acquisition failure else: - error_message = str(exception) if exception else "Unknown error" - error_type = type(exception).__name__ if exception else "Unknown" - tb_str = "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available." + last_failed_attempt = next((attempt for attempt in reversed(failure_report['failure_history']) if attempt['status'] == 'failed'), None) + if last_failed_attempt: + failure_report['failed_task'] = last_failed_attempt['task_id'] + failure_report['failure_summary'] = f"Token acquisition failed with error: {last_failed_attempt.get('error_code', 'Unknown')}" + else: + # Case 3: Generic/unexpected failure + failed_tis = [ti for ti in dag_run.get_task_instances() if ti.state == 'failed'] + if failed_tis: + # Heuristic: pick the one with the latest end_date that is not this task itself + failed_tis.sort(key=lambda x: x.end_date or datetime.min) + last_failed_ti = next((ti for ti in reversed(failed_tis) if ti.task_id != context['task_instance'].task_id), None) + if last_failed_ti: + failure_report['failed_task'] = last_failed_ti.task_id + failure_report['failure_summary'] = f"Task '{last_failed_ti.task_id}' failed unexpectedly." + failure_report['generic_error'] = { + 'error_message': str(exception) if exception else f"Unexpected failure in task {last_failed_ti.task_id}. Check logs.", + 'error_type': type(exception).__name__ if exception else "Unknown", + 'traceback': "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available." + } logger.info(f"Handling failure for URL: {url}") - logger.error(f" Failed Task: {failed_task_id}") - logger.error(f" Failure Type: {error_type}") - logger.error(f" Failure Reason: {error_message}") - logger.debug(f" Traceback:\n{tb_str}") - - final_error_details = { - 'failed_task': failed_task_id, - 'error_type': error_type, - 'error_message': error_message, - 'traceback': tb_str, - 'url': url, - 'dag_run_id': context['dag_run'].run_id, - } + logger.error(f" Failure Summary: {failure_report['failure_summary']}") + logger.error(f" Failed Task: {failure_report['failed_task']}") + # Using print to ensure the full JSON is visible in the logs without truncation + print("--- Detailed Failure Report ---") + print(json.dumps(failure_report, indent=2)) + print("-----------------------------") # For all failures, mark the URL as failed in Redis. 
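+ # The '<queue_name>_fail' hash maps each URL to its JSON failure report; the 'requeue_failed'
+ # action in ytdlp_mgmt_queues reads these keys and pushes the URLs back into '<queue_name>_inbox'.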
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID) @@ -235,7 +270,7 @@ def handle_failure_callable(**context): fail_queue = f"{queue_name}_fail" try: client = _get_redis_client(redis_conn_id) - client.hset(fail_queue, url, json.dumps(final_error_details, indent=2)) + client.hset(fail_queue, url, json.dumps(failure_report, indent=2)) logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.") except Exception as e: logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True) @@ -344,14 +379,16 @@ def _get_account_pool(params: dict) -> list: -def pull_url_from_redis_callable(**context): +def pull_url_and_assign_account_callable(**context): """ - Pulls a single URL from the Redis inbox queue. + Pulls a single URL from Redis and assigns an active account for the run. If the queue is empty, it skips the DAG run. - Otherwise, it pushes the URL to XCom for downstream tasks. + Otherwise, it pushes the URL and account details to XCom. """ params = context['params'] ti = context['task_instance'] + + # --- Part 1: Pull URL from Redis --- queue_name = params['queue_name'] redis_conn_id = params['redis_conn_id'] inbox_queue = f"{queue_name}_inbox" @@ -368,47 +405,8 @@ def pull_url_from_redis_callable(**context): logger.info(f"Pulled URL '{url_to_process}' from the queue.") ti.xcom_push(key='url_to_process', value=url_to_process) -def decide_what_to_do_next_callable(**context): - """ - Decides whether to continue the processing loop by triggering the next worker - or to stop the loop, based on task success, failure, or an empty queue. - """ - params = context['params'] - dag_run = context['dag_run'] - - # Check if a failure was handled. If the 'handle_generic_failure' task was not skipped, - # it means a failure occurred somewhere in the pipeline. - handle_generic_failure_ti = dag_run.get_task_instance(task_id='handle_generic_failure') - if handle_generic_failure_ti and handle_generic_failure_ti.state != 'skipped': - logger.error(f"Failure handler task 'handle_generic_failure' was in state '{handle_generic_failure_ti.state}'. Stopping this processing lane.") - return 'fail_loop' - - # Check if the worker was skipped because the Redis queue was empty. - pull_task_instance = dag_run.get_task_instance(task_id='pull_url_from_redis') - if pull_task_instance and pull_task_instance.state == 'skipped': - logger.info("Worker was skipped because Redis queue was empty.") - retrigger_delay_on_empty_s = params.get('retrigger_delay_on_empty_s', 60) - - if retrigger_delay_on_empty_s < 0: - logger.info(f"retrigger_delay_on_empty_s is {retrigger_delay_on_empty_s}. Stopping this worker loop.") - return 'stop_loop' - else: - logger.info(f"Queue is empty. Will re-trigger this worker loop after a delay of {retrigger_delay_on_empty_s}s.") - return 'trigger_self_run' - - # If no failure was handled and the queue wasn't empty, it must be a success. - logger.info("All preceding tasks succeeded. Continuing the processing loop by triggering the next worker.") - return 'trigger_self_run' - -def assign_account_callable(**context): - """ - Selects an account for the run. - It uses the account from the previous run if available (affinity), - otherwise it picks a random one from the active pool. 
- """ - ti = context['task_instance'] - params = context['params'] - + # --- Part 2: Assign Account --- + logger.info("URL found, proceeding to assign an account.") # Affinity logic: check if an account was passed from a previous run account_id = params.get('current_account_id') if account_id: @@ -423,6 +421,38 @@ def assign_account_callable(**context): ti.xcom_push(key='accounts_tried', value=[account_id]) +def decide_what_to_do_next_callable(**context): + """ + Decides whether to continue the processing loop by triggering the next worker + or to stop the loop, based on task success, failure, or an empty queue. + """ + params = context['params'] + dag_run = context['dag_run'] + + # Check if a failure was handled. If the 'handle_generic_failure' task was not skipped, + # it means a failure occurred somewhere in the pipeline. + handle_generic_failure_ti = dag_run.get_task_instance(task_id='handle_generic_failure') + if handle_generic_failure_ti and handle_generic_failure_ti.state != 'skipped': + logger.error(f"Failure handler task 'handle_generic_failure' was in state '{handle_generic_failure_ti.state}'. Stopping this processing lane.") + return 'mark_dag_run_as_failed' + + # Check if the worker was skipped because the Redis queue was empty. + pull_task_instance = dag_run.get_task_instance(task_id='pull_url_and_assign_account') + if pull_task_instance and pull_task_instance.state == 'skipped': + logger.info("Worker was skipped because Redis queue was empty.") + retrigger_delay_on_empty_s = params.get('retrigger_delay_on_empty_s', 60) + + if retrigger_delay_on_empty_s < 0: + logger.info(f"retrigger_delay_on_empty_s is {retrigger_delay_on_empty_s}. Stopping this worker loop.") + return 'stop_worker_lane_gracefully' + else: + logger.info(f"Queue is empty. Will re-trigger this worker loop after a delay of {retrigger_delay_on_empty_s}s.") + return 'continue_loop_and_trigger_next_run' + + # If no failure was handled and the queue wasn't empty, it must be a success. + logger.info("All preceding tasks succeeded. Continuing the processing loop by triggering the next worker.") + return 'continue_loop_and_trigger_next_run' + def get_token_callable(**context): """Makes a single attempt to get a token from the Thrift service.""" ti = context['task_instance'] @@ -444,7 +474,7 @@ def get_token_callable(**context): if not account_id: raise AirflowException("Could not find a valid account_id in XCom from any upstream task.") - url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') + url = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='url_to_process') if not url: logger.info("No URL pulled from XCom. Assuming upstream task was skipped. 
Ending task.") return @@ -465,7 +495,16 @@ def get_token_callable(**context): call_kwargs = {'accountId': account_id, 'updateType': TokenUpdateMode.AUTO, 'url': url, 'clients': clients, 'machineId': machine_id} token_data = client.getOrRefreshToken(**call_kwargs) - logger.info("Successfully retrieved token data from service.") + + # --- Log response details for debugging --- + response_summary = { + "has_infoJson": hasattr(token_data, 'infoJson') and bool(token_data.infoJson), + "infoJson_size": len(token_data.infoJson) if hasattr(token_data, 'infoJson') and token_data.infoJson else 0, + "has_ytdlpCommand": hasattr(token_data, 'ytdlpCommand') and bool(token_data.ytdlpCommand), + "proxy_type": next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr) and getattr(token_data, attr)), 'None'), + "jobId": getattr(token_data, 'jobId', None) + } + logger.info(f"Successfully retrieved token data from service. Response summary: {json.dumps(response_summary)}") # --- Success Case --- info_json = getattr(token_data, 'infoJson', None) @@ -491,48 +530,55 @@ def get_token_callable(**context): except Exception as log_e: logger.warning(f"Could not log info.json details: {log_e}") - proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None) - ti.xcom_push(key='socks_proxy', value=getattr(token_data, proxy_attr) if proxy_attr else None) - ti.xcom_push(key='ytdlp_command', value=getattr(token_data, 'ytdlpCommand', None)) - ti.xcom_push(key='successful_account_id', value=account_id) # For affinity - ti.xcom_push(key='get_token_succeeded', value=True) + proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None) + ti.xcom_push(key='socks_proxy', value=getattr(token_data, proxy_attr) if proxy_attr else None) + ti.xcom_push(key='ytdlp_command', value=getattr(token_data, 'ytdlpCommand', None)) + ti.xcom_push(key='successful_account_id', value=account_id) # For affinity + ti.xcom_push(key='get_token_succeeded', value=True) + else: + # This is a failure case: the service returned success but no usable data. + logger.error(f"Thrift call for account '{account_id}' succeeded but returned no info.json. Treating as failure.") + # The generic failure handler will pick up this exception. + raise AirflowException("Service returned success but info.json was empty or invalid.") except (PBServiceException, PBUserException, TTransportException) as e: - logger.error(f"Thrift call failed for account '{account_id}'. Exception: {getattr(e, 'message', str(e))}") error_context = getattr(e, 'context', None) if isinstance(error_context, str): try: error_context = json.loads(error_context.replace("'", "\"")) except: pass - + + error_message = getattr(e, 'message', str(e)) + error_code = getattr(e, 'errorCode', 'TRANSPORT_ERROR') + + # Check for wrapped timeout exception to provide a clearer error message. 
+ inner_exception = getattr(e, 'inner', getattr(e, '__cause__', None)) + if isinstance(e, TTransportException) and isinstance(inner_exception, socket.timeout): + error_message = f"Socket timeout during Thrift call (wrapped in TTransportException)" + error_code = 'SOCKET_TIMEOUT' + error_details = { - 'error_message': getattr(e, 'message', str(e)), - 'error_code': getattr(e, 'errorCode', 'TRANSPORT_ERROR'), + 'error_message': error_message, + 'error_code': error_code, 'error_type': type(e).__name__, 'traceback': traceback.format_exc(), 'proxy_url': error_context.get('proxy_url') if isinstance(error_context, dict) else None } + + proxy_url_info = f" with proxy '{error_details['proxy_url']}'" if error_details.get('proxy_url') else "" + + if error_code == 'SOCKET_TIMEOUT': + logger.error(f"Thrift call for account '{account_id}'{proxy_url_info} failed due to a socket timeout after {timeout} seconds.") + elif isinstance(e, TTransportException) and e.type == TTransportException.TIMED_OUT: + logger.error(f"Thrift call for account '{account_id}'{proxy_url_info} timed out after {timeout} seconds.") + else: + logger.error(f"Thrift call failed for account '{account_id}'{proxy_url_info}. Exception: {error_details['error_message']}") + ti.xcom_push(key='error_details', value=error_details) ti.xcom_push(key='get_token_succeeded', value=False) - - # For non-bannable errors like Connection Refused, fail the task immediately to stop the loop. - # Bannable errors will let the task succeed, allowing the branch operator to decide on a retry. - error_code = error_details.get('error_code', '').strip() - error_message = error_details.get('error_message', '').lower() - bannable_codes = ["BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", "SOCKS5_CONNECTION_FAILED"] - is_bannable = error_code in bannable_codes - # Override bannable status for age-restricted content, which is not a true bot detection. - if is_bannable and ('confirm your age' in error_message or 'age-restricted' in error_message): - logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.") - is_bannable = False - - if not is_bannable: - logger.error(f"Non-bannable error '{error_code}' detected. Failing task to stop the loop.") - raise AirflowException(f"Non-bannable Thrift call failed: {error_details['error_message']}") - else: - logger.warning(f"Bannable error '{error_code}' detected. Passing to branch operator for handling.") - # Do not raise exception here; let the branch operator handle it. + # Always fail the task on any Thrift exception. The branch operator will inspect the failure. + raise AirflowException(f"Thrift call failed: {error_details['error_message']}") finally: if transport and transport.isOpen(): transport.close() @@ -540,22 +586,22 @@ def get_token_callable(**context): def handle_bannable_error_branch_callable(**context): """ - Checks if a `get_token` failure is bannable and if a retry is allowed. + Inspects a failed `get_token` task. If the failure was a "bannable" error, + it routes to the retry logic. Otherwise, it lets the DAG fail. + This task only runs if the upstream `get_token` task fails. 
""" ti = context['task_instance'] params = context['params'] - - # Check the result of the first get_token attempt - get_token_succeeded = ti.xcom_pull(task_ids='get_token', key='get_token_succeeded') - if get_token_succeeded: - return 'setup_download_and_probe' - # It failed, so check the error details - error_details = ti.xcom_pull(task_ids='get_token', key='error_details') + # We know get_token failed because of the trigger_rule='one_failed'. + # Pull the error details it left behind. + error_details = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='error_details') if not error_details: - logger.error("get_token failed but no error details were found in XCom. Stopping loop.") - return 'handle_generic_failure' + logger.error("The 'get_token' task failed, but no error details were found in XCom. " + "This indicates an unexpected error. Letting the DAG fail.") + return None # Do nothing, let the group fail. + # We have error details, now check if the error is "bannable". error_code = error_details.get('error_code', '').strip() error_message = error_details.get('error_message', '').lower() policy = params.get('on_bannable_failure', 'retry_with_new_account') @@ -567,17 +613,17 @@ def handle_bannable_error_branch_callable(**context): logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.") is_bannable = False - logger.info(f"Handling failure from 'get_token'. Error code: '{error_code}', Policy: '{policy}'") + logger.info(f"Handling failure from 'get_token'. Error code: '{error_code}', Is Bannable: {is_bannable}, Policy: '{policy}'") if is_bannable and policy == 'retry_with_new_account': logger.info("Error is bannable and policy allows retry. Proceeding to ban first account and retry.") - return 'ban_account_and_prepare_for_retry' + return 'acquire_token_with_retry.ban_account_and_prepare_for_retry' elif is_bannable: # and policy is 'stop_loop' logger.warning("Error is bannable and policy is 'stop_loop'. Banning account and stopping.") - return 'ban_account_and_fail' - else: - logger.warning("Error is not considered bannable. Proceeding to generic failure handling.") - return 'handle_generic_failure' + return 'acquire_token_with_retry.ban_account_and_fail' + else: # Not a bannable error + logger.error(f"Error '{error_code}' is not bannable. Letting the DAG fail.") + return None # Do nothing, let the group fail. def assign_new_account_for_retry_callable(**context): @@ -585,7 +631,7 @@ def assign_new_account_for_retry_callable(**context): ti = context['task_instance'] params = context['params'] - accounts_tried = ti.xcom_pull(task_ids='assign_account', key='accounts_tried') + accounts_tried = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='accounts_tried') if not accounts_tried: raise AirflowException("Cannot retry, list of previously tried accounts not found.") @@ -628,43 +674,142 @@ def assign_new_account_for_retry_callable(**context): def handle_retry_failure_branch_callable(**context): - """Checks the result of the retry_get_token task.""" + """ + Checks a failed `retry_get_token` task. If the failure was a handled Thrift error, + it triggers the banning of the second account/proxy. + This task only runs if the upstream `retry_get_token` task fails. 
+ """ ti = context['task_instance'] - retry_succeeded = ti.xcom_pull(task_ids='retry_get_token', key='get_token_succeeded') - if retry_succeeded: - logger.info("Retry attempt was successful.") - return 'setup_download_and_probe' - else: - logger.error("Retry attempt also failed. Banning second account and proxy.") - return 'ban_second_account_and_proxy' + # We know retry_get_token failed. Check if it was a handled failure. + error_details = ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='error_details') + + if not error_details: + logger.error("The 'retry_get_token' task failed unexpectedly before it could record error details. " + "Letting the DAG fail without banning the account/proxy.") + return None + + # If we are here, it means the retry failed with a handled Thrift error. + # We will proceed to ban the second account and proxy. + logger.error("Retry attempt also failed with a handled Thrift error. Banning second account and proxy.") + return 'acquire_token_with_retry.ban_second_account_and_proxy' -def ban_second_account_and_proxy_callable(**context): - """Bans the second account and the proxy used in the failed retry.""" +def ban_first_account_callable(**context): + """Bans the first account that failed due to a bannable error.""" ti = context['task_instance'] params = context['params'] - # Ban the second account - account_to_ban = ti.xcom_pull(task_ids='assign_new_account_for_retry', key='account_id') - if account_to_ban: - ti.xcom_push(key='account_to_ban', value=account_to_ban) - _ban_resource_task( - 'account', 'account_to_ban', - params['service_ip'], int(params['service_port']), int(params.get('timeout', DEFAULT_TIMEOUT)), - ti=ti, reason="Failed on retry attempt" - ) + # The account ID is pulled from the initial assignment task. + account_to_ban = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='account_id') + if not account_to_ban: + logger.warning("Could not find the initial account ID to ban. Skipping.") + return + + client, transport = None, None + try: + host = params['service_ip'] + port = int(params['service_port']) + timeout = int(params.get('timeout', DEFAULT_TIMEOUT)) + client, transport = _get_thrift_client(host, port, timeout) + + reason = "Banned by Airflow worker due to bannable error on first attempt" + logger.warning(f"Banning account '{account_to_ban}'. Reason: {reason}") + client.banAccount(accountId=account_to_ban, reason=reason) + logger.info(f"Successfully sent request to ban account '{account_to_ban}'.") + except Exception as e: + logger.error(f"Failed to issue ban for account '{account_to_ban}': {e}", exc_info=True) + # Don't fail the task, as this is a best-effort cleanup action. + finally: + if transport and transport.isOpen(): + transport.close() + + +def ban_first_account_and_fail_callable(**context): + """Bans the first account that failed, and then intentionally fails the task.""" + ti = context['task_instance'] + params = context['params'] - # Ban the proxy - error_details = ti.xcom_pull(task_ids='retry_get_token', key='error_details') + # The account ID is pulled from the initial assignment task. + account_to_ban = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='account_id') + if not account_to_ban: + logger.warning("Could not find the initial account ID to ban. 
Skipping.") + else: + client, transport = None, None + try: + host = params['service_ip'] + port = int(params['service_port']) + timeout = int(params.get('timeout', DEFAULT_TIMEOUT)) + client, transport = _get_thrift_client(host, port, timeout) + + reason = "Banned by Airflow worker due to bannable error (policy is stop_loop)" + logger.warning(f"Banning account '{account_to_ban}'. Reason: {reason}") + client.banAccount(accountId=account_to_ban, reason=reason) + logger.info(f"Successfully sent request to ban account '{account_to_ban}'.") + except Exception as e: + logger.error(f"Failed to issue ban for account '{account_to_ban}': {e}", exc_info=True) + # Log error, but continue to fail the task. + finally: + if transport and transport.isOpen(): + transport.close() + + # Intentionally fail the task to stop the DAG run as per policy. + reason = "Bannable error detected, policy is stop_loop." + logger.warning(f"INTENTIONAL FAILURE: This task is now failing itself as per the 'stop_loop' policy. Reason: {reason}") + raise AirflowException(f"Failing task as per policy. Reason: {reason}") + + +def ban_second_account_and_proxy_callable(**context): + """Bans the second account and the proxy used in the failed retry, then fails the task.""" + ti = context['task_instance'] + params = context['params'] + + account_to_ban = ti.xcom_pull(task_ids='acquire_token_with_retry.assign_new_account_for_retry', key='account_id') + error_details = ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='error_details') proxy_to_ban = error_details.get('proxy_url') if error_details else None - if proxy_to_ban: - ti.xcom_push(key='proxy_to_ban', value=proxy_to_ban) - _ban_resource_task( - 'proxy', 'proxy_to_ban', - params['service_ip'], int(params['service_port']), int(params.get('timeout', DEFAULT_TIMEOUT)), - ti=ti, server_identity=(params.get('machine_id') or socket.gethostname()) - ) + + if not account_to_ban and not proxy_to_ban: + logger.warning("Could not find an account or proxy to ban from the failed retry. Nothing to do.") + # Still fail the task to stop the DAG. + raise AirflowException("Token acquisition failed on retry, but no resources found to ban.") + + client, transport = None, None + try: + host = params['service_ip'] + port = int(params['service_port']) + timeout = int(params.get('timeout', DEFAULT_TIMEOUT)) + client, transport = _get_thrift_client(host, port, timeout) + + # Ban the second account + if account_to_ban: + reason = "Banned by Airflow worker due to failure on retry attempt" + logger.warning(f"Banning account '{account_to_ban}'. 
Reason: {reason}") + try: + client.banAccount(accountId=account_to_ban, reason=reason) + logger.info(f"Successfully sent request to ban account '{account_to_ban}'.") + except Exception as e: + logger.error(f"Failed to issue ban for account '{account_to_ban}': {e}", exc_info=True) + + # Ban the proxy + if proxy_to_ban: + server_identity = params.get('machine_id') or socket.gethostname() + logger.warning(f"Banning proxy '{proxy_to_ban}' for server '{server_identity}'.") + try: + client.banProxy(proxyUrl=proxy_to_ban, serverIdentity=server_identity) + logger.info(f"Successfully sent request to ban proxy '{proxy_to_ban}'.") + except Exception as e: + logger.error(f"Failed to issue ban for proxy '{proxy_to_ban}': {e}", exc_info=True) + + except Exception as e: + logger.error(f"An error occurred while trying to connect to the Thrift service to ban resources: {e}", exc_info=True) + # Log the error but continue to the failure exception, as this is a best-effort cleanup. + finally: + if transport and transport.isOpen(): + transport.close() + + # After attempting to ban, we must fail this task to fail the group. + logger.warning("INTENTIONAL FAILURE: This task is now failing itself to correctly signal the end of the retry process and stop the worker lane. The second account and/or proxy have been banned.") + raise AirflowException("Token acquisition failed on retry. Banned second account and proxy.") def trigger_self_run_callable(**context): @@ -674,7 +819,7 @@ def trigger_self_run_callable(**context): dag_run = context['dag_run'] # Check if this was triggered due to an empty queue to apply the specific delay. - pull_task_instance = dag_run.get_task_instance(task_id='pull_url_from_redis') + pull_task_instance = dag_run.get_task_instance(task_id='pull_url_and_assign_account') is_empty_queue_scenario = pull_task_instance and pull_task_instance.state == 'skipped' delay = 0 @@ -690,6 +835,7 @@ def trigger_self_run_callable(**context): if delay > 0: logger.info(f"Waiting for {delay}s before triggering next run.") time.sleep(delay) + logger.info(f"Finished waiting {delay}s. Proceeding to trigger next run.") # Generate a unique run_id for the new worker run run_id = f"self_triggered_{datetime.utcnow().isoformat()}" @@ -703,7 +849,7 @@ def trigger_self_run_callable(**context): # Pass the successful account ID to the next run for affinity. # It could come from the first attempt or the retry. - successful_account_ids = ti.xcom_pull(task_ids=['get_token', 'retry_get_token'], key='successful_account_id') + successful_account_ids = ti.xcom_pull(task_ids=['acquire_token_with_retry.get_token', 'acquire_token_with_retry.retry_get_token'], key='successful_account_id') successful_account_id = next((acc for acc in successful_account_ids if acc), None) if successful_account_id: @@ -738,6 +884,7 @@ default_args = { 'retries': 0, 'retry_delay': timedelta(minutes=1), 'start_date': days_ago(1), + 'queue': "{{ params.get('queue') }}", } with DAG( @@ -756,7 +903,7 @@ with DAG( 1. **Ignition:** An initial run is triggered by the orchestrator. 2. **Pull & Assign:** It pulls a URL from Redis and assigns an account for the job, reusing the last successful account if available (affinity). - 3. **Get Token:** It calls the `ytdlp-ops-server` to get tokens and `info.json`. + 3. **Get Token:** It calls the `ytdlp-ops-server` to get tokens and `info.json`. This step is encapsulated in a `TaskGroup` that handles a single retry on failure. 4. 
**Failure Handling:** If `get_token` fails with a "bannable" error (like bot detection), it follows the `on_bannable_failure` policy: - `retry_with_new_account` (default): It bans the failing account, picks a new one, and retries the `get_token` call once. If the retry also fails, it bans the second account and the proxy, then stops the loop. - `stop_loop`: It bans the account and stops the loop immediately. @@ -765,7 +912,7 @@ with DAG( This creates a "processing lane" that runs independently until the queue is empty or a failure occurs. """, - tags=['ytdlp', 'thrift', 'client', 'worker', 'loop'], + tags=['ytdlp', 'worker'], params={ # Worker loop control params (passed from orchestrator) 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."), @@ -795,95 +942,94 @@ with DAG( } ) as dag: - pull_url_from_redis = PythonOperator( - task_id='pull_url_from_redis', - python_callable=pull_url_from_redis_callable, + pull_url_and_assign_account = PythonOperator( + task_id='pull_url_and_assign_account', + python_callable=pull_url_and_assign_account_callable, ) - assign_account = PythonOperator( - task_id='assign_account', - python_callable=assign_account_callable, - ) + # --- Encapsulate token acquisition logic in a TaskGroup for visual clarity --- + with TaskGroup(group_id='acquire_token_with_retry') as acquire_token_group: + get_token = PythonOperator( + task_id='get_token', + python_callable=get_token_callable, + templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"}, + ) - get_token = PythonOperator( - task_id='get_token', - python_callable=get_token_callable, - templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"}, - ) + handle_bannable_error_branch = BranchPythonOperator( + task_id='handle_bannable_error_branch', + python_callable=handle_bannable_error_branch_callable, + trigger_rule='one_failed', # This task should only run if get_token fails + ) - handle_bannable_error_branch = BranchPythonOperator( - task_id='handle_bannable_error_branch', - python_callable=handle_bannable_error_branch_callable, - trigger_rule='all_done', # Run even if get_token succeeds - ) + # --- Retry Path --- + ban_account_and_prepare_for_retry = PythonOperator( + task_id='ban_account_and_prepare_for_retry', + python_callable=ban_first_account_callable, + ) - # --- Retry Path --- - ban_account_and_prepare_for_retry = PythonOperator( - task_id='ban_account_and_prepare_for_retry', - python_callable=_ban_resource_task, - op_kwargs={ - 'resource_type': 'account', - 'resource_id_xcom_key': 'account_id', - 'host': "{{ params.service_ip }}", - 'port': "{{ params.service_port }}", - 'timeout': "{{ params.timeout }}", - 'reason': "Bannable error detected, preparing for retry." 
- }, - ) + assign_new_account_for_retry = PythonOperator( + task_id='assign_new_account_for_retry', + python_callable=assign_new_account_for_retry_callable, + ) - assign_new_account_for_retry = PythonOperator( - task_id='assign_new_account_for_retry', - python_callable=assign_new_account_for_retry_callable, - ) + retry_get_token = PythonOperator( + task_id='retry_get_token', + python_callable=get_token_callable, + templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"}, + ) - retry_get_token = PythonOperator( - task_id='retry_get_token', - python_callable=get_token_callable, - templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"}, - ) + handle_retry_failure_branch = BranchPythonOperator( + task_id='handle_retry_failure_branch', + python_callable=handle_retry_failure_branch_callable, + trigger_rule='one_failed', # This task should only run if retry_get_token fails + ) - handle_retry_failure_branch = BranchPythonOperator( - task_id='handle_retry_failure_branch', - python_callable=handle_retry_failure_branch_callable, - trigger_rule='none_skipped', - ) + ban_second_account_and_proxy = PythonOperator( + task_id='ban_second_account_and_proxy', + python_callable=ban_second_account_and_proxy_callable, + ) - ban_second_account_and_proxy = PythonOperator( - task_id='ban_second_account_and_proxy', - python_callable=ban_second_account_and_proxy_callable, - ) + # --- Stop Path --- + ban_account_and_fail = PythonOperator( + task_id='ban_account_and_fail', + python_callable=ban_first_account_and_fail_callable, + ) - # --- Stop Path --- - ban_account_and_fail = PythonOperator( - task_id='ban_account_and_fail', - python_callable=_ban_resource_task, - op_kwargs={ - 'resource_type': 'account', - 'resource_id_xcom_key': 'account_id', - 'host': "{{ params.service_ip }}", - 'port': "{{ params.service_port }}", - 'timeout': "{{ params.timeout }}", - 'reason': "Bannable error detected, policy is stop_loop." - }, - ) + # --- Internal Success Merge Point --- + token_acquisition_succeeded = DummyOperator( + task_id='token_acquisition_succeeded', + trigger_rule='one_success', + ) - # --- Main Execution Path --- - setup_download_and_probe = DummyOperator( - task_id='setup_download_and_probe', - trigger_rule='one_success', - ) + # --- Define dependencies within the TaskGroup --- + # The success dummy task is the merge point for the two possible success tasks. + [get_token, retry_get_token] >> token_acquisition_succeeded + # The first branch operator runs only if get_token fails. + get_token >> handle_bannable_error_branch + # It branches to the retry path or the hard-fail path. + handle_bannable_error_branch >> [ban_account_and_prepare_for_retry, ban_account_and_fail] + + # The retry path + ban_account_and_prepare_for_retry >> assign_new_account_for_retry >> retry_get_token + + # The second branch operator runs only if retry_get_token fails. + retry_get_token >> handle_retry_failure_branch + # It only branches to the final failure task. 
+ handle_retry_failure_branch >> ban_second_account_and_proxy + + # --- Main Execution Path (outside the TaskGroup) --- download_and_probe = BashOperator( task_id='download_and_probe', bash_command=""" set -e - INFO_JSON_PATH_1="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}" - INFO_JSON_PATH_2="{{ ti.xcom_pull(task_ids='retry_get_token', key='info_json_path') }}" + INFO_JSON_PATH_1="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='info_json_path') }}" + INFO_JSON_PATH_2="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='info_json_path') }}" INFO_JSON_PATH="${INFO_JSON_PATH_1:-$INFO_JSON_PATH_2}" - PROXY_1="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}" - PROXY_2="{{ ti.xcom_pull(task_ids='retry_get_token', key='socks_proxy') }}" + PROXY_1="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='socks_proxy') }}" + PROXY_2="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='socks_proxy') }}" PROXY="${PROXY_1:-$PROXY_2}" FORMAT="{{ params.download_format }}" @@ -991,48 +1137,31 @@ with DAG( trigger_rule='all_done', ) - trigger_self_run = PythonOperator( - task_id='trigger_self_run', + continue_loop_and_trigger_next_run = PythonOperator( + task_id='continue_loop_and_trigger_next_run', python_callable=trigger_self_run_callable, ) - stop_loop = DummyOperator(task_id='stop_loop') - fail_loop = BashOperator(task_id='fail_loop', bash_command='exit 1') + stop_worker_lane_gracefully = DummyOperator(task_id='stop_worker_lane_gracefully') + mark_dag_run_as_failed = BashOperator(task_id='mark_dag_run_as_failed', bash_command='exit 1') # --- Define Task Dependencies --- - pull_url_from_redis >> assign_account >> get_token >> handle_bannable_error_branch + pull_url_and_assign_account >> acquire_token_group - # The branch operator decides the path after the first token attempt. - # It can go to the success path (setup_download_and_probe), the retry path (ban_account_and_prepare_for_retry), - # the stop-on-failure path (ban_account_and_fail), or the generic failure path. - handle_bannable_error_branch >> [ - setup_download_and_probe, - ban_account_and_prepare_for_retry, - ban_account_and_fail, - handle_generic_failure, - ] + # The TaskGroup's internal success task (`token_acquisition_succeeded`) is the trigger for the download. + # This is more explicit than depending on the entire group's state and prevents the skip issue. + dag.get_task('acquire_token_with_retry.token_acquisition_succeeded') >> download_and_probe - # The retry path itself - ban_account_and_prepare_for_retry >> assign_new_account_for_retry >> retry_get_token >> handle_retry_failure_branch + download_and_probe >> mark_url_as_success - # The branch operator after the retry attempt. - # It can go to the success path (setup_download_and_probe) or the final failure path (ban_second_account_and_proxy). - handle_retry_failure_branch >> [ - setup_download_and_probe, - ban_second_account_and_proxy, - ] + # Define the failure path. The generic failure handler is set downstream of the two + # main tasks that can fail. Its 'one_failed' trigger rule ensures it only runs on failure. + # This explicit list avoids potential scheduler ambiguity. + [acquire_token_group, download_and_probe] >> handle_generic_failure - # The main success path, which can be reached from either the first attempt or the retry. - setup_download_and_probe >> download_and_probe >> mark_url_as_success + # Define the final decision point. 
This task must run after the success path completes + # OR after the failure path completes. Its 'all_done' trigger rule makes this possible. + mark_url_as_success >> decide_next_step + handle_generic_failure >> decide_next_step - # Define all paths that lead to the generic failure handler. - # This task runs if any of its direct upstreams fail. - download_and_probe >> handle_generic_failure - ban_account_and_fail >> handle_generic_failure - ban_second_account_and_proxy >> handle_generic_failure - # A non-bannable failure in get_token will fail the task, which is handled by the branch operator - # which then correctly routes to handle_generic_failure. A direct link is not needed. - - # Final decision point. It runs after success or after failure has been handled. - [mark_url_as_success, handle_generic_failure] >> decide_next_step - decide_next_step >> [trigger_self_run, stop_loop, fail_loop] + decide_next_step >> [continue_loop_and_trigger_next_run, stop_worker_lane_gracefully, mark_dag_run_as_failed]
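
# --- Illustrative appendix (not part of the patch) ---
# The patch repeatedly coalesces XCom values across the two token tasks inside the
# 'acquire_token_with_retry' TaskGroup (see trigger_self_run_callable and the
# "${INFO_JSON_PATH_1:-$INFO_JSON_PATH_2}" pattern in download_and_probe). A minimal
# sketch of that pattern as a reusable helper is shown below; the helper name
# `_first_xcom_value` is hypothetical and does not exist in the DAG.

TOKEN_TASK_IDS = [
    'acquire_token_with_retry.get_token',
    'acquire_token_with_retry.retry_get_token',
]

def _first_xcom_value(ti, key):
    # xcom_pull with a list of task_ids returns one value per task id;
    # tasks that were skipped or failed contribute None, so take the first truthy value.
    values = ti.xcom_pull(task_ids=TOKEN_TASK_IDS, key=key)
    return next((v for v in values if v), None)

# Example usage inside a PythonOperator callable:
#   ti = context['task_instance']
#   info_json_path = _first_xcom_value(ti, 'info_json_path')
#   socks_proxy    = _first_xcom_value(ti, 'socks_proxy')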
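
# handle_failure_callable stores each failure report as a JSON string in the
# '<queue_name>_fail' Redis hash (hset(fail_queue, url, json.dumps(failure_report, indent=2))).
# A rough sketch for inspecting those entries ad hoc is shown below; the connection
# settings and the 'video_queue' base name are placeholders, not values from the patch,
# which resolves its connection via redis_conn_id instead.

import json
import redis  # redis-py, assumed available where the sketch is run

r = redis.Redis(host='localhost', port=6379, db=0)

fail_hash = 'video_queue_fail'  # '<queue_name>_fail'
for url, report_json in r.hgetall(fail_hash).items():
    report = json.loads(report_json)
    # Print the failed URL together with the top-level keys of its failure report.
    print(url.decode(), sorted(report.keys()))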