Updated dags with queue and hosts

aperez 2025-08-07 18:01:23 +03:00
parent 274bef5370
commit 4455a10726
5 changed files with 798 additions and 470 deletions

View File

@@ -35,6 +35,16 @@
 - **Proxy management:** allows manually banning, unbanning, or resetting the status of a proxy.
 - **Account management:** allows manually banning or unbanning accounts.

+### `ytdlp_mgmt_queues`
+- **Purpose:** provides a set of tools for managing the Redis queues used by the processing pipeline.
+- **Functionality (via the `action` parameter):**
+  - `add_videos`: add one or more YouTube URLs to a queue.
+  - `clear_queue`: clear (delete) the specified Redis key.
+  - `list_contents`: view the contents of a Redis key (list or hash).
+  - `check_status`: check the overall state of the queues (type, size).
+  - `requeue_failed`: move all URLs from the `_fail` failure queue back to the `_inbox` queue for reprocessing (see the sketch at the end of this section).
+
 ## Resource Management Strategy (Proxies and Accounts)
 The system uses an intelligent strategy to manage the lifecycle and state of accounts and proxies, in order to maximize the success rate and minimize bans.
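For orientation, the `requeue_failed` action boils down to a handful of Redis operations. A minimal sketch, assuming (as the DAG code later in this commit shows) that the `_fail` queue is a Redis hash keyed by URL and `_inbox` is a plain list; the connection details are placeholders:

```python
import redis

def requeue_failed(base_name: str = "video_queue") -> int:
    """Move every URL from <base>_fail back to <base>_inbox."""
    client = redis.Redis(host="localhost", port=6379)  # placeholder connection
    fail_key, inbox_key = f"{base_name}_fail", f"{base_name}_inbox"
    # The fail queue is a hash whose keys are the failed URLs.
    urls = [u.decode("utf-8") for u in client.hkeys(fail_key)]
    if urls:
        client.rpush(inbox_key, *urls)  # re-enqueue for another pass
        client.delete(fail_key)         # drop the stale failure records
    return len(urls)
```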

View File

@@ -89,7 +89,6 @@ def _list_proxy_statuses(client, server_identity):
     statuses = client.getProxyStatus(server_identity)
     if not statuses:
         logger.info("No proxy statuses found.")
-        print("No proxy statuses found.")
         return

     from tabulate import tabulate
@@ -127,15 +126,23 @@ def _list_proxy_statuses(client, server_identity):
     print("NOTE: To see Recent Accounts/Machines, the server's `getProxyStatus` method must be updated to return these fields.")

-def _list_account_statuses(client, account_id):
-    """Lists the status of accounts."""
+def _list_account_statuses(client, account_id, redis_conn_id):
+    """Lists the status of accounts, enriching with live data from Redis."""
     logger.info(f"Listing account statuses for account: {account_id or 'ALL'}")
+    redis_client = None
+    try:
+        redis_client = _get_redis_client(redis_conn_id)
+        logger.info("Successfully connected to Redis to fetch detailed account status.")
+    except Exception as e:
+        logger.warning(f"Could not connect to Redis to get detailed status. Will show basic status. Error: {e}")
+        redis_client = None
     try:
         # The thrift method takes accountId (specific) or accountPrefix.
         # If account_id is provided, we use it. If not, we get all by leaving both params as None.
         statuses = client.getAccountStatus(accountId=account_id, accountPrefix=None)
         if not statuses:
-            logger.info("No account statuses found.")
             print("\n--- Account Statuses ---\nNo account statuses found.\n------------------------\n")
             return
@@ -143,6 +150,29 @@ def _list_account_statuses(client, account_id):
         status_list = []
         for s in statuses:
+            status_str = s.status
+            # If an account is resting, get the live countdown from Redis for accuracy.
+            if redis_client and 'RESTING' in status_str:
+                try:
+                    status_key = f"account_status:{s.accountId}"
+                    # The server stores resting expiry time in 'resting_until'.
+                    expiry_ts_bytes = redis_client.hget(status_key, "resting_until")
+                    if expiry_ts_bytes:
+                        expiry_ts = float(expiry_ts_bytes)
+                        now = datetime.now().timestamp()
+                        if now >= expiry_ts:
+                            status_str = "ACTIVE (was RESTING)"
+                        else:
+                            remaining_seconds = int(expiry_ts - now)
+                            if remaining_seconds > 3600:
+                                status_str = f"RESTING (active in {remaining_seconds // 3600}h {remaining_seconds % 3600 // 60}m)"
+                            elif remaining_seconds > 60:
+                                status_str = f"RESTING (active in {remaining_seconds // 60}m {remaining_seconds % 60}s)"
+                            else:
+                                status_str = f"RESTING (active in {remaining_seconds}s)"
+                except Exception as e:
+                    logger.warning(f"Could not parse resting time for {s.accountId} from Redis: {e}. Using server status.")
+
             # Determine the last activity timestamp for sorting
             last_success = float(s.lastSuccessTimestamp) if s.lastSuccessTimestamp else 0
             last_failure = float(s.lastFailureTimestamp) if s.lastFailureTimestamp else 0
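The countdown formatting added above is plain integer bucketing; restated as a standalone helper (hypothetical name, same arithmetic) so the thresholds are easy to verify:

```python
def format_remaining(remaining_seconds: int) -> str:
    """Hypothetical restatement of the RESTING countdown buckets used above."""
    if remaining_seconds > 3600:
        return f"RESTING (active in {remaining_seconds // 3600}h {remaining_seconds % 3600 // 60}m)"
    if remaining_seconds > 60:
        return f"RESTING (active in {remaining_seconds // 60}m {remaining_seconds % 60}s)"
    return f"RESTING (active in {remaining_seconds}s)"

assert format_remaining(3700) == "RESTING (active in 1h 1m)"
assert format_remaining(90) == "RESTING (active in 1m 30s)"
```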
@@ -150,7 +180,7 @@ def _list_account_statuses(client, account_id):
             status_item = {
                 "Account ID": s.accountId,
-                "Status": s.status,
+                "Status": status_str,
                 "Success": s.successCount,
                 "Failures": s.failureCount,
                 "Last Success": format_timestamp(s.lastSuccessTimestamp),
@@ -191,47 +221,77 @@ def manage_system_callable(**context):
     proxy_url = params.get("proxy_url")
     account_id = params.get("account_id")

-    if action in ["ban", "unban", "reset_all"] and entity == "proxy" and not server_identity:
-        raise ValueError(f"A 'server_identity' is required for proxy action '{action}'.")
-    if action in ["ban", "unban"] and entity == "account" and not account_id:
-        raise ValueError(f"An 'account_id' is required for account action '{action}'.")
-
-    # Handle direct Redis action separately to avoid creating an unnecessary Thrift connection.
-    if entity == "account" and action == "remove_all":
-        confirm = params.get("confirm_remove_all_accounts", False)
-        if not confirm:
-            message = "FATAL: 'remove_all' action requires 'confirm_remove_all_accounts' to be set to True. No accounts were removed."
-            logger.error(message)
-            print(f"\nERROR: {message}\n")
-            raise ValueError(message)
+    # --- Validate Action/Entity Combination and Parameters ---
+    valid_actions = {
+        "proxy": ["list_statuses", "ban", "unban", "unban_all", "delete_from_redis"],
+        "account": ["list_statuses", "ban", "unban", "unban_all", "delete_from_redis"],
+        "all": ["list_statuses"]
+    }
+    if action not in valid_actions.get(entity, []):
+        raise ValueError(
+            f"The action '{action}' is not valid for entity '{entity}'.\n"
+            f"Valid actions for '{entity}' are: {', '.join(valid_actions.get(entity, ['None']))}."
+        )
+
+    # Validate required parameters for the chosen action
+    if entity == "proxy":
+        if action in ["ban", "unban", "unban_all"] and not server_identity:
+            raise ValueError(f"A 'server_identity' is required for proxy action '{action}'.")
+        if action in ["ban", "unban"] and not proxy_url:
+            raise ValueError(f"A 'proxy_url' is required for proxy action '{action}'.")
+    if entity == "account":
+        if action in ["ban", "unban"] and not account_id:
+            raise ValueError(f"An 'account_id' is required for account action '{action}'.")
+
+    # Handle direct Redis actions separately to avoid creating an unnecessary Thrift connection.
+    if action == "delete_from_redis":
         redis_conn_id = params["redis_conn_id"]
-        account_prefix = params.get("account_id")  # Repurpose account_id param as an optional prefix
         redis_client = _get_redis_client(redis_conn_id)
-        pattern = f"account_status:{account_prefix}*" if account_prefix else "account_status:*"
-        logger.warning(f"Searching for account status keys in Redis with pattern: '{pattern}'")
-        # scan_iter returns bytes, so we don't need to decode for deletion
-        keys_to_delete = [key for key in redis_client.scan_iter(pattern)]
-        if not keys_to_delete:
-            logger.info(f"No account keys found matching pattern '{pattern}'. Nothing to do.")
-            print(f"\nNo accounts found matching pattern '{pattern}'.\n")
-            return
-        logger.warning(f"Found {len(keys_to_delete)} account keys to delete. This is a destructive operation!")
-        print(f"\nWARNING: Found {len(keys_to_delete)} accounts to remove from Redis.")
-        # Decode for printing
-        for key in keys_to_delete[:10]:
-            print(f"  - {key.decode('utf-8')}")
-        if len(keys_to_delete) > 10:
-            print(f"  ... and {len(keys_to_delete) - 10} more.")
-        deleted_count = redis_client.delete(*keys_to_delete)
-        logger.info(f"Successfully deleted {deleted_count} account keys from Redis.")
-        print(f"\nSuccessfully removed {deleted_count} accounts from Redis.\n")
+        if entity == "account":
+            account_prefix = params.get("account_id")  # Repurpose account_id param as an optional prefix
+            pattern = f"account_status:{account_prefix}*" if account_prefix else "account_status:*"
+            logger.warning(f"Searching for account status keys in Redis with pattern: '{pattern}'")
+            keys_to_delete = [key for key in redis_client.scan_iter(pattern)]
+            if not keys_to_delete:
+                print(f"\nNo accounts found matching pattern '{pattern}'.\n")
+                return
+            print(f"\nWARNING: Found {len(keys_to_delete)} accounts to remove from Redis.")
+            for key in keys_to_delete[:10]:
+                print(f"  - {key.decode('utf-8')}")
+            if len(keys_to_delete) > 10:
+                print(f"  ... and {len(keys_to_delete) - 10} more.")
+            deleted_count = redis_client.delete(*keys_to_delete)
+            print(f"\nSuccessfully removed {deleted_count} accounts from Redis.\n")
+        elif entity == "proxy":
+            proxy_url = params.get("proxy_url")
+            server_identity = params.get("server_identity")
+            if not proxy_url:
+                raise ValueError("A 'proxy_url' is required for proxy action 'delete_from_redis'.")
+            if not server_identity:
+                raise ValueError("A 'server_identity' is required for proxy action 'delete_from_redis'.")
+            proxy_state_key = f"proxies:{server_identity}"
+            proxy_failure_key = f"proxy_failures:{proxy_url}"
+            logger.warning(f"Deleting proxy '{proxy_url}' state from hash '{proxy_state_key}' and failure key '{proxy_failure_key}' from Redis.")
+            with redis_client.pipeline() as pipe:
+                pipe.hdel(proxy_state_key, proxy_url)
+                pipe.delete(proxy_failure_key)
+                results = pipe.execute()
+            hdel_result = results[0]
+            del_result = results[1]
+            print(f"\nSuccessfully removed proxy '{proxy_url}' from state hash (result: {hdel_result}) and deleted failure key (result: {del_result}).\n")
         return  # End execution for this action

     client, transport = None, None
@@ -239,7 +299,7 @@ def manage_system_callable(**context):
         client, transport = get_thrift_client(host, port)

         if entity == "proxy":
-            if action == "list":
+            if action == "list_statuses":
                 _list_proxy_statuses(client, server_identity)
             elif action == "ban":
                 if not proxy_url: raise ValueError("A 'proxy_url' is required.")
@@ -251,16 +311,14 @@ def manage_system_callable(**context):
                 logger.info(f"Unbanning proxy '{proxy_url}' for server '{server_identity}'...")
                 client.unbanProxy(proxy_url, server_identity)
                 print(f"Successfully sent request to unban proxy '{proxy_url}'.")
-            elif action == "reset_all":
-                logger.info(f"Resetting all proxy statuses for server '{server_identity}'...")
+            elif action == "unban_all":
+                logger.info(f"Unbanning all proxy statuses for server '{server_identity}'...")
                 client.resetAllProxyStatuses(server_identity)
-                print(f"Successfully sent request to reset all proxy statuses for '{server_identity}'.")
-            else:
-                raise ValueError(f"Invalid action '{action}' for entity 'proxy'.")
+                print(f"Successfully sent request to unban all proxy statuses for '{server_identity}'.")

         elif entity == "account":
-            if action == "list":
-                _list_account_statuses(client, account_id)
+            if action == "list_statuses":
+                _list_account_statuses(client, account_id, params["redis_conn_id"])
             elif action == "ban":
                 if not account_id: raise ValueError("An 'account_id' is required.")
                 reason = f"Manual ban from Airflow mgmt DAG by {socket.gethostname()}"
@@ -273,48 +331,44 @@ def manage_system_callable(**context):
                 logger.info(f"Unbanning account '{account_id}'...")
                 client.unbanAccount(accountId=account_id, reason=reason)
                 print(f"Successfully sent request to unban account '{account_id}'.")
-            elif action == "reset_all":
+            elif action == "unban_all":
                 account_prefix = account_id  # Repurpose account_id param as an optional prefix
-                logger.info(f"Resetting all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...")
+                logger.info(f"Unbanning all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...")
                 all_statuses = client.getAccountStatus(accountId=None, accountPrefix=account_prefix)
                 if not all_statuses:
-                    print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to reset.")
+                    print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to unban.")
                     return

-                accounts_to_reset = [s.accountId for s in all_statuses]
-                logger.info(f"Found {len(accounts_to_reset)} accounts to reset.")
-                print(f"Found {len(accounts_to_reset)} accounts. Sending unban request for each...")
+                accounts_to_unban = [s.accountId for s in all_statuses]
+                logger.info(f"Found {len(accounts_to_unban)} accounts to unban.")
+                print(f"Found {len(accounts_to_unban)} accounts. Sending unban request for each...")

-                reset_count = 0
+                unban_count = 0
                 fail_count = 0
-                for acc_id in accounts_to_reset:
+                for acc_id in accounts_to_unban:
                     try:
-                        reason = f"Manual reset from Airflow mgmt DAG by {socket.gethostname()}"
+                        reason = f"Manual unban_all from Airflow mgmt DAG by {socket.gethostname()}"
                         client.unbanAccount(accountId=acc_id, reason=reason)
-                        logger.info(f"  - Sent reset (unban) for '{acc_id}'.")
-                        reset_count += 1
+                        logger.info(f"  - Sent unban for '{acc_id}'.")
+                        unban_count += 1
                     except Exception as e:
-                        logger.error(f"  - Failed to reset account '{acc_id}': {e}")
+                        logger.error(f"  - Failed to unban account '{acc_id}': {e}")
                         fail_count += 1

-                print(f"\nSuccessfully sent reset requests for {reset_count} accounts.")
+                print(f"\nSuccessfully sent unban requests for {unban_count} accounts.")
                 if fail_count > 0:
-                    print(f"Failed to send reset requests for {fail_count} accounts. See logs for details.")
+                    print(f"Failed to send unban requests for {fail_count} accounts. See logs for details.")

                 # Optionally, list statuses again to confirm
-                print("\n--- Listing statuses after reset ---")
-                _list_account_statuses(client, account_prefix)
-            else:
-                raise ValueError(f"Invalid action '{action}' for entity 'account'.")
+                print("\n--- Listing statuses after unban_all ---")
+                _list_account_statuses(client, account_prefix, params["redis_conn_id"])

         elif entity == "all":
-            if action == "list":
+            if action == "list_statuses":
                 print("\nListing all entities...")
                 _list_proxy_statuses(client, server_identity)
-                _list_account_statuses(client, account_id)
-            else:
-                raise ValueError(f"Action '{action}' is not supported for entity 'all'. Only 'list' is supported.")
+                _list_account_statuses(client, account_id, params["redis_conn_id"])

     except (PBServiceException, PBUserException) as e:
         logger.error(f"Thrift error performing action '{action}': {e.message}", exc_info=True)
@@ -335,40 +389,53 @@ with DAG(
     start_date=days_ago(1),
     schedule=None,
     catchup=False,
-    tags=["ytdlp", "utility", "proxy", "account", "management"],
+    tags=["ytdlp", "mgmt", "master"],
     doc_md="""
     ### YT-DLP Proxy and Account Manager DAG
     This DAG provides tools to manage the state of **proxies and accounts** used by the `ytdlp-ops-server`.
+    Select an `entity` and an `action` to perform. Note that not all actions are available for all entities.

-    **Parameters:**
-    - `host`, `port`: Connection details for the `ytdlp-ops-server` Thrift service.
-    - `entity`: The type of resource to manage (`proxy`, `account`, or `all`).
-    - `action`: The operation to perform.
-        - `list`: View statuses. For `entity: all`, lists both proxies and accounts.
-        - `ban`: Ban a specific proxy or account.
-        - `unban`: Un-ban a specific proxy or account.
-        - `reset_all`: Reset all proxies for a server (or all accounts) to `ACTIVE`.
-        - `remove_all`: **Deletes all account status keys** from Redis for a given prefix. This is a destructive action.
-    - `server_identity`: Required for most proxy actions.
-    - `proxy_url`: Required for banning/unbanning a specific proxy.
-    - `account_id`: Required for managing a specific account. For `action: reset_all` or `remove_all` on `entity: account`, this can be used as an optional prefix to filter which accounts to act on.
-    - `confirm_remove_all_accounts`: **Required for `remove_all` action.** Must be set to `True` to confirm deletion.
+    ---
+
+    #### Actions for `entity: proxy`
+    - `list_statuses`: View status of all proxies, optionally filtered by `server_identity`.
+    - `ban`: Ban a specific proxy for a given `server_identity`. Requires `proxy_url`.
+    - `unban`: Un-ban a specific proxy. Requires `proxy_url`.
+    - `unban_all`: Resets the status of all proxies for a given `server_identity` to `ACTIVE`.
+    - `delete_from_redis`: **(Destructive)** Deletes a proxy's state from Redis for a specific `server_identity`. This removes its state (ACTIVE/BANNED) and its failure history. The server will re-create it with a default `ACTIVE` state on its next refresh if the proxy is still in the server's configuration. Use this to reset a single proxy's state completely. Requires `proxy_url` and `server_identity`.
+
+    #### Actions for `entity: account`
+    - `list_statuses`: View status of all accounts, optionally filtered by `account_id` (as a prefix).
+    - `ban`: Ban a specific account. Requires `account_id`.
+    - `unban`: Un-ban a specific account. Requires `account_id`.
+    - `unban_all`: Sets the status of all accounts (or those matching a prefix in `account_id`) to `ACTIVE`.
+    - `delete_from_redis`: **(Destructive)** Deletes account status keys from Redis. This permanently removes the account from being tracked by the system. This is different from `unban`. Use with caution.
+
+    #### Actions for `entity: all`
+    - `list_statuses`: A convenience to view statuses for both proxies and accounts in one run.
+
+    ---
+
+    **When to use `delete_from_redis`?**
+    - **For Accounts:** Account state is managed entirely within Redis. Deleting an account's key is a permanent removal from the system's tracking. This is different from `unban`, which just resets the status. Use this when you want to completely remove an account.
+    - **For Proxies:** Proxies are defined in the server's startup configuration. Redis only stores their *state* (e.g., `BANNED` or `ACTIVE`) and failure history. Deleting a proxy's state from Redis will cause the server to re-create it with a default `ACTIVE` state on its next refresh cycle. This action is useful for completely resetting a single proxy that may be stuck or has a long failure history, without having to reset all proxies for that server.
     """,
     params={
         "host": Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="The hostname of the ytdlp-ops-server service. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."),
         "port": Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="The port of the ytdlp-ops-server service (Envoy load balancer). Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."),
         "entity": Param(
-            "all",
+            "account",
             type="string",
-            enum=["proxy", "account", "all"],
-            description="The type of entity to manage. Use 'all' with action 'list' to see both.",
+            enum=["account", "proxy", "all"],
+            description="The type of entity to manage.",
         ),
         "action": Param(
-            "list",
+            "list_statuses",
             type="string",
-            enum=["list", "ban", "unban", "reset_all", "remove_all"],
-            description="The management action to perform. `reset_all` for proxies/accounts. `remove_all` for accounts only.",
+            enum=["list_statuses", "ban", "unban", "unban_all", "delete_from_redis"],
+            description="The management action to perform. See the DAG documentation for which actions are valid for each entity.",
         ),
         "server_identity": Param(
             "ytdlp-ops-airflow-service",
@@ -383,19 +450,13 @@ with DAG(
         "account_id": Param(
             None,
             type=["null", "string"],
-            description="The account ID to act upon. For `reset_all` or `remove_all` on accounts, this can be an optional prefix.",
-        ),
-        "confirm_remove_all_accounts": Param(
-            False,
-            type="boolean",
-            title="[remove_all] Confirm Deletion",
-            description="Must be set to True to execute the 'remove_all' action for accounts. This is a destructive operation.",
+            description="The account ID to act upon. For `unban_all` or `delete_from_redis` on accounts, this can be an optional prefix.",
         ),
         "redis_conn_id": Param(
             DEFAULT_REDIS_CONN_ID,
             type="string",
             title="Redis Connection ID",
-            description="The Airflow connection ID for the Redis server (used for 'remove_all').",
+            description="The Airflow connection ID for the Redis server (used for 'delete_from_redis' and for fetching detailed account status).",
         ),
     },
 ) as dag:
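For orientation, a run of this DAG with the new action names could be triggered programmatically like this. A minimal sketch: the DAG id is assumed (it is not shown in this excerpt), `trigger_dag` is the same helper this repository already imports in the worker DAG below, and all conf values are placeholders:

```python
from airflow.api.common.trigger_dag import trigger_dag

# Hypothetical example: reset a single stuck proxy via 'delete_from_redis'.
trigger_dag(
    dag_id="ytdlp_mgmt_proxy_account",  # assumed DAG id
    conf={
        "entity": "proxy",
        "action": "delete_from_redis",
        "proxy_url": "socks5://proxy-1:1080",  # placeholder
        "server_identity": "ytdlp-ops-airflow-service",
        "redis_conn_id": "redis_default",
    },
)
```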

View File

@@ -20,6 +20,8 @@ from airflow.operators.python import PythonOperator, BranchPythonOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.redis.hooks.redis import RedisHook
 from airflow.utils.dates import days_ago
+from airflow.models.variable import Variable
+import requests

 # Configure logging
 logger = logging.getLogger(__name__)
@@ -28,6 +30,7 @@ logger = logging.getLogger(__name__)
 DEFAULT_REDIS_CONN_ID = "redis_default"
 DEFAULT_QUEUE_NAME = "video_queue"
 DEFAULT_QUEUE_TO_CLEAR = 'PLEASE_SPECIFY_QUEUE_TO_CLEAR'
+DEFAULT_URL_LISTS_DIR = '/opt/airflow/inputfiles'

 # --- Helper Functions ---
@@ -42,6 +45,92 @@ def _get_redis_client(redis_conn_id: str):
         raise AirflowException(f"Redis connection failed: {e}")

+def _get_predefined_url_lists():
+    """Returns a static list of predefined URL list files."""
+    # This is a static list to ensure options are always visible in the UI,
+    # even if the files don't exist on the filesystem at parse time.
+    # The DAG will check for the file's existence at runtime.
+    predefined_files = [
+        'urls.dh128.json',
+        'urls.rt100.json',
+        'urls.rt25.json',
+        'urls.sky28.json',
+        'urls.sky3.json',
+        'urls.tq46.json',
+    ]
+    return ['None'] + sorted(predefined_files)
+
+def _get_urls_from_source(**params) -> List[str]:
+    """
+    Determines the source of video inputs based on the 'input_source' param and returns a list of raw items.
+    """
+    input_source = params.get("input_source", "manual")
+    predefined_list = params.get("predefined_url_list")
+    file_path_or_url = params.get("url_list_file_path")
+    manual_inputs = params.get("video_inputs")
+
+    # Source 1: Predefined file
+    if input_source == 'predefined_file':
+        if not predefined_list or predefined_list == 'None':
+            raise AirflowException("Input source is 'predefined_file', but no file was selected from the list.")
+        default_path = DEFAULT_URL_LISTS_DIR
+        url_lists_dir = Variable.get('YTDLP_URL_LISTS_DIR', default_var=default_path)
+        file_path = os.path.join(url_lists_dir, predefined_list)
+        logger.info(f"Loading URLs from predefined file: {file_path}")
+        if not os.path.exists(file_path):
+            raise AirflowException(f"Selected predefined file does not exist: {file_path}")
+        with open(file_path, 'r', encoding='utf-8') as f:
+            try:
+                data = json.load(f)
+                if not isinstance(data, list):
+                    raise AirflowException(f"JSON file '{predefined_list}' must contain a list of strings.")
+                return [str(item) for item in data]
+            except json.JSONDecodeError:
+                raise AirflowException(f"Failed to parse JSON from file: {predefined_list}")
+
+    # Source 2: File path or URL
+    elif input_source == 'file_path_or_url':
+        if not file_path_or_url:
+            raise AirflowException("Input source is 'file_path_or_url', but no path/URL was provided.")
+        logger.info(f"Loading URLs from provided path/URL: {file_path_or_url}")
+        content = ""
+        if file_path_or_url.startswith(('http://', 'https://')):
+            try:
+                response = requests.get(file_path_or_url, timeout=30)
+                response.raise_for_status()
+                content = response.text
+            except requests.RequestException as e:
+                raise AirflowException(f"Failed to fetch URL list from '{file_path_or_url}': {e}")
+        else:  # Assume local file path
+            if not os.path.exists(file_path_or_url):
+                raise AirflowException(f"Provided file path does not exist: {file_path_or_url}")
+            with open(file_path_or_url, 'r', encoding='utf-8') as f:
+                content = f.read()
+        try:
+            data = json.loads(content)
+            if not isinstance(data, list):
+                raise AirflowException("JSON content from path/URL must contain a list of strings.")
+            return [str(item) for item in data]
+        except json.JSONDecodeError:
+            raise AirflowException(f"Failed to parse JSON from path/URL: {file_path_or_url}")
+
+    # Source 3: Manual input
+    elif input_source == 'manual':
+        if not manual_inputs:
+            logger.info("Input source is 'manual', but no inputs were provided. Nothing to do.")
+            return []
+        logger.info("Loading URLs from manual input.")
+        return parse_video_inputs(manual_inputs)
+
+    else:
+        logger.warning(f"No valid input source selected or no data provided for the selected source. Nothing to do.")
+        return []
+
 def parse_video_inputs(input_str: str) -> List[str]:
     """Parses a flexible string of video inputs into a list of individual items."""
     if not input_str or not isinstance(input_str, str):
@@ -159,35 +248,54 @@ def dump_redis_data_to_csv(redis_client, dump_dir, patterns):

 def clear_queue_callable(**context):
-    """Dumps Redis data to CSV and/or clears a specified Redis key."""
+    """Dumps Redis data to CSV and/or clears specified Redis keys based on selection."""
     params = context['params']
     redis_conn_id = params['redis_conn_id']
-    queue_to_clear = params['queue_to_clear']
+    queue_base_name = params['queue_base_name']
+    queues_to_clear_options = params.get('queues_to_clear_options', [])
+    confirm_clear = params.get('confirm_clear', False)
     dump_queues = params['dump_queues']
+    # The value from templates_dict is already rendered by Airflow.
     dump_dir = context['templates_dict']['dump_dir']
     dump_patterns = params['dump_patterns'].split(',') if params.get('dump_patterns') else []

+    if not confirm_clear:
+        message = "Action is 'clear_queue', but 'Confirm Deletion' was not checked. Aborting to prevent accidental data loss."
+        logger.error(message)
+        raise AirflowException(message)
+
+    # If no queues are selected, default to clearing all of them.
+    if not queues_to_clear_options:
+        logger.warning("No specific queues selected to clear. Defaulting to '_all'.")
+        queues_to_clear_options = ['_all']
+
     redis_client = _get_redis_client(redis_conn_id)

     if dump_queues and dump_patterns:
+        logger.info("Dumping is enabled. Performing dump before clearing.")
         dump_redis_data_to_csv(redis_client, dump_dir, dump_patterns)

-    if not queue_to_clear or queue_to_clear == DEFAULT_QUEUE_TO_CLEAR:
-        logger.info("Parameter 'queue_to_clear' is not specified or is the default placeholder. Skipping key deletion.")
-        # If we only wanted to dump, this is a success.
+    all_suffixes = ['_inbox', '_fail', '_result', '_progress']
+    keys_to_delete = set()
+    if '_all' in queues_to_clear_options:
+        logger.info("'_all' option selected. Clearing all standard queues.")
+        for suffix in all_suffixes:
+            keys_to_delete.add(f"{queue_base_name}{suffix}")
+    else:
+        for suffix in queues_to_clear_options:
+            if suffix in all_suffixes:
+                keys_to_delete.add(f"{queue_base_name}{suffix}")
+
+    if not keys_to_delete:
+        logger.warning("No valid queue suffixes were selected. Nothing to delete.")
         return

-    logger.info(f"Attempting to clear Redis key '{queue_to_clear}' using connection '{redis_conn_id}'.")
+    logger.info(f"Attempting to clear {len(keys_to_delete)} Redis key(s): {sorted(list(keys_to_delete))}")
     try:
-        deleted_count = redis_client.delete(queue_to_clear)
-        if deleted_count > 0:
-            logger.info(f"Successfully cleared Redis key '{queue_to_clear}'.")
-        else:
-            logger.info(f"Redis key '{queue_to_clear}' did not exist or was already empty.")
+        deleted_count = redis_client.delete(*keys_to_delete)
+        logger.info(f"Successfully sent delete command for {len(keys_to_delete)} key(s). Redis reported {deleted_count} deleted.")
     except Exception as e:
-        logger.error(f"Failed to clear Redis key '{queue_to_clear}': {e}", exc_info=True)
-        raise AirflowException(f"Failed to clear Redis key: {e}")
+        logger.error(f"Failed to clear Redis keys: {e}", exc_info=True)
+        raise AirflowException(f"Failed to clear Redis keys: {e}")

 def list_contents_callable(**context):
@@ -271,7 +379,7 @@ def check_status_callable(**context):
     """Checks the status (type and size) of all standard Redis queues for a given base name."""
     params = context['params']
     redis_conn_id = params['redis_conn_id']
-    queue_name = params.get('queue_name_for_status', DEFAULT_QUEUE_NAME)
+    queue_name = params.get('queue_base_name', DEFAULT_QUEUE_NAME)
     queue_suffixes = ['_inbox', '_progress', '_result', '_fail']

     logger.info(f"--- Checking Status for Queues with Base Name: '{queue_name}' ---")
@@ -306,14 +414,13 @@ def requeue_failed_callable(**context):
     """
     params = context['params']
     redis_conn_id = params['redis_conn_id']
-    queue_name = params['queue_name_for_requeue']
+    queue_name = params['queue_base_name']
     clear_fail_queue = params['clear_fail_queue_after_requeue']

     fail_queue_name = f"{queue_name}_fail"
     inbox_queue_name = f"{queue_name}_inbox"

     logger.info(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}'.")
-    print(f"Requeuing failed URLs from '{fail_queue_name}' to '{inbox_queue_name}'.")

     redis_client = _get_redis_client(redis_conn_id)
@@ -322,14 +429,12 @@ def requeue_failed_callable(**context):
     failed_urls_bytes = redis_client.hkeys(fail_queue_name)
     if not failed_urls_bytes:
         logger.info(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.")
-        print(f"Fail queue '{fail_queue_name}' is empty. Nothing to requeue.")
         return

     failed_urls = [url.decode('utf-8') for url in failed_urls_bytes]
-    logger.info(f"Found {len(failed_urls)} URLs to requeue.")
-    print(f"Found {len(failed_urls)} URLs to requeue:")
+    logger.info(f"Found {len(failed_urls)} URLs to requeue:")
     for url in failed_urls:
-        print(f"  - {url}")
+        logger.info(f"  - {url}")

     # Add URLs to the inbox list
     if failed_urls:
@@ -345,7 +450,6 @@ def requeue_failed_callable(**context):
         f"The list now contains {final_list_length} items."
     )
     logger.info(success_message)
-    print(f"\n{success_message}")

     if clear_fail_queue:
         logger.info(f"Successfully cleared fail queue '{fail_queue_name}'.")
@@ -359,23 +463,19 @@ def requeue_failed_callable(**context):

 def add_videos_to_queue_callable(**context):
     """
-    Parses video inputs, normalizes them to URLs, and adds them to a Redis queue.
+    Parses video inputs from manual text, a predefined file, or a file path/URL,
+    normalizes them to URLs, and adds them to a Redis queue.
     """
     params = context["params"]
-    video_inputs = params["video_inputs"]
-    queue_name = params["queue_name"]
+    queue_name = params["queue_base_name"]
     redis_conn_id = params["redis_conn_id"]
     dry_run = params["dry_run"]

-    if not video_inputs:
-        logger.info("No video inputs provided. Nothing to do.")
-        print("No video inputs provided. Nothing to do.")
-        return
-
-    raw_items = parse_video_inputs(video_inputs)
+    # This function will get the list of strings from the correct source based on precedence
+    raw_items = _get_urls_from_source(**params)
     if not raw_items:
-        logger.info("Input string was empty or contained no items after parsing.")
-        print("Input string was empty or contained no items after parsing.")
+        logger.info("No video inputs found from any source. Nothing to do.")
         return

     valid_urls = []
@@ -390,10 +490,8 @@ def add_videos_to_queue_callable(**context):
         raise AirflowException("No valid YouTube URLs or IDs were found in the provided input.")

     logger.info(f"Found {len(valid_urls)} valid and unique URLs to add to the queue:")
-    print(f"Found {len(valid_urls)} valid and unique URLs to add to the queue:")
     for url in valid_urls:
         logger.info(f"  - {url}")
-        print(f"  - {url}")

     if dry_run:
         logger.info("Dry run is enabled. Skipping Redis operation.")
@@ -418,7 +516,6 @@ def add_videos_to_queue_callable(**context):
             f"The list now contains {final_list_length} items."
         )
         logger.info(success_message)
-        print(f"\n{success_message}")

     except Exception as e:
         logger.error(f"Failed to add URLs to Redis queue '{inbox_queue}': {e}", exc_info=True)
@@ -435,7 +532,7 @@ with DAG(
     },
     schedule=None,
     catchup=False,
-    tags=["ytdlp", "queue", "management", "redis", "manual"],
+    tags=["ytdlp", "mgmt", "master"],
     doc_md="""
     ### YT-DLP Queue Management
@@ -443,7 +540,7 @@ with DAG(
     Select an `action` to perform when triggering the DAG.

     **Actions:**
-    - `add_videos`: Add one or more YouTube videos to a queue.
+    - `add_videos`: Add one or more YouTube videos to a queue. You can provide input manually, select a predefined file from the server, or provide a path/URL to a file.
     - `clear_queue`: Dump and/or delete a specific Redis key.
     - `list_contents`: View the contents of a Redis key (list or hash).
     - `check_status`: Check the overall status of the queues.
@@ -457,18 +554,41 @@ with DAG(
             title="Action",
             description="The management action to perform.",
         ),
+        "queue_base_name": Param(
+            DEFAULT_QUEUE_NAME,
+            type="string",
+            title="Queue Base Name",
+            description="Base name for queues used in actions like 'add_videos', 'check_status', 'clear_queue', 'requeue_failed'.",
+        ),
         # --- Params for 'add_videos' ---
+        "input_source": Param(
+            "predefined_file",
+            type="string",
+            enum=["manual", "predefined_file", "file_path_or_url"],
+            title="[add_videos] Video Input Source",
+            description="Choose how to provide the video URLs. This choice determines which of the following parameters is used.",
+        ),
         "video_inputs": Param(
             None,
             type=["null", "string"],
-            title="[add_videos] Video URLs or IDs",
-            description="A single item, comma-separated list, or JSON array of YouTube URLs or Video IDs.",
+            title="[add_videos] 1. Manual Input",
+            description="Used if 'Input Source' is 'manual'. Paste a single item, a comma-separated list, or a JSON array of YouTube URLs or Video IDs.",
         ),
-        "queue_name": Param(
-            DEFAULT_QUEUE_NAME,
+        "predefined_url_list": Param(
+            "None",
             type="string",
-            title="[add_videos] Queue Name",
-            description="The base name of the Redis queue to add videos to (e.g., 'video_queue').",
+            enum=_get_predefined_url_lists(),
+            title="[add_videos] 2. Predefined File",
+            description=(
+                "Used if 'Input Source' is 'predefined_file'. Select a JSON file from the server's URL list directory "
+                f"(defined by Airflow Variable 'YTDLP_URL_LISTS_DIR', defaults to '{DEFAULT_URL_LISTS_DIR}')."
+            ),
+        ),
+        "url_list_file_path": Param(
+            None,
+            type=["null", "string"],
+            title="[add_videos] 3. File Path or URL",
+            description="Used if 'Input Source' is 'file_path_or_url'. Enter a local file path (on the Airflow worker) or a remote URL to a JSON file containing a list of URLs/IDs.",
         ),
         "dry_run": Param(
             False,
@@ -477,11 +597,21 @@ with DAG(
             description="If True, validate inputs without adding them to the queue.",
         ),
         # --- Params for 'clear_queue' ---
-        "queue_to_clear": Param(
-            DEFAULT_QUEUE_TO_CLEAR,
-            type="string",
-            title="[clear_queue] Queue to Clear",
-            description="Exact name of the Redis key to delete.",
+        "queues_to_clear_options": Param(
+            None,
+            type=["null", "array"],
+            title="[clear_queue] Queues to Clear",
+            description="Select which standard queues to clear. '_all' clears all four. If left empty, it defaults to '_all'.",
+            items={
+                "type": "string",
+                "enum": ["_inbox", "_fail", "_result", "_progress", "_all"],
+            }
+        ),
+        "confirm_clear": Param(
+            False,
+            type="boolean",
+            title="[clear_queue] Confirm Deletion",
+            description="Must be set to True to execute the 'clear_queue' action. This is a destructive operation.",
         ),
         "dump_queues": Param(
             True,
@@ -490,10 +620,10 @@ with DAG(
             description="If True, dump data before clearing.",
         ),
         "dump_dir": Param(
-            "{{ var.value.get('YTDLP_REDIS_DUMP_DIR', '/opt/airflow/dumps') }}",
-            type="string",
+            None,
+            type=["null", "string"],
             title="[clear_queue] Dump Directory",
-            description="Base directory to save CSV dump files.",
+            description="Base directory to save CSV dump files. Supports Jinja. If empty, defaults to Airflow variable 'YTDLP_REDIS_DUMP_DIR' or '/opt/airflow/dumps'.",
         ),
         "dump_patterns": Param(
             'ytdlp:*,video_queue_*',
@@ -514,20 +644,7 @@ with DAG(
             title="[list_contents] Max Items to List",
             description="Maximum number of items to show.",
         ),
-        # --- Params for 'check_status' ---
-        "queue_name_for_status": Param(
-            DEFAULT_QUEUE_NAME,
-            type="string",
-            title="[check_status] Base Queue Name",
-            description="Base name of the queues to check (e.g., 'video_queue').",
-        ),
         # --- Params for 'requeue_failed' ---
-        "queue_name_for_requeue": Param(
-            DEFAULT_QUEUE_NAME,
-            type="string",
-            title="[requeue_failed] Base Queue Name",
-            description="Base name of the queues to requeue from (e.g., 'video_queue' will use 'video_queue_fail').",
-        ),
         "clear_fail_queue_after_requeue": Param(
             True,
             type="boolean",
@@ -555,7 +672,7 @@ with DAG(
     action_clear_queue = PythonOperator(
         task_id="action_clear_queue",
         python_callable=clear_queue_callable,
-        templates_dict={'dump_dir': "{{ params.dump_dir }}"},
+        templates_dict={'dump_dir': "{{ params.dump_dir or var.value.get('YTDLP_REDIS_DUMP_DIR', '/opt/airflow/dumps') }}"},
     )

     action_list_contents = PythonOperator(
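Putting the new parameters together, a hypothetical trigger of the `ytdlp_mgmt_queues` DAG (the name given in the README section above) that adds videos from a remote JSON list might look like this; the URL and queue name are placeholders:

```python
from airflow.api.common.trigger_dag import trigger_dag

trigger_dag(
    dag_id="ytdlp_mgmt_queues",
    conf={
        "action": "add_videos",
        "input_source": "file_path_or_url",
        "url_list_file_path": "https://example.com/urls.json",  # placeholder
        "queue_base_name": "video_queue",
        "dry_run": True,  # validate the list without touching Redis
    },
)
```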

View File

@@ -73,24 +73,34 @@ def orchestrate_workers_ignition_callable(**context):
     worker_indices = list(range(total_workers))
     bunches = [worker_indices[i:i + workers_per_bunch] for i in range(0, len(worker_indices), workers_per_bunch)]

-    logger.info(f"Plan: Starting {total_workers} total workers in {len(bunches)} bunches.")
+    # Get and parse worker hosts (which are used as queue names)
+    worker_hosts_str = params.get('worker_hosts', 'celery@dl002')
+    worker_hosts = [h.strip() for h in worker_hosts_str.split(',') if h.strip()]
+    if not worker_hosts:
+        raise AirflowException("The 'worker_hosts' parameter cannot be empty.")
+
+    logger.info(f"Plan: Starting {total_workers} total workers in {len(bunches)} bunches, distributing across hosts (queues): {worker_hosts}")

     dag_run_id = context['dag_run'].run_id
     total_triggered = 0

-    # Pass all orchestrator params to the worker so it has the full context for its loop.
-    conf_to_pass = {p: params[p] for p in params}
-    # The worker pulls its own URL, so we don't pass one.
-    if 'url' in conf_to_pass:
-        del conf_to_pass['url']
-
     for i, bunch in enumerate(bunches):
         logger.info(f"--- Igniting Bunch {i+1}/{len(bunches)} (contains {len(bunch)} worker(s)) ---")
         for j, _ in enumerate(bunch):
             # Create a unique run_id for each worker loop starter
             run_id = f"ignited_{dag_run_id}_{total_triggered}"
-            logger.info(f"Igniting worker {j+1}/{len(bunch)} in bunch {i+1} (loop {total_triggered + 1}/{total_workers}) (Run ID: {run_id})")
+            # Pass all orchestrator params to the worker so it has the full context for its loop.
+            conf_to_pass = {p: params[p] for p in params}
+            # The worker pulls its own URL, so we don't pass one.
+            if 'url' in conf_to_pass:
+                del conf_to_pass['url']
+
+            # Assign host/queue in a round-robin fashion
+            queue_for_worker = worker_hosts[total_triggered % len(worker_hosts)]
+            conf_to_pass['queue'] = queue_for_worker
+
+            logger.info(f"Igniting worker {j+1}/{len(bunch)} in bunch {i+1} (loop {total_triggered + 1}/{total_workers}) on host (queue) '{queue_for_worker}' (Run ID: {run_id})")
             logger.debug(f"Full conf for worker loop {run_id}: {conf_to_pass}")

             trigger_dag(
@@ -151,7 +161,7 @@ with DAG(
     The workers then take over, each running its own continuous processing loop.
     """,
-    tags=['ytdlp', 'orchestrator', 'ignition'],
+    tags=['ytdlp', 'mgmt', 'master'],
     params={
         # --- Ignition Control Parameters ---
         'total_workers': Param(DEFAULT_TOTAL_WORKERS, type="integer", description="Total number of worker loops to start."),
@@ -160,6 +170,7 @@ with DAG(
         'delay_between_bunches_s': Param(DEFAULT_BUNCH_DELAY_S, type="integer", description="Delay in seconds between starting each bunch."),

         # --- Worker Passthrough Parameters ---
+        'worker_hosts': Param('celery@dl002', type="string", title="[Worker Param] Worker Hosts", description="Comma-separated list of Celery worker hostnames (e.g., 'celery@dl002') to distribute workers to. These are used as queue names. Workers will be assigned to these queues in a round-robin fashion."),
         'on_bannable_failure': Param(
             'retry_with_new_account',
             type="string",
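The round-robin assignment above pins each ignited worker loop to one of the listed Celery queues; a minimal restatement of the indexing, with example hosts:

```python
worker_hosts = ["celery@dl002", "celery@dl003"]  # example host list
assignments = {i: worker_hosts[i % len(worker_hosts)] for i in range(5)}
# -> {0: 'celery@dl002', 1: 'celery@dl003', 2: 'celery@dl002',
#     3: 'celery@dl003', 4: 'celery@dl002'}
```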

View File

@@ -21,6 +21,7 @@ from airflow.operators.dummy import DummyOperator
 from airflow.providers.redis.hooks.redis import RedisHook
 from airflow.utils.dates import days_ago
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup
 from datetime import datetime, timedelta
 from airflow.api.common.trigger_dag import trigger_dag
 from pangramia.yt.common.ttypes import TokenUpdateMode
@@ -70,39 +71,6 @@ def _get_thrift_client(host, port, timeout):
     return client, transport

-def _ban_resource_task(resource_type, resource_id_xcom_key, host, port, timeout, **kwargs):
-    """
-    A callable function to ban a resource (account or proxy).
-    Designed to be used in a PythonOperator.
-    """
-    ti = kwargs['ti']
-    resource_id = ti.xcom_pull(key=resource_id_xcom_key)
-    if not resource_id:
-        logger.warning(f"Could not find resource ID in XCom key '{resource_id_xcom_key}' to ban. Skipping.")
-        return
-
-    client, transport = None, None
-    try:
-        client, transport = _get_thrift_client(host, port, timeout)
-        if resource_type == 'account':
-            reason = f"Banned by Airflow worker due to {kwargs.get('reason', 'failure')}"
-            logger.warning(f"Banning account '{resource_id}'. Reason: {reason}")
-            client.banAccount(accountId=resource_id, reason=reason)
-            logger.info(f"Successfully sent request to ban account '{resource_id}'.")
-        elif resource_type == 'proxy':
-            server_identity = kwargs.get('server_identity')
-            if not server_identity:
-                logger.error("Cannot ban proxy without server_identity.")
-                return
-            logger.warning(f"Banning proxy '{resource_id}' for server '{server_identity}'.")
-            client.banProxy(proxyUrl=resource_id, serverIdentity=server_identity)
-            logger.info(f"Successfully sent request to ban proxy '{resource_id}'.")
-    except Exception as e:
-        # Log the error but don't fail the task, as this is a best-effort cleanup action.
-        logger.error(f"Failed to issue ban for {resource_type} '{resource_id}': {e}", exc_info=True)
-    finally:
-        if transport and transport.isOpen():
-            transport.close()

 def _extract_video_id(url):
@@ -136,7 +104,7 @@ def mark_url_as_success(**context):
     """Moves URL from progress to result hash on success."""
     ti = context['task_instance']
     params = context['params']
-    url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process')
+    url = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='url_to_process')
     if not url:
         logger.warning("mark_url_as_success called but no URL found in DAG run parameters.")
         return
@@ -146,9 +114,12 @@ def mark_url_as_success(**context):
     redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

     # Pull results from previous tasks
-    info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
-    socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
-    ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command')
+    info_json_path = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='info_json_path') or \
+                     ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='info_json_path')
+    socks_proxy = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='socks_proxy') or \
+                  ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='socks_proxy')
+    ytdlp_command = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='ytdlp_command') or \
+                    ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='ytdlp_command')
     downloaded_file_path = ti.xcom_pull(task_ids='download_and_probe')

     logger.info(f"Handling success for URL: {url}")
@ -178,56 +149,120 @@ def mark_url_as_success(**context):
def handle_failure_callable(**context): def handle_failure_callable(**context):
""" """
Handles a failed processing run by recording the error details to Redis. Handles a failed processing run by recording rich, detailed error information to Redis.
The decision to stop or continue the loop is handled by `decide_what_to_do_next`.
""" """
ti = context['task_instance'] ti = context['task_instance']
params = context['params'] params = context['params']
dag_run = context['dag_run'] dag_run = context['dag_run']
url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process') url = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='url_to_process')
if not url: if not url:
logger.error("handle_failure_callable called but no URL found in XCom.") # This can happen if pull_url_and_assign_account itself fails.
# We can't record a URL-specific failure, but we should log it.
failed_tis = [ti for ti in dag_run.get_task_instances() if ti.state == 'failed']
failed_task_ids = [ti.task_id for ti in failed_tis]
logger.error(f"handle_failure_callable was triggered for run {dag_run.run_id}, but no URL was found in XCom. "
f"This likely means an early task failed. Failed tasks in run: {failed_task_ids}")
return return
# --- Determine the source and type of failure --- # --- Start building the rich error report ---
failure_report = {
'url': url,
'dag_run_id': dag_run.run_id,
'failure_timestamp': datetime.now().isoformat(),
'failed_task': 'unknown',
'failure_summary': 'An unknown error occurred.',
'failure_history': [],
'download_error': None,
'generic_error': None
}
# --- Gather data from token acquisition attempts ---
# Attempt 1: get_token
get_token_ti = dag_run.get_task_instance('acquire_token_with_retry.get_token')
if get_token_ti:
error_details_1 = ti.xcom_pull(task_ids=get_token_ti.task_id, key='error_details')
account_1 = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='account_id')
attempt_1_report = {
'task_id': get_token_ti.task_id,
'account_id': account_1,
'status': get_token_ti.state,
'start_date': get_token_ti.start_date.isoformat() if get_token_ti.start_date else None,
'end_date': get_token_ti.end_date.isoformat() if get_token_ti.end_date else None,
}
if error_details_1:
attempt_1_report.update({
'proxy_url': error_details_1.get('proxy_url'),
'error_code': error_details_1.get('error_code'),
'error_message': error_details_1.get('error_message'),
})
failure_report['failure_history'].append(attempt_1_report)
# Attempt 2: retry_get_token
retry_get_token_ti = dag_run.get_task_instance('acquire_token_with_retry.retry_get_token')
# Only report on retry if it actually ran
if retry_get_token_ti and retry_get_token_ti.state:
error_details_2 = ti.xcom_pull(task_ids=retry_get_token_ti.task_id, key='error_details')
account_2 = ti.xcom_pull(task_ids='acquire_token_with_retry.assign_new_account_for_retry', key='account_id')
attempt_2_report = {
'task_id': retry_get_token_ti.task_id,
'account_id': account_2,
'status': retry_get_token_ti.state,
'start_date': retry_get_token_ti.start_date.isoformat() if retry_get_token_ti.start_date else None,
'end_date': retry_get_token_ti.end_date.isoformat() if retry_get_token_ti.end_date else None,
}
if error_details_2:
attempt_2_report.update({
'proxy_url': error_details_2.get('proxy_url'),
'error_code': error_details_2.get('error_code'),
'error_message': error_details_2.get('error_message'),
})
failure_report['failure_history'].append(attempt_2_report)
# --- Identify the primary failure point ---
exception = context.get('exception') exception = context.get('exception')
failed_task_id = "unknown"
upstream_tasks = ti.task.get_direct_relatives(upstream=True)
for task in upstream_tasks:
upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
if upstream_ti and upstream_ti.state == 'failed':
failed_task_id = task.task_id
break
# --- Extract Detailed Error Information --- # Case 1: Download & Probe failure
error_details_from_xcom = None download_probe_ti = dag_run.get_task_instance('download_and_probe')
if failed_task_id != "unknown": if download_probe_ti and download_probe_ti.state == 'failed':
error_details_from_xcom = ti.xcom_pull(task_ids=failed_task_id, key='error_details') failure_report['failed_task'] = download_probe_ti.task_id
failure_report['failure_summary'] = 'Download or probe failed after successful token acquisition.'
failure_report['download_error'] = {
'error_message': str(exception) if exception else "BashOperator failed. Check task logs for yt-dlp/ffmpeg output.",
'error_type': type(exception).__name__ if exception else "Unknown",
}
if error_details_from_xcom: # Case 2: Token acquisition failure
error_message = error_details_from_xcom.get('error_message', 'Unknown error from XCom')
error_type = error_details_from_xcom.get('error_type', 'Unknown type from XCom')
tb_str = error_details_from_xcom.get('traceback', 'No traceback in XCom.')
else: else:
error_message = str(exception) if exception else "Unknown error" last_failed_attempt = next((attempt for attempt in reversed(failure_report['failure_history']) if attempt['status'] == 'failed'), None)
error_type = type(exception).__name__ if exception else "Unknown" if last_failed_attempt:
tb_str = "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available." failure_report['failed_task'] = last_failed_attempt['task_id']
failure_report['failure_summary'] = f"Token acquisition failed with error: {last_failed_attempt.get('error_code', 'Unknown')}"
else:
# Case 3: Generic/unexpected failure
failed_tis = [ti for ti in dag_run.get_task_instances() if ti.state == 'failed']
if failed_tis:
# Heuristic: pick the one with the latest end_date that is not this task itself
failed_tis.sort(key=lambda x: x.end_date or datetime.min)
last_failed_ti = next((ti for ti in reversed(failed_tis) if ti.task_id != context['task_instance'].task_id), None)
if last_failed_ti:
failure_report['failed_task'] = last_failed_ti.task_id
failure_report['failure_summary'] = f"Task '{last_failed_ti.task_id}' failed unexpectedly."
failure_report['generic_error'] = {
'error_message': str(exception) if exception else f"Unexpected failure in task {last_failed_ti.task_id}. Check logs.",
'error_type': type(exception).__name__ if exception else "Unknown",
'traceback': "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available."
}
logger.info(f"Handling failure for URL: {url}") logger.info(f"Handling failure for URL: {url}")
logger.error(f" Failed Task: {failed_task_id}") logger.error(f" Failure Summary: {failure_report['failure_summary']}")
logger.error(f" Failure Type: {error_type}") logger.error(f" Failed Task: {failure_report['failed_task']}")
logger.error(f" Failure Reason: {error_message}") # Using print to ensure the full JSON is visible in the logs without truncation
logger.debug(f" Traceback:\n{tb_str}") print("--- Detailed Failure Report ---")
print(json.dumps(failure_report, indent=2))
print("-----------------------------")
# For all failures, mark the URL as failed in Redis.
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
@@ -235,7 +270,7 @@ def handle_failure_callable(**context):
fail_queue = f"{queue_name}_fail"
try:
client = _get_redis_client(redis_conn_id)
client.hset(fail_queue, url, json.dumps(failure_report, indent=2))
logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.") logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
except Exception as e:
logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
@@ -344,14 +379,16 @@ def _get_account_pool(params: dict) -> list:
def pull_url_and_assign_account_callable(**context):
"""
Pulls a single URL from Redis and assigns an active account for the run.
If the queue is empty, it skips the DAG run.
Otherwise, it pushes the URL and account details to XCom.
"""
params = context['params']
ti = context['task_instance']
# --- Part 1: Pull URL from Redis ---
queue_name = params['queue_name']
redis_conn_id = params['redis_conn_id']
inbox_queue = f"{queue_name}_inbox"
@@ -368,47 +405,8 @@ def pull_url_from_redis_callable(**context):
logger.info(f"Pulled URL '{url_to_process}' from the queue.")
ti.xcom_push(key='url_to_process', value=url_to_process)
# --- Part 2: Assign Account ---
logger.info("URL found, proceeding to assign an account.")
# Affinity logic: check if an account was passed from a previous run
account_id = params.get('current_account_id')
if account_id:
@@ -423,6 +421,38 @@ def assign_account_callable(**context):
ti.xcom_push(key='accounts_tried', value=[account_id])
def decide_what_to_do_next_callable(**context):
"""
Decides whether to continue the processing loop by triggering the next worker
or to stop the loop, based on task success, failure, or an empty queue.
"""
params = context['params']
dag_run = context['dag_run']
# Check if a failure was handled. If the 'handle_generic_failure' task was not skipped,
# it means a failure occurred somewhere in the pipeline.
handle_generic_failure_ti = dag_run.get_task_instance(task_id='handle_generic_failure')
if handle_generic_failure_ti and handle_generic_failure_ti.state != 'skipped':
logger.error(f"Failure handler task 'handle_generic_failure' was in state '{handle_generic_failure_ti.state}'. Stopping this processing lane.")
return 'mark_dag_run_as_failed'
# Check if the worker was skipped because the Redis queue was empty.
pull_task_instance = dag_run.get_task_instance(task_id='pull_url_and_assign_account')
if pull_task_instance and pull_task_instance.state == 'skipped':
logger.info("Worker was skipped because Redis queue was empty.")
retrigger_delay_on_empty_s = params.get('retrigger_delay_on_empty_s', 60)
if retrigger_delay_on_empty_s < 0:
logger.info(f"retrigger_delay_on_empty_s is {retrigger_delay_on_empty_s}. Stopping this worker loop.")
return 'stop_worker_lane_gracefully'
else:
logger.info(f"Queue is empty. Will re-trigger this worker loop after a delay of {retrigger_delay_on_empty_s}s.")
return 'continue_loop_and_trigger_next_run'
# If no failure was handled and the queue wasn't empty, it must be a success.
logger.info("All preceding tasks succeeded. Continuing the processing loop by triggering the next worker.")
return 'continue_loop_and_trigger_next_run'
def get_token_callable(**context):
"""Makes a single attempt to get a token from the Thrift service."""
ti = context['task_instance']
@@ -444,7 +474,7 @@ def get_token_callable(**context):
if not account_id:
raise AirflowException("Could not find a valid account_id in XCom from any upstream task.")
url = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='url_to_process')
if not url:
logger.info("No URL pulled from XCom. Assuming upstream task was skipped. Ending task.")
return
@@ -465,7 +495,16 @@ def get_token_callable(**context):
call_kwargs = {'accountId': account_id, 'updateType': TokenUpdateMode.AUTO, 'url': url, 'clients': clients, 'machineId': machine_id}
token_data = client.getOrRefreshToken(**call_kwargs)
logger.info("Successfully retrieved token data from service.")
# --- Log response details for debugging ---
response_summary = {
"has_infoJson": hasattr(token_data, 'infoJson') and bool(token_data.infoJson),
"infoJson_size": len(token_data.infoJson) if hasattr(token_data, 'infoJson') and token_data.infoJson else 0,
"has_ytdlpCommand": hasattr(token_data, 'ytdlpCommand') and bool(token_data.ytdlpCommand),
"proxy_type": next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr) and getattr(token_data, attr)), 'None'),
"jobId": getattr(token_data, 'jobId', None)
}
logger.info(f"Successfully retrieved token data from service. Response summary: {json.dumps(response_summary)}")
# --- Success Case ---
info_json = getattr(token_data, 'infoJson', None)
@@ -491,48 +530,55 @@ def get_token_callable(**context):
except Exception as log_e:
logger.warning(f"Could not log info.json details: {log_e}")
proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
ti.xcom_push(key='socks_proxy', value=getattr(token_data, proxy_attr) if proxy_attr else None)
ti.xcom_push(key='ytdlp_command', value=getattr(token_data, 'ytdlpCommand', None))
ti.xcom_push(key='successful_account_id', value=account_id) # For affinity
ti.xcom_push(key='get_token_succeeded', value=True)
else:
# This is a failure case: the service returned success but no usable data.
logger.error(f"Thrift call for account '{account_id}' succeeded but returned no info.json. Treating as failure.")
# The generic failure handler will pick up this exception.
raise AirflowException("Service returned success but info.json was empty or invalid.")
except (PBServiceException, PBUserException, TTransportException) as e:
error_context = getattr(e, 'context', None)
if isinstance(error_context, str):
try:
error_context = json.loads(error_context.replace("'", "\""))
except: pass
error_message = getattr(e, 'message', str(e))
error_code = getattr(e, 'errorCode', 'TRANSPORT_ERROR')
# Check for wrapped timeout exception to provide a clearer error message.
inner_exception = getattr(e, 'inner', getattr(e, '__cause__', None))
if isinstance(e, TTransportException) and isinstance(inner_exception, socket.timeout):
error_message = f"Socket timeout during Thrift call (wrapped in TTransportException)"
error_code = 'SOCKET_TIMEOUT'
error_details = {
'error_message': error_message,
'error_code': error_code,
'error_type': type(e).__name__,
'traceback': traceback.format_exc(),
'proxy_url': error_context.get('proxy_url') if isinstance(error_context, dict) else None
}
proxy_url_info = f" with proxy '{error_details['proxy_url']}'" if error_details.get('proxy_url') else ""
if error_code == 'SOCKET_TIMEOUT':
logger.error(f"Thrift call for account '{account_id}'{proxy_url_info} failed due to a socket timeout after {timeout} seconds.")
elif isinstance(e, TTransportException) and e.type == TTransportException.TIMED_OUT:
logger.error(f"Thrift call for account '{account_id}'{proxy_url_info} timed out after {timeout} seconds.")
else:
logger.error(f"Thrift call failed for account '{account_id}'{proxy_url_info}. Exception: {error_details['error_message']}")
ti.xcom_push(key='error_details', value=error_details)
ti.xcom_push(key='get_token_succeeded', value=False)
# Always fail the task on any Thrift exception. The branch operator will inspect the failure.
raise AirflowException(f"Thrift call failed: {error_details['error_message']}")
logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.")
is_bannable = False
if not is_bannable:
logger.error(f"Non-bannable error '{error_code}' detected. Failing task to stop the loop.")
raise AirflowException(f"Non-bannable Thrift call failed: {error_details['error_message']}")
else:
logger.warning(f"Bannable error '{error_code}' detected. Passing to branch operator for handling.")
# Do not raise exception here; let the branch operator handle it.
finally:
if transport and transport.isOpen():
transport.close()
@@ -540,22 +586,22 @@ def get_token_callable(**context):
def handle_bannable_error_branch_callable(**context):
"""
Inspects a failed `get_token` task. If the failure was a "bannable" error,
it routes to the retry logic. Otherwise, it lets the DAG fail.
This task only runs if the upstream `get_token` task fails.
""" """
ti = context['task_instance'] ti = context['task_instance']
params = context['params'] params = context['params']
# We know get_token failed because of the trigger_rule='one_failed'.
# Pull the error details it left behind.
error_details = ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='error_details')
if not error_details:
logger.error("The 'get_token' task failed, but no error details were found in XCom. "
"This indicates an unexpected error. Letting the DAG fail.")
return None # Do nothing, let the group fail.
# We have error details, now check if the error is "bannable".
error_code = error_details.get('error_code', '').strip()
error_message = error_details.get('error_message', '').lower()
policy = params.get('on_bannable_failure', 'retry_with_new_account')
@@ -567,17 +613,17 @@ def handle_bannable_error_branch_callable(**context):
logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.")
is_bannable = False
logger.info(f"Handling failure from 'get_token'. Error code: '{error_code}', Is Bannable: {is_bannable}, Policy: '{policy}'")
if is_bannable and policy == 'retry_with_new_account':
logger.info("Error is bannable and policy allows retry. Proceeding to ban first account and retry.")
return 'acquire_token_with_retry.ban_account_and_prepare_for_retry'
elif is_bannable: # and policy is 'stop_loop'
logger.warning("Error is bannable and policy is 'stop_loop'. Banning account and stopping.")
return 'acquire_token_with_retry.ban_account_and_fail'
else: # Not a bannable error
logger.error(f"Error '{error_code}' is not bannable. Letting the DAG fail.")
return None # Do nothing, let the group fail.
def assign_new_account_for_retry_callable(**context):
@@ -585,7 +631,7 @@ def assign_new_account_for_retry_callable(**context):
ti = context['task_instance']
params = context['params']
accounts_tried = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='accounts_tried')
if not accounts_tried:
raise AirflowException("Cannot retry, list of previously tried accounts not found.")
@@ -628,43 +674,142 @@ def assign_new_account_for_retry_callable(**context):
def handle_retry_failure_branch_callable(**context):
"""
Checks a failed `retry_get_token` task. If the failure was a handled Thrift error,
it triggers the banning of the second account/proxy.
This task only runs if the upstream `retry_get_token` task fails.
"""
ti = context['task_instance']
# We know retry_get_token failed. Check if it was a handled failure.
error_details = ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='error_details')
if not error_details:
logger.error("The 'retry_get_token' task failed unexpectedly before it could record error details. "
"Letting the DAG fail without banning the account/proxy.")
return None
# If we are here, it means the retry failed with a handled Thrift error.
# We will proceed to ban the second account and proxy.
logger.error("Retry attempt also failed with a handled Thrift error. Banning second account and proxy.")
return 'acquire_token_with_retry.ban_second_account_and_proxy'
def ban_first_account_callable(**context):
"""Bans the first account that failed due to a bannable error."""
ti = context['task_instance']
params = context['params']
# The account ID is pulled from the initial assignment task.
account_to_ban = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='account_id')
if not account_to_ban:
logger.warning("Could not find the initial account ID to ban. Skipping.")
return
client, transport = None, None
try:
host = params['service_ip']
port = int(params['service_port'])
timeout = int(params.get('timeout', DEFAULT_TIMEOUT))
client, transport = _get_thrift_client(host, port, timeout)
reason = "Banned by Airflow worker due to bannable error on first attempt"
logger.warning(f"Banning account '{account_to_ban}'. Reason: {reason}")
client.banAccount(accountId=account_to_ban, reason=reason)
logger.info(f"Successfully sent request to ban account '{account_to_ban}'.")
except Exception as e:
logger.error(f"Failed to issue ban for account '{account_to_ban}': {e}", exc_info=True)
# Don't fail the task, as this is a best-effort cleanup action.
finally:
if transport and transport.isOpen():
transport.close()
def ban_first_account_and_fail_callable(**context):
"""Bans the first account that failed, and then intentionally fails the task."""
ti = context['task_instance']
params = context['params']
# The account ID is pulled from the initial assignment task.
account_to_ban = ti.xcom_pull(task_ids='pull_url_and_assign_account', key='account_id')
if not account_to_ban:
logger.warning("Could not find the initial account ID to ban. Skipping.")
else:
client, transport = None, None
try:
host = params['service_ip']
port = int(params['service_port'])
timeout = int(params.get('timeout', DEFAULT_TIMEOUT))
client, transport = _get_thrift_client(host, port, timeout)
reason = "Banned by Airflow worker due to bannable error (policy is stop_loop)"
logger.warning(f"Banning account '{account_to_ban}'. Reason: {reason}")
client.banAccount(accountId=account_to_ban, reason=reason)
logger.info(f"Successfully sent request to ban account '{account_to_ban}'.")
except Exception as e:
logger.error(f"Failed to issue ban for account '{account_to_ban}': {e}", exc_info=True)
# Log error, but continue to fail the task.
finally:
if transport and transport.isOpen():
transport.close()
# Intentionally fail the task to stop the DAG run as per policy.
reason = "Bannable error detected, policy is stop_loop."
logger.warning(f"INTENTIONAL FAILURE: This task is now failing itself as per the 'stop_loop' policy. Reason: {reason}")
raise AirflowException(f"Failing task as per policy. Reason: {reason}")
def ban_second_account_and_proxy_callable(**context):
"""Bans the second account and the proxy used in the failed retry, then fails the task."""
ti = context['task_instance']
params = context['params']
account_to_ban = ti.xcom_pull(task_ids='acquire_token_with_retry.assign_new_account_for_retry', key='account_id')
error_details = ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='error_details')
proxy_to_ban = error_details.get('proxy_url') if error_details else None
if not account_to_ban and not proxy_to_ban:
logger.warning("Could not find an account or proxy to ban from the failed retry. Nothing to do.")
# Still fail the task to stop the DAG.
raise AirflowException("Token acquisition failed on retry, but no resources found to ban.")
client, transport = None, None
try:
host = params['service_ip']
port = int(params['service_port'])
timeout = int(params.get('timeout', DEFAULT_TIMEOUT))
client, transport = _get_thrift_client(host, port, timeout)
# Ban the second account
if account_to_ban:
reason = "Banned by Airflow worker due to failure on retry attempt"
logger.warning(f"Banning account '{account_to_ban}'. Reason: {reason}")
try:
client.banAccount(accountId=account_to_ban, reason=reason)
logger.info(f"Successfully sent request to ban account '{account_to_ban}'.")
except Exception as e:
logger.error(f"Failed to issue ban for account '{account_to_ban}': {e}", exc_info=True)
# Ban the proxy
if proxy_to_ban:
server_identity = params.get('machine_id') or socket.gethostname()
logger.warning(f"Banning proxy '{proxy_to_ban}' for server '{server_identity}'.")
try:
client.banProxy(proxyUrl=proxy_to_ban, serverIdentity=server_identity)
logger.info(f"Successfully sent request to ban proxy '{proxy_to_ban}'.")
except Exception as e:
logger.error(f"Failed to issue ban for proxy '{proxy_to_ban}': {e}", exc_info=True)
except Exception as e:
logger.error(f"An error occurred while trying to connect to the Thrift service to ban resources: {e}", exc_info=True)
# Log the error but continue to the failure exception, as this is a best-effort cleanup.
finally:
if transport and transport.isOpen():
transport.close()
# After attempting to ban, we must fail this task to fail the group.
logger.warning("INTENTIONAL FAILURE: This task is now failing itself to correctly signal the end of the retry process and stop the worker lane. The second account and/or proxy have been banned.")
raise AirflowException("Token acquisition failed on retry. Banned second account and proxy.")
def trigger_self_run_callable(**context):
@@ -674,7 +819,7 @@ def trigger_self_run_callable(**context):
dag_run = context['dag_run']
# Check if this was triggered due to an empty queue to apply the specific delay.
pull_task_instance = dag_run.get_task_instance(task_id='pull_url_and_assign_account')
is_empty_queue_scenario = pull_task_instance and pull_task_instance.state == 'skipped'
delay = 0
@@ -690,6 +835,7 @@ def trigger_self_run_callable(**context):
if delay > 0:
logger.info(f"Waiting for {delay}s before triggering next run.")
time.sleep(delay)
logger.info(f"Finished waiting {delay}s. Proceeding to trigger next run.")
# Generate a unique run_id for the new worker run
run_id = f"self_triggered_{datetime.utcnow().isoformat()}"
@@ -703,7 +849,7 @@ def trigger_self_run_callable(**context):
# Pass the successful account ID to the next run for affinity.
# It could come from the first attempt or the retry.
successful_account_ids = ti.xcom_pull(task_ids=['acquire_token_with_retry.get_token', 'acquire_token_with_retry.retry_get_token'], key='successful_account_id')
successful_account_id = next((acc for acc in successful_account_ids if acc), None)
if successful_account_id:
@@ -738,6 +884,7 @@ default_args = {
'retries': 0,
'retry_delay': timedelta(minutes=1),
'start_date': days_ago(1),
'queue': "{{ params.get('queue') }}",
}
with DAG(
@@ -756,7 +903,7 @@ with DAG(
1. **Ignition:** An initial run is triggered by the orchestrator.
2. **Pull & Assign:** It pulls a URL from Redis and assigns an account for the job, reusing the last successful account if available (affinity).
3. **Get Token:** It calls the `ytdlp-ops-server` to get tokens and `info.json`. This step is encapsulated in a `TaskGroup` that handles a single retry on failure.
4. **Failure Handling:** If `get_token` fails with a "bannable" error (like bot detection), it follows the `on_bannable_failure` policy:
- `retry_with_new_account` (default): It bans the failing account, picks a new one, and retries the `get_token` call once. If the retry also fails, it bans the second account and the proxy, then stops the loop.
- `stop_loop`: It bans the account and stops the loop immediately.
@@ -765,7 +912,7 @@ with DAG(
This creates a "processing lane" that runs independently until the queue is empty or a failure occurs.
""",
tags=['ytdlp', 'worker'],
params={
# Worker loop control params (passed from orchestrator)
'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."),
@@ -795,95 +942,94 @@ with DAG(
}
) as dag:
pull_url_and_assign_account = PythonOperator(
task_id='pull_url_and_assign_account',
python_callable=pull_url_and_assign_account_callable,
)
# --- Encapsulate token acquisition logic in a TaskGroup for visual clarity ---
with TaskGroup(group_id='acquire_token_with_retry') as acquire_token_group:
get_token = PythonOperator(
task_id='get_token',
python_callable=get_token_callable,
templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"},
)
handle_bannable_error_branch = BranchPythonOperator(
task_id='handle_bannable_error_branch',
python_callable=handle_bannable_error_branch_callable,
trigger_rule='one_failed', # This task should only run if get_token fails
)
# --- Retry Path ---
ban_account_and_prepare_for_retry = PythonOperator(
task_id='ban_account_and_prepare_for_retry',
python_callable=ban_first_account_callable,
)
assign_new_account_for_retry = PythonOperator(
task_id='assign_new_account_for_retry',
python_callable=assign_new_account_for_retry_callable,
)
retry_get_token = PythonOperator(
task_id='retry_get_token',
python_callable=get_token_callable,
templates_dict={'info_json_dir': "{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}"},
)
handle_retry_failure_branch = BranchPythonOperator(
task_id='handle_retry_failure_branch',
python_callable=handle_retry_failure_branch_callable,
trigger_rule='one_failed', # This task should only run if retry_get_token fails
)
ban_second_account_and_proxy = PythonOperator(
task_id='ban_second_account_and_proxy',
python_callable=ban_second_account_and_proxy_callable,
)
# --- Stop Path ---
ban_account_and_fail = PythonOperator(
task_id='ban_account_and_fail',
python_callable=ban_first_account_and_fail_callable,
)
# --- Internal Success Merge Point ---
token_acquisition_succeeded = DummyOperator(
task_id='token_acquisition_succeeded',
trigger_rule='one_success',
)
# --- Define dependencies within the TaskGroup ---
# The success dummy task is the merge point for the two possible success tasks.
[get_token, retry_get_token] >> token_acquisition_succeeded
# The first branch operator runs only if get_token fails.
get_token >> handle_bannable_error_branch
# It branches to the retry path or the hard-fail path.
handle_bannable_error_branch >> [ban_account_and_prepare_for_retry, ban_account_and_fail]
# The retry path
ban_account_and_prepare_for_retry >> assign_new_account_for_retry >> retry_get_token
# The second branch operator runs only if retry_get_token fails.
retry_get_token >> handle_retry_failure_branch
# It only branches to the final failure task.
handle_retry_failure_branch >> ban_second_account_and_proxy
# --- Main Execution Path (outside the TaskGroup) ---
download_and_probe = BashOperator(
task_id='download_and_probe',
bash_command="""
set -e
INFO_JSON_PATH_1="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='info_json_path') }}"
INFO_JSON_PATH_2="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='info_json_path') }}"
INFO_JSON_PATH="${INFO_JSON_PATH_1:-$INFO_JSON_PATH_2}"
PROXY_1="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.get_token', key='socks_proxy') }}"
PROXY_2="{{ ti.xcom_pull(task_ids='acquire_token_with_retry.retry_get_token', key='socks_proxy') }}"
PROXY="${PROXY_1:-$PROXY_2}"
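# ${A:-$B} is standard shell parameter expansion: use the first attempt's value when
# set and non-empty, otherwise fall back to the retry's value.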
FORMAT="{{ params.download_format }}"
@@ -991,48 +1137,31 @@ with DAG(
trigger_rule='all_done',
)
continue_loop_and_trigger_next_run = PythonOperator(
task_id='continue_loop_and_trigger_next_run',
python_callable=trigger_self_run_callable,
)
stop_worker_lane_gracefully = DummyOperator(task_id='stop_worker_lane_gracefully')
mark_dag_run_as_failed = BashOperator(task_id='mark_dag_run_as_failed', bash_command='exit 1')
# --- Define Task Dependencies ---
pull_url_and_assign_account >> acquire_token_group
# The TaskGroup's internal success task (`token_acquisition_succeeded`) is the trigger for the download.
# This is more explicit than depending on the entire group's state and prevents the skip issue.
dag.get_task('acquire_token_with_retry.token_acquisition_succeeded') >> download_and_probe
download_and_probe >> mark_url_as_success
# Define the failure path. The generic failure handler is set downstream of the two
# main tasks that can fail. Its 'one_failed' trigger rule ensures it only runs on failure.
# This explicit list avoids potential scheduler ambiguity.
[acquire_token_group, download_and_probe] >> handle_generic_failure
# Define the final decision point. This task must run after the success path completes
# OR after the failure path completes. Its 'all_done' trigger rule makes this possible.
mark_url_as_success >> decide_next_step
handle_generic_failure >> decide_next_step
decide_next_step >> [continue_loop_and_trigger_next_run, stop_worker_lane_gracefully, mark_dag_run_as_failed]
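# Rough shape of the resulting graph (illustrative sketch, not tool output):
#
#   pull_url_and_assign_account
#     >> acquire_token_with_retry:
#          get_token --(one_failed)--> handle_bannable_error_branch
#            >> ban_account_and_prepare_for_retry >> assign_new_account_for_retry >> retry_get_token
#            >> ban_account_and_fail
#          retry_get_token --(one_failed)--> handle_retry_failure_branch >> ban_second_account_and_proxy
#          [get_token, retry_get_token] --(one_success)--> token_acquisition_succeeded
#     >> download_and_probe >> mark_url_as_success >> decide_next_step
#   [acquire_token_with_retry, download_and_probe] --(one_failed)--> handle_generic_failure >> decide_next_step
#   decide_next_step >> continue_loop_and_trigger_next_run | stop_worker_lane_gracefully | mark_dag_run_as_failed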