yt-dlp-dags/dags/ytdlp_ops_worker_per_url.py
2025-08-06 18:02:44 +03:00

1039 lines
47 KiB
Python

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
DAG for processing a single YouTube URL passed via DAG run configuration.
This is the "Worker" part of a Sensor/Worker pattern.
"""
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import BaseOperator, Variable
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
from airflow.api.common.trigger_dag import trigger_dag
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException, PBUserException
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from thrift.transport.TTransport import TTransportException
import json
import logging
import os
import random
import redis
import socket
import time
import traceback
import inspect
import uuid
import uuid
# Import utility functions
from utils.redis_utils import _get_redis_client
# Configure logging
logger = logging.getLogger(__name__)

# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_MAX_URLS = 1
DEFAULT_TIMEOUT = 180  # Default Thrift timeout in seconds
# NOTE: these Variable lookups run at module import time.
DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="16.162.82.212")
# A stored Airflow Variable comes back as a string, while the 'service_port'
# DAG Param is declared with type "integer". Coerce here so the Param default
# is an int whether the Variable is set or the numeric fallback is used.
DEFAULT_YT_AUTH_SERVICE_PORT = int(Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080))
# --- Helper Functions ---
def _get_thrift_client(host, port, timeout):
    """Create a Thrift client for the token service and open its transport.

    Args:
        host: Hostname or IP address of the Thrift server.
        port: TCP port, as an int or a numeric string (values coming from
            templated operator kwargs may arrive as strings).
        timeout: Socket timeout in seconds, as a number or numeric string.

    Returns:
        A ``(client, transport)`` tuple. The transport is already open; the
        caller is responsible for closing it.

    Raises:
        ValueError, TypeError: If ``port`` or ``timeout`` is not numeric.
        Any exception raised while opening the transport propagates unchanged.
    """
    # Coerce explicitly: with a string timeout, ``timeout * 1000`` would repeat
    # the string 1000 times instead of converting seconds to milliseconds.
    port = int(port)
    timeout_ms = int(float(timeout) * 1000)

    socket_transport = TSocket.TSocket(host, port)
    socket_transport.setTimeout(timeout_ms)
    transport = TTransport.TFramedTransport(socket_transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = YTTokenOpService.Client(protocol)
    transport.open()
    logger.info(f"Connected to Thrift server at {host}:{port}")
    return client, transport
def _ban_resource_task(resource_type, resource_id_xcom_key, host, port, timeout, **kwargs):
    """
    Ban a resource (account or proxy) through the Thrift service, best-effort.

    Designed to be used as a PythonOperator callable. Failures are logged and
    never raised, because banning is a cleanup action that must not fail the run.

    Args:
        resource_type: ``'account'`` or ``'proxy'``.
        resource_id_xcom_key: XCom key under which the resource ID was pushed.
        host: Thrift service host.
        port: Thrift service port (int or numeric string, e.g. a rendered template).
        timeout: Thrift timeout in seconds (int or numeric string).
        **kwargs: Must contain ``ti`` (the task instance). Optional ``reason``
            (account bans) and ``server_identity`` (required for proxy bans).
    """
    ti = kwargs['ti']
    resource_id = ti.xcom_pull(key=resource_id_xcom_key)
    if not resource_id:
        logger.warning(f"Could not find resource ID in XCom key '{resource_id_xcom_key}' to ban. Skipping.")
        return

    # Validate the request before opening a connection that would not be used.
    if resource_type not in ('account', 'proxy'):
        logger.error(f"Unknown resource type '{resource_type}'. Nothing was banned.")
        return
    server_identity = kwargs.get('server_identity')
    if resource_type == 'proxy' and not server_identity:
        logger.error("Cannot ban proxy without server_identity.")
        return

    transport = None
    try:
        # port/timeout may be strings when supplied through templated
        # op_kwargs; coerce them so the Thrift socket receives real numbers.
        client, transport = _get_thrift_client(host, int(port), int(timeout))
        if resource_type == 'account':
            reason = f"Banned by Airflow worker due to {kwargs.get('reason', 'failure')}"
            logger.warning(f"Banning account '{resource_id}'. Reason: {reason}")
            client.banAccount(accountId=resource_id, reason=reason)
            logger.info(f"Successfully sent request to ban account '{resource_id}'.")
        else:
            logger.warning(f"Banning proxy '{resource_id}' for server '{server_identity}'.")
            client.banProxy(proxyUrl=resource_id, serverIdentity=server_identity)
            logger.info(f"Successfully sent request to ban proxy '{resource_id}'.")
    except Exception as e:
        # Log the error but don't fail the task, as this is a best-effort cleanup action.
        logger.error(f"Failed to issue ban for {resource_type} '{resource_id}': {e}", exc_info=True)
    finally:
        if transport and transport.isOpen():
            transport.close()
def _extract_video_id(url):
    """Extract the 11-character YouTube video ID from a URL.

    Recognised forms (scheme and sub-domain are optional):
      * ``youtube.com/watch?...v=<id>`` with ``v`` anywhere in the query string
      * ``youtu.be/<id>``
      * ``youtube.com/shorts/<id>``, ``/embed/<id>`` and ``/live/<id>``

    Args:
        url: The URL to inspect.

    Returns:
        The video ID, or ``None`` when ``url`` is empty, not a string, or does
        not contain a well-formed ID.
    """
    # Local import: 're' is not among this module's top-level imports.
    import re

    if not url or not isinstance(url, str):
        logger.debug("URL is empty or not a string, cannot extract video ID.")
        return None

    # Checked in order; the lazy optional group makes the first 'v=' win.
    patterns = (
        r'youtube\.com/watch\?(?:[^#]*?&)??v=([^&#]+)',
        r'youtu\.be/([^?&#/]+)',
        r'youtube\.com/(?:shorts|embed|live)/([^?&#/]+)',
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if not match:
            continue
        candidate = match.group(1)[:11]  # Standard ID length
        # The ID ends up in a file name downstream, so accept only the
        # characters YouTube IDs use; this also rejects values such as '../..'.
        if re.fullmatch(r'[0-9A-Za-z_-]{11}', candidate):
            logger.debug(f"Extracted video ID '{candidate}' from URL: {url}")
            return candidate

    logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
    return None
# --- Queue Management Callables (for success/failure reporting) ---
def mark_url_as_success(**context):
    """Store a success record for the processed URL in the Redis result hash.

    The URL and the outputs of the token and download tasks are read from XCom
    and written as JSON to the ``<queue_name>_result`` hash, keyed by URL.
    Redis errors are logged and swallowed so that a completed download is not
    turned into a failed task by a reporting problem.
    """
    task_instance = context['task_instance']
    dag_params = context['params']

    url = task_instance.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process')
    if not url:
        logger.warning("mark_url_as_success called but no URL found in DAG run parameters.")
        return

    queue_name = dag_params['queue_name']
    result_queue = f"{queue_name}_result"
    redis_conn_id = dag_params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    # Outputs of the token task, pulled in the order they appear in the record.
    token_outputs = {
        xcom_key: task_instance.xcom_pull(task_ids='get_token', key=xcom_key)
        for xcom_key in ('info_json_path', 'socks_proxy', 'ytdlp_command')
    }
    downloaded_file_path = task_instance.xcom_pull(task_ids='download_and_probe')

    logger.info(f"Handling success for URL: {url}")
    logger.info(f" Downloaded File Path: {downloaded_file_path}")

    result_data = {
        'status': 'success',
        'end_time': time.time(),
        **token_outputs,
        'downloaded_file_path': downloaded_file_path,
        'url': url,
        'dag_run_id': context['dag_run'].run_id,
    }

    try:
        # The worker pattern has no "progress" hash to clean up; the result
        # is simply added to the success hash.
        redis_client = _get_redis_client(redis_conn_id)
        redis_client.hset(result_queue, url, json.dumps(result_data))
        logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
    except Exception as e:
        # Reporting is best-effort: the main work already succeeded.
        logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
def handle_failure_callable(**context):
    """
    Handles a failed processing run by recording the error details to Redis.
    The decision to stop or continue the loop is handled by `decide_what_to_do_next`.

    The failed task is taken to be the first direct upstream task in state
    'failed'. Error details come from that task's 'error_details' XCom when it
    is a dict, otherwise from the 'exception' entry of the context (if any).

    Raises:
        AirflowException: If the failure record cannot be written to Redis.
    """
    ti = context['task_instance']
    params = context['params']
    dag_run = context['dag_run']

    url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process')
    if not url:
        logger.error("handle_failure_callable called but no URL found in XCom.")
        return

    # --- Determine the source and type of failure ---
    exception = context.get('exception')
    failed_task_id = "unknown"
    for task in ti.task.get_direct_relatives(upstream=True):
        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
        if upstream_ti and upstream_ti.state == 'failed':
            failed_task_id = task.task_id
            break

    # --- Extract Detailed Error Information ---
    error_details_from_xcom = None
    if failed_task_id != "unknown":
        error_details_from_xcom = ti.xcom_pull(task_ids=failed_task_id, key='error_details')

    if isinstance(error_details_from_xcom, dict) and error_details_from_xcom:
        error_message = error_details_from_xcom.get('error_message', 'Unknown error from XCom')
        error_type = error_details_from_xcom.get('error_type', 'Unknown type from XCom')
        tb_str = error_details_from_xcom.get('traceback', 'No traceback in XCom.')
    elif exception:
        error_message = str(exception)
        error_type = type(exception).__name__
        # Positional arguments work on every Python 3 version; the 'etype'
        # keyword was removed in Python 3.10 and raises TypeError there.
        tb_str = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )
    else:
        error_message = "Unknown error"
        error_type = "Unknown"
        tb_str = "No traceback available."

    logger.info(f"Handling failure for URL: {url}")
    logger.error(f" Failed Task: {failed_task_id}")
    logger.error(f" Failure Type: {error_type}")
    logger.error(f" Failure Reason: {error_message}")
    logger.debug(f" Traceback:\n{tb_str}")

    final_error_details = {
        'failed_task': failed_task_id,
        'error_type': error_type,
        'error_message': error_message,
        'traceback': tb_str,
        'url': url,
        'dag_run_id': dag_run.run_id,
    }

    # For all failures, mark the URL as failed in Redis.
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    queue_name = params['queue_name']
    fail_queue = f"{queue_name}_fail"
    try:
        client = _get_redis_client(redis_conn_id)
        client.hset(fail_queue, url, json.dumps(final_error_details, indent=2))
        logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
    except Exception as e:
        logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
        raise AirflowException(f"Could not handle failure in Redis: {e}") from e
# --- Account Pool Helpers and Task Callables ---
def _get_account_pool(params: dict) -> list:
    """
    Gets the list of accounts to use for processing, filtering out banned/resting accounts.

    Supports three modes for the 'account_pool' parameter:
    1. Explicit List: If 'account_pool' contains a comma, it's treated as a comma-separated list.
    2. Prefix-based Generation: If 'account_pool_size' is provided, 'account_pool' is treated as a prefix
       to generate numbered accounts (e.g., prefix_01, prefix_02).
    3. Single Account: If 'account_pool' has no comma and 'account_pool_size' is not provided,
       it's treated as a single account name.

    Account status is read from the Redis hash 'account_status:<account>' (field 'status').
    If Redis cannot be queried, the unfiltered pool is returned. If every account is
    banned/resting, a new account ID is generated when auto-creation is enabled and the
    pool is prefix-based; otherwise an AirflowException is raised.

    Args:
        params: DAG run params ('account_pool', 'account_pool_size', 'redis_conn_id',
            'auto_create_new_accounts_on_exhaustion').

    Returns:
        A non-empty list of account IDs.

    Raises:
        AirflowException: On an invalid pool configuration or an exhausted pool.
    """
    account_pool_str = params.get('account_pool', 'default_account')
    accounts = []
    is_prefix_mode = False

    if ',' in account_pool_str:
        # Mode 1: Explicit comma-separated list
        logger.info("Detected comma in 'account_pool', treating as an explicit list.")
        accounts = [acc.strip() for acc in account_pool_str.split(',') if acc.strip()]
    else:
        # Mode 2 or 3: Prefix-based generation OR single account
        prefix = account_pool_str
        pool_size_param = params.get('account_pool_size')
        if pool_size_param is not None:
            # Mode 2: Prefix mode
            is_prefix_mode = True
            logger.info("Detected 'account_pool_size', treating 'account_pool' as a prefix.")
            try:
                pool_size = int(pool_size_param)
            except (ValueError, TypeError) as err:
                raise AirflowException(
                    f"'account_pool_size' must be an integer, but got: {pool_size_param}"
                ) from err
            if pool_size <= 0:
                raise AirflowException("'account_pool_size' must be a positive integer for prefix-based generation.")
            logger.info(f"Account pool size is set to: {pool_size}")
            # Generate accounts like 'prefix_01', 'prefix_02', ..., 'prefix_10'
            accounts = [f"{prefix}_{i:02d}" for i in range(1, pool_size + 1)]
        else:
            # Mode 3: Single account mode
            logger.info("No 'account_pool_size' provided. Treating 'account_pool' as a single account name.")
            accounts = [prefix]

    if not accounts:
        raise AirflowException("Initial account pool is empty. Please check 'account_pool' and 'account_pool_size' parameters.")
    logger.info(f"Generated initial account pool with {len(accounts)} accounts: {accounts}")

    # --- Filter out banned/resting accounts by checking Redis ---
    # Only the Redis access is guarded by the broad except. The exhaustion
    # decision happens afterwards, so its AirflowException cannot be mistaken
    # for a Redis failure and silently replaced by the unfiltered pool.
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    active_accounts = None
    try:
        redis_client = _get_redis_client(redis_conn_id)
        active_accounts = []
        for account in accounts:
            raw_status = redis_client.hget(f"account_status:{account}", "status")
            # The client may return bytes or str depending on its configuration.
            if not raw_status:
                status = "ACTIVE"
            elif isinstance(raw_status, bytes):
                status = raw_status.decode('utf-8')
            else:
                status = str(raw_status)
            if status == 'BANNED':
                logger.warning(f"Account '{account}' is BANNED. Skipping.")
                continue
            if 'RESTING' in status:  # Check for 'RESTING' or 'RESTING (active in...)'
                logger.info(f"Account '{account}' is RESTING. Skipping.")
                continue
            active_accounts.append(account)
    except Exception as e:
        logger.error(f"Could not filter accounts by status from Redis. Using unfiltered pool. Error: {e}", exc_info=True)
        active_accounts = None  # discard any partial result

    if active_accounts is not None:
        if not active_accounts:
            logger.error(f"All {len(accounts)} accounts in the pool are banned or resting.")
            auto_create = params.get('auto_create_new_accounts_on_exhaustion', False)
            if auto_create and is_prefix_mode:
                new_account_id = f"{account_pool_str}-auto-{str(uuid.uuid4())[:8]}"
                logger.warning(f"Account pool exhausted. Auto-creating new account: '{new_account_id}'")
                active_accounts.append(new_account_id)
            else:
                if not auto_create:
                    logger.error("Auto-creation is disabled. No workers can be scheduled.")
                if not is_prefix_mode:
                    logger.error("Auto-creation is only supported for prefix-based account pools.")
                raise AirflowException("All accounts in the configured pool are currently exhausted (banned or resting).")
        if len(active_accounts) < len(accounts):
            logger.info(f"Filtered account pool. Using {len(active_accounts)} of {len(accounts)} available accounts.")
        accounts = active_accounts

    if not accounts:
        raise AirflowException("Account pool is empty after filtering. Please check account statuses in Redis or enable auto-creation.")
    logger.info(f"Final active account pool with {len(accounts)} accounts: {accounts}")
    return accounts
def pull_url_from_redis_callable(**context):
    """
    Pulls a single URL from the Redis inbox queue.
    If the queue is empty, it skips the DAG run.
    Otherwise, it pushes the URL to XCom for downstream tasks.

    The inbox list is '<queue_name>_inbox'; the URL is pushed under the XCom
    key 'url_to_process'.

    Raises:
        AirflowSkipException: If the inbox queue is empty.
    """
    params = context['params']
    ti = context['task_instance']
    queue_name = params['queue_name']
    # Fall back to the default connection, as the other callables in this file do.
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    inbox_queue = f"{queue_name}_inbox"

    logger.info(f"Attempting to pull one URL from Redis queue '{inbox_queue}'...")
    client = _get_redis_client(redis_conn_id)
    raw_url = client.lpop(inbox_queue)
    if not raw_url:
        logger.info("Queue is empty. Stopping this worker loop.")
        raise AirflowSkipException("Redis queue is empty.")

    # The client may return bytes or an already-decoded str, depending on how
    # it was configured; accept both.
    url_to_process = raw_url.decode('utf-8') if isinstance(raw_url, bytes) else str(raw_url)
    logger.info(f"Pulled URL '{url_to_process}' from the queue.")
    ti.xcom_push(key='url_to_process', value=url_to_process)
def decide_what_to_do_next_callable(**context):
    """
    Branch callable that picks the task ID to follow at the end of a run.

    Returns:
        'fail_loop' when the 'handle_generic_failure' task exists and was not skipped,
        'stop_loop' when the queue was empty and 'retrigger_delay_on_empty_s' is negative,
        'trigger_self_run' in every other case (empty queue with a delay, or success).
    """
    params = context['params']
    dag_run = context['dag_run']

    # A failure handler that was not skipped means some task in the pipeline failed.
    failure_ti = dag_run.get_task_instance(task_id='handle_generic_failure')
    if failure_ti and failure_ti.state != 'skipped':
        logger.error(f"Failure handler task 'handle_generic_failure' was in state '{failure_ti.state}'. Stopping this processing lane.")
        return 'fail_loop'

    # A skipped pull task means the Redis queue was empty.
    pull_ti = dag_run.get_task_instance(task_id='pull_url_from_redis')
    queue_was_empty = bool(pull_ti) and pull_ti.state == 'skipped'
    if not queue_was_empty:
        # No handled failure and a non-empty queue: the run succeeded.
        logger.info("All preceding tasks succeeded. Continuing the processing loop by triggering the next worker.")
        return 'trigger_self_run'

    logger.info("Worker was skipped because Redis queue was empty.")
    empty_delay_s = params.get('retrigger_delay_on_empty_s', 60)
    if empty_delay_s < 0:
        logger.info(f"retrigger_delay_on_empty_s is {empty_delay_s}. Stopping this worker loop.")
        return 'stop_loop'

    logger.info(f"Queue is empty. Will re-trigger this worker loop after a delay of {empty_delay_s}s.")
    return 'trigger_self_run'
def assign_account_callable(**context):
    """
    Choose the account for this run and publish it through XCom.

    The 'current_account_id' param wins when it is set (affinity with the
    previous run); otherwise a random account is drawn from the active pool.
    Pushes 'account_id' and a one-element 'accounts_tried' list.
    """
    ti = context['task_instance']
    params = context['params']

    chosen_account = params.get('current_account_id')
    if not chosen_account:
        logger.info("No account passed from previous run. Selecting a new one from the pool.")
        chosen_account = random.choice(_get_account_pool(params))
        logger.info(f"Selected initial account '{chosen_account}'.")
    else:
        # Affinity: keep using the account handed over by the previous run.
        logger.info(f"Using account '{chosen_account}' passed from previous run (affinity).")

    for xcom_key, xcom_value in (
        ('account_id', chosen_account),
        ('accounts_tried', [chosen_account]),
    ):
        ti.xcom_push(key=xcom_key, value=xcom_value)
def get_token_callable(**context):
    """Makes a single attempt to get a token from the Thrift service.

    The account is the first non-None 'account_id' XCom found among the direct
    upstream tasks; the URL comes from 'pull_url_from_redis'.

    On success pushes 'info_json_path' (when a non-empty info.json was returned),
    'socks_proxy', 'ytdlp_command', 'successful_account_id' and
    'get_token_succeeded' = True.

    On a Thrift error pushes 'error_details' and 'get_token_succeeded' = False.
    Bannable errors let the task finish so the downstream branch operator can
    react; any other error fails the task.

    Raises:
        AirflowException: If no account ID is available, or on a non-bannable
            Thrift failure.
    """
    ti = context['task_instance']
    params = context['params']

    # Determine which account to use (initial or retry).
    # Pulling from all upstreams may return a single value or an iterable
    # (list, tuple, LazySelectSequence); take the first real value.
    xcom_results = ti.xcom_pull(task_ids=context['task'].upstream_task_ids, key='account_id')
    if hasattr(xcom_results, '__iter__') and not isinstance(xcom_results, str):
        account_id = next((item for item in xcom_results if item is not None), None)
    else:
        account_id = xcom_results
    if not account_id:
        raise AirflowException("Could not find a valid account_id in XCom from any upstream task.")

    url = ti.xcom_pull(task_ids='pull_url_from_redis', key='url_to_process')
    if not url:
        logger.info("No URL pulled from XCom. Assuming upstream task was skipped. Ending task.")
        return

    host = params['service_ip']
    port = int(params['service_port'])
    timeout = int(params.get('timeout', DEFAULT_TIMEOUT))
    # The value from templates_dict is already rendered by Airflow.
    info_json_dir = context['templates_dict']['info_json_dir']
    machine_id = params.get('machine_id') or socket.gethostname()
    clients = params.get('clients')

    logger.info(f"--- Attempting to get token for URL '{url}' with account '{account_id}' ---")
    transport = None
    try:
        client, transport = _get_thrift_client(host, port, timeout)
        client.ping()
        token_data = client.getOrRefreshToken(
            accountId=account_id,
            updateType=TokenUpdateMode.AUTO,
            url=url,
            clients=clients,
            machineId=machine_id,
        )
        logger.info("Successfully retrieved token data from service.")

        # --- Success Case ---
        info_json = getattr(token_data, 'infoJson', None)
        # Parse once; invalid JSON raises here and fails the task.
        info_data = json.loads(info_json) if info_json else None
        if info_data:
            video_id = _extract_video_id(url)
            save_dir = info_json_dir or "."
            os.makedirs(save_dir, exist_ok=True)
            timestamp = int(time.time())
            base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
            info_json_path = os.path.join(save_dir, base_filename)
            with open(info_json_path, 'w', encoding='utf-8') as f:
                f.write(info_json)
            ti.xcom_push(key='info_json_path', value=info_json_path)

            # Log key details from the info.json to confirm success
            if isinstance(info_data, dict):
                title = info_data.get('title', 'N/A')
                uploader = info_data.get('uploader', 'N/A')
                duration = info_data.get('duration_string', 'N/A')
                logger.info(f"Successfully got info.json for video: '{title}' by '{uploader}' ({duration})")

        # The proxy may be exposed under any of these attribute names.
        proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
        ti.xcom_push(key='socks_proxy', value=getattr(token_data, proxy_attr) if proxy_attr else None)
        ti.xcom_push(key='ytdlp_command', value=getattr(token_data, 'ytdlpCommand', None))
        ti.xcom_push(key='successful_account_id', value=account_id)  # For affinity
        ti.xcom_push(key='get_token_succeeded', value=True)
    except (PBServiceException, PBUserException, TTransportException) as e:
        logger.error(f"Thrift call failed for account '{account_id}'. Exception: {getattr(e, 'message', str(e))}")

        error_context = getattr(e, 'context', None)
        if isinstance(error_context, str):
            try:
                error_context = json.loads(error_context.replace("'", "\""))
            except ValueError:
                # Not JSON-like: keep the raw string; no proxy_url can be extracted.
                pass

        error_details = {
            'error_message': getattr(e, 'message', str(e)),
            'error_code': getattr(e, 'errorCode', 'TRANSPORT_ERROR'),
            'error_type': type(e).__name__,
            'traceback': traceback.format_exc(),
            'proxy_url': error_context.get('proxy_url') if isinstance(error_context, dict) else None
        }
        ti.xcom_push(key='error_details', value=error_details)
        ti.xcom_push(key='get_token_succeeded', value=False)

        # For non-bannable errors like Connection Refused, fail the task immediately to stop the loop.
        # Bannable errors will let the task succeed, allowing the branch operator to decide on a retry.
        # The attributes can exist but be None, so normalise before calling str methods.
        error_code = str(error_details.get('error_code') or '').strip()
        error_message = str(error_details.get('error_message') or '').lower()
        bannable_codes = ["BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", "SOCKS5_CONNECTION_FAILED"]
        is_bannable = error_code in bannable_codes

        # Override bannable status for age-restricted content, which is not a true bot detection.
        if is_bannable and ('confirm your age' in error_message or 'age-restricted' in error_message):
            logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.")
            is_bannable = False

        if not is_bannable:
            logger.error(f"Non-bannable error '{error_code}' detected. Failing task to stop the loop.")
            raise AirflowException(f"Non-bannable Thrift call failed: {error_details['error_message']}") from e
        # Do not raise for bannable errors; the branch operator handles them.
        logger.warning(f"Bannable error '{error_code}' detected. Passing to branch operator for handling.")
    finally:
        if transport and transport.isOpen():
            transport.close()
def handle_bannable_error_branch_callable(**context):
    """
    Checks if a `get_token` failure is bannable and if a retry is allowed.

    Returns the task ID to follow:
        'setup_download_and_probe' when `get_token` succeeded,
        'ban_account_and_prepare_for_retry' for a bannable error when the
            'on_bannable_failure' policy is 'retry_with_new_account',
        'ban_account_and_fail' for a bannable error under any other policy,
        'handle_generic_failure' when there are no error details or the error
            is not bannable.
    """
    ti = context['task_instance']
    params = context['params']

    # Check the result of the first get_token attempt
    if ti.xcom_pull(task_ids='get_token', key='get_token_succeeded'):
        return 'setup_download_and_probe'

    # It failed, so check the error details
    error_details = ti.xcom_pull(task_ids='get_token', key='error_details')
    if not error_details:
        logger.error("get_token failed but no error details were found in XCom. Stopping loop.")
        return 'handle_generic_failure'

    # The stored values may be None (the key exists but has no value), so
    # normalise to strings before calling str methods on them.
    error_code = str(error_details.get('error_code') or '').strip()
    error_message = str(error_details.get('error_message') or '').lower()
    policy = params.get('on_bannable_failure', 'retry_with_new_account')
    bannable_codes = ["BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED", "SOCKS5_CONNECTION_FAILED"]
    is_bannable = error_code in bannable_codes

    # Override bannable status for age-restricted content, which is not a true bot detection.
    if is_bannable and ('confirm your age' in error_message or 'age-restricted' in error_message):
        logger.warning(f"Error is age-related ('{error_code}'). Treating as a non-bannable failure to avoid banning the account.")
        is_bannable = False

    logger.info(f"Handling failure from 'get_token'. Error code: '{error_code}', Policy: '{policy}'")
    if not is_bannable:
        logger.warning("Error is not considered bannable. Proceeding to generic failure handling.")
        return 'handle_generic_failure'
    if policy == 'retry_with_new_account':
        logger.info("Error is bannable and policy allows retry. Proceeding to ban first account and retry.")
        return 'ban_account_and_prepare_for_retry'
    # Any other policy (i.e. 'stop_loop')
    logger.warning("Error is bannable and policy is 'stop_loop'. Banning account and stopping.")
    return 'ban_account_and_fail'
def assign_new_account_for_retry_callable(**context):
    """Pick an account that has not been tried yet and publish it for the retry.

    Reads 'accounts_tried' from the 'assign_account' task, draws a random
    untried account from the active pool, and falls back to generating a new
    '<prefix>-auto-<8 hex chars>' ID when auto-creation is enabled for a
    prefix-based pool. Pushes the updated 'account_id' and 'accounts_tried'.

    Raises:
        AirflowException: If the tried-accounts list is missing, or if no
            account could be selected (any underlying error is wrapped).
    """
    ti = context['task_instance']
    params = context['params']

    tried_so_far = ti.xcom_pull(task_ids='assign_account', key='accounts_tried')
    if not tried_so_far:
        raise AirflowException("Cannot retry, list of previously tried accounts not found.")
    logger.info(f"Policy is 'retry_with_new_account'. Selecting a new account. Already tried: {tried_so_far}")

    def _auto_create_or_fail():
        # No unused accounts left in the pool. Check if we can auto-create one.
        logger.warning("No unused accounts available in the pool for a retry. Checking for auto-creation.")
        auto_create = params.get('auto_create_new_accounts_on_exhaustion', False)
        pool_spec = params.get('account_pool', 'default_account')
        pool_size = params.get('account_pool_size')
        prefix_mode = pool_size is not None and ',' not in pool_spec
        if not (auto_create and prefix_mode):
            if not auto_create:
                logger.error("Auto-creation is disabled.")
            if not prefix_mode:
                logger.error("Auto-creation is only supported for prefix-based account pools (requires 'account_pool_size').")
            raise AirflowException("No other accounts available in the pool for a retry.")
        generated = f"{pool_spec}-auto-{str(uuid.uuid4())[:8]}"
        logger.warning(f"Auto-creating new account for retry: '{generated}'")
        return generated

    try:
        untried = [acc for acc in _get_account_pool(params) if acc not in tried_so_far]
        replacement = random.choice(untried) if untried else _auto_create_or_fail()

        tried_so_far.append(replacement)
        logger.info(f"Selected new account for retry: '{replacement}'")
        ti.xcom_push(key='account_id', value=replacement)
        ti.xcom_push(key='accounts_tried', value=tried_so_far)
    except Exception as e:
        logger.error(f"Could not get a new account for retry: {e}")
        raise AirflowException(f"Failed to assign new account for retry: {e}")
def handle_retry_failure_branch_callable(**context):
    """Branch on the outcome of 'retry_get_token'.

    Returns 'setup_download_and_probe' when the retry pushed a truthy
    'get_token_succeeded', otherwise 'ban_second_account_and_proxy'.
    """
    retry_ok = context['task_instance'].xcom_pull(task_ids='retry_get_token', key='get_token_succeeded')
    if not retry_ok:
        logger.error("Retry attempt also failed. Banning second account and proxy.")
        return 'ban_second_account_and_proxy'

    logger.info("Retry attempt was successful.")
    return 'setup_download_and_probe'
def ban_second_account_and_proxy_callable(**context):
    """Ban the retry account and, when known, the proxy reported by the failed retry.

    Each resource ID is first pushed to XCom under the key that
    `_ban_resource_task` is then asked to read it from.
    """
    ti = context['task_instance']
    params = context['params']

    def _service_endpoint():
        # Resolved lazily, only when a ban is actually issued.
        return (
            params['service_ip'],
            int(params['service_port']),
            int(params.get('timeout', DEFAULT_TIMEOUT)),
        )

    # Ban the second account
    second_account = ti.xcom_pull(task_ids='assign_new_account_for_retry', key='account_id')
    if second_account:
        ti.xcom_push(key='account_to_ban', value=second_account)
        _ban_resource_task(
            'account', 'account_to_ban', *_service_endpoint(),
            ti=ti, reason="Failed on retry attempt"
        )

    # Ban the proxy
    retry_error = ti.xcom_pull(task_ids='retry_get_token', key='error_details')
    failed_proxy = retry_error.get('proxy_url') if retry_error else None
    if failed_proxy:
        ti.xcom_push(key='proxy_to_ban', value=failed_proxy)
        _ban_resource_task(
            'proxy', 'proxy_to_ban', *_service_endpoint(),
            ti=ti, server_identity=(params.get('machine_id') or socket.gethostname())
        )
def trigger_self_run_callable(**context):
    """Trigger the next run of 'ytdlp_ops_worker_per_url' to keep the loop going.

    When 'pull_url_from_redis' was skipped (empty queue) the task first sleeps
    for 'retrigger_delay_on_empty_s' seconds; otherwise it re-triggers at once.
    All non-None params are forwarded as the new run's conf, minus 'url', and
    'current_account_id' is set to the account that succeeded in this run
    (or None when there was none).
    """
    ti = context['task_instance']
    params = context['params']
    dag_run = context['dag_run']

    # A skipped pull task means this trigger follows an empty queue.
    pull_ti = dag_run.get_task_instance(task_id='pull_url_from_redis')
    if pull_ti and pull_ti.state == 'skipped':
        # Use the specific delay for empty queues. Default to 60s.
        delay = params.get('retrigger_delay_on_empty_s', 60)
        logger.info(f"Queue was empty. Applying delay of {delay}s before re-triggering.")
    else:
        # Successful runs re-trigger immediately.
        logger.info("Worker finished successfully. Triggering next run of itself to continue the loop.")
        delay = 0

    if delay > 0:
        logger.info(f"Waiting for {delay}s before triggering next run.")
        time.sleep(delay)

    # Generate a unique run_id for the new worker run
    run_id = f"self_triggered_{datetime.utcnow().isoformat()}"

    # Forward the original parameters; the next run pulls its own URL.
    conf_to_pass = {name: value for name, value in params.items() if value is not None}
    conf_to_pass.pop('url', None)

    # The successful account may come from the first attempt or from the retry.
    candidate_ids = ti.xcom_pull(task_ids=['get_token', 'retry_get_token'], key='successful_account_id')
    successful_account_id = next(filter(None, candidate_ids), None)
    # None means "no affinity": the next run selects a new account from the pool.
    conf_to_pass['current_account_id'] = successful_account_id
    if successful_account_id:
        logger.info(f"Passing successful account '{successful_account_id}' to the next worker run for affinity.")
    else:
        logger.info("No successful account ID found. Next worker will select a new account from the pool.")

    logger.info(f"Triggering 'ytdlp_ops_worker_per_url' with run_id '{run_id}' and conf: {conf_to_pass}")
    trigger_dag(
        dag_id='ytdlp_ops_worker_per_url',  # Trigger itself
        run_id=run_id,
        conf=conf_to_pass,
        replace_microseconds=False
    )
    logger.info("Successfully triggered the next worker run.")
# =============================================================================
# DAG Definition
# =============================================================================
# Task defaults shared by every operator in this DAG: no e-mail notifications
# and no automatic retries (retry_delay only matters if 'retries' is raised).
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=1),
    start_date=days_ago(1),
)
with DAG(
dag_id='ytdlp_ops_worker_per_url',
default_args=default_args,
schedule_interval=None,
catchup=False,
description='Self-sustaining worker DAG that processes URLs from a Redis queue in a continuous loop.',
doc_md="""
### YT-DLP Self-Sustaining Worker
This DAG is a self-sustaining worker that processes URLs in a continuous loop.
It is started by the `ytdlp_ops_orchestrator` (the "ignition system").
#### How it Works:
1. **Ignition:** An initial run is triggered by the orchestrator.
2. **Pull & Assign:** It pulls a URL from Redis and assigns an account for the job, reusing the last successful account if available (affinity).
3. **Get Token:** It calls the `ytdlp-ops-server` to get tokens and `info.json`.
4. **Failure Handling:** If `get_token` fails with a "bannable" error (like bot detection), it follows the `on_bannable_failure` policy:
- `retry_with_new_account` (default): It bans the failing account, picks a new one, and retries the `get_token` call once. If the retry also fails, it bans the second account and the proxy, then stops the loop.
- `stop_loop`: It bans the account and stops the loop immediately.
5. **Download:** If tokens are retrieved successfully, it downloads the media.
6. **Continue or Stop:** After success, or a non-recoverable failure, it decides whether to continue the loop by re-triggering itself or to stop.
This creates a "processing lane" that runs independently until the queue is empty or a failure occurs.
""",
tags=['ytdlp', 'thrift', 'client', 'worker', 'loop'],
params={
# Worker loop control params (passed from orchestrator)
'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Base name for Redis queues."),
'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Airflow Redis connection ID."),
# Worker-specific params
'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="Service IP. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."),
'service_port': Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="Port of the Envoy load balancer. Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."),
'account_pool': Param('default_account', type="string", description="Account pool prefix or comma-separated list."),
'account_pool_size': Param(None, type=["integer", "null"], description="If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."),
'machine_id': Param(None, type=["string", "null"], description="Identifier for the client machine, used for proxy usage tracking. If not set, worker hostname will be used."),
'clients': Param('mweb', type="string", description="Comma-separated list of clients to use for token generation (e.g., 'ios,android,mweb')."),
'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
'download_format': Param('ba[ext=m4a]/bestaudio/best', type="string", description="yt-dlp format selection string."),
'output_path_template': Param("%(title)s [%(id)s].%(ext)s", type="string", description="yt-dlp output filename template."),
'on_bannable_failure': Param(
'retry_with_new_account',
type="string",
enum=['stop_loop', 'retry_with_new_account'],
title="On Bannable Failure Policy",
description="Policy for when a bannable error occurs. 'stop_loop' or 'retry_with_new_account'."
),
'retry_on_probe_failure': Param(False, type="boolean", description="If True, attempts to re-download and probe a file if the initial probe fails."),
'auto_create_new_accounts_on_exhaustion': Param(True, type="boolean", description="If True and all accounts in a prefix-based pool are exhausted, create a new one automatically."),
'retrigger_delay_on_empty_s': Param(60, type="integer", description="Delay in seconds before re-triggering a worker if the queue is empty. Set to -1 to stop the loop."),
# --- Internal Worker Parameters (for self-triggering loop) ---
'current_account_id': Param(None, type=["string", "null"], description="[Internal] The account ID used by the previous run in this worker lane. Used to maintain account affinity."),
}
) as dag:
# --- First Attempt: fetch a URL, pick an account, request a token ---
# Each task simply wraps a module-level callable.
pull_url_from_redis = PythonOperator(
    python_callable=pull_url_from_redis_callable,
    task_id='pull_url_from_redis',
)

assign_account = PythonOperator(
    python_callable=assign_account_callable,
    task_id='assign_account',
)

# 'info_json_dir' is resolved at render time: the DAG run conf wins, then the
# DOWNLOADS_TEMP Airflow Variable, then the hard-coded default directory.
get_token = PythonOperator(
    python_callable=get_token_callable,
    task_id='get_token',
    templates_dict=dict(
        info_json_dir="{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}",
    ),
)

# 'all_done' makes the branch run once get_token has finished in any state,
# i.e. also when get_token failed, so the callable can pick the next path.
handle_bannable_error_branch = BranchPythonOperator(
    python_callable=handle_bannable_error_branch_callable,
    task_id='handle_bannable_error_branch',
    trigger_rule='all_done',
)
# --- Retry Path ---
# Ban the account used by the first attempt, then try once more with another one.
# The Jinja expressions in op_kwargs are rendered from the DAG params before the
# callable is invoked.
ban_account_and_prepare_for_retry = PythonOperator(
    python_callable=_ban_resource_task,
    task_id='ban_account_and_prepare_for_retry',
    op_kwargs=dict(
        resource_type='account',
        resource_id_xcom_key='account_id',
        host="{{ params.service_ip }}",
        port="{{ params.service_port }}",
        timeout="{{ params.timeout }}",
        reason="Bannable error detected, preparing for retry.",
    ),
)

assign_new_account_for_retry = PythonOperator(
    python_callable=assign_new_account_for_retry_callable,
    task_id='assign_new_account_for_retry',
)

# Second token attempt: same callable and same info_json_dir template as the
# first attempt, registered under a different task_id.
retry_get_token = PythonOperator(
    python_callable=get_token_callable,
    task_id='retry_get_token',
    templates_dict=dict(
        info_json_dir="{{ dag_run.conf.get('info_json_dir', var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles')) }}",
    ),
)

# 'none_skipped': runs when no upstream task was skipped, whatever the
# success/failure state of the retry attempt.
handle_retry_failure_branch = BranchPythonOperator(
    python_callable=handle_retry_failure_branch_callable,
    task_id='handle_retry_failure_branch',
    trigger_rule='none_skipped',
)

ban_second_account_and_proxy = PythonOperator(
    python_callable=ban_second_account_and_proxy_callable,
    task_id='ban_second_account_and_proxy',
)
# --- Stop Path ---
# Uses the same ban helper as the retry path; only the task_id and the recorded
# reason differ.
ban_account_and_fail = PythonOperator(
    python_callable=_ban_resource_task,
    task_id='ban_account_and_fail',
    op_kwargs=dict(
        resource_type='account',
        resource_id_xcom_key='account_id',
        host="{{ params.service_ip }}",
        port="{{ params.service_port }}",
        timeout="{{ params.timeout }}",
        reason="Bannable error detected, policy is stop_loop.",
    ),
)
# --- Main Execution Path ---
# No-op join task placed in front of the download step. 'one_success' lets it
# start as soon as any single upstream task has succeeded.
setup_download_and_probe = DummyOperator(
    trigger_rule='one_success',
    task_id='setup_download_and_probe',
)
# Download the media described by the info.json from the token task, then
# validate the file by decoding it with ffmpeg.
#
# Inputs (all rendered by Jinja before the script runs):
#   * info_json_path / socks_proxy XComs from 'get_token', falling back to the
#     same keys from 'retry_get_token'.
#   * params.download_format, params.output_path_template,
#     params.retry_on_probe_failure and the DOWNLOADS_TEMP Variable.
#
# Exit codes: yt-dlp's own code when the download fails, 1 when inputs or the
# output file are missing, 2 when the ffmpeg probe fails.
# The last stdout line is the final filename.
#
# Two details that are easy to get wrong:
#   * xcom_pull returns None when a task pushed nothing, and Jinja renders that
#     as the non-empty string "None". The templates therefore use `or ''` so the
#     shell's ":-" fallback to the retry task's values can actually trigger.
#   * Under `set -e` a failing `VAR=$(cmd)` aborts the script on the spot, so the
#     exit status is captured with `|| EXIT_CODE=$?` to keep the diagnostics
#     reachable. The script still exits with the same code.
download_and_probe = BashOperator(
    task_id='download_and_probe',
    bash_command="""
    set -e

    # Missing XComs are rendered as empty strings so the ":-" fallbacks work.
    INFO_JSON_PATH_1="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') or '' }}"
    INFO_JSON_PATH_2="{{ ti.xcom_pull(task_ids='retry_get_token', key='info_json_path') or '' }}"
    INFO_JSON_PATH="${INFO_JSON_PATH_1:-$INFO_JSON_PATH_2}"
    PROXY_1="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') or '' }}"
    PROXY_2="{{ ti.xcom_pull(task_ids='retry_get_token', key='socks_proxy') or '' }}"
    PROXY="${PROXY_1:-$PROXY_2}"
    FORMAT="{{ params.download_format }}"
    DOWNLOAD_DIR="{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles/video') }}"
    FILENAME_TEMPLATE="{{ params.output_path_template }}"
    FULL_OUTPUT_PATH="$DOWNLOAD_DIR/$FILENAME_TEMPLATE"

    echo "--- Starting Download Step ---"
    echo "Info JSON Path: $INFO_JSON_PATH"
    echo "Proxy: $PROXY"
    echo "Format: $FORMAT"
    echo "Download Directory: $DOWNLOAD_DIR"
    echo "Full Output Path: $FULL_OUTPUT_PATH"

    if [ -z "$INFO_JSON_PATH" ] || [ "$INFO_JSON_PATH" == "None" ] || [ ! -f "$INFO_JSON_PATH" ]; then
        echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)."
        exit 1
    fi

    CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH")
    if [ -n "$PROXY" ] && [ "$PROXY" != "None" ]; then
        CMD_ARRAY+=(--proxy "$PROXY")
    fi
    CMD_ARRAY+=(-f "$FORMAT" -o "$FULL_OUTPUT_PATH" --print filename)
    CMD_ARRAY+=(--continue --no-progress --no-simulate --no-write-info-json --ignore-errors --no-playlist)

    echo "Executing: $(printf "%q " "${CMD_ARRAY[@]}")"
    # Capture the status explicitly; a bare assignment would abort under set -e.
    EXIT_CODE=0
    FINAL_FILENAME=$("${CMD_ARRAY[@]}") || EXIT_CODE=$?
    echo "yt-dlp exited with code: $EXIT_CODE"
    if [ $EXIT_CODE -ne 0 ]; then
        echo "Error: yt-dlp command failed."
        exit $EXIT_CODE
    fi
    if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then
        echo "Error: Download failed or did not produce a file."
        exit 1
    fi
    echo "SUCCESS: Download complete. Final file at: $FINAL_FILENAME"

    echo "--- Starting Probe Step ---"
    echo "Probing downloaded file: $FINAL_FILENAME"
    if ! ffmpeg -v error -i "$FINAL_FILENAME" -f null - ; then
        echo "Error: ffmpeg probe check failed for '$FINAL_FILENAME'. The file might be corrupt."
        if [ "{{ params.retry_on_probe_failure }}" == "True" ]; then
            echo "Attempting one retry on probe failure..."
            echo "Renaming to .part to attempt resuming download."
            mv -f "$FINAL_FILENAME" "$FINAL_FILENAME.part"
            # Re-run download command
            echo "Re-executing: $(printf "%q " "${CMD_ARRAY[@]}")"
            EXIT_CODE=0
            FINAL_FILENAME=$("${CMD_ARRAY[@]}") || EXIT_CODE=$?
            echo "yt-dlp retry exited with code: $EXIT_CODE"
            if [ $EXIT_CODE -ne 0 ]; then
                echo "Error: yt-dlp retry command failed."
                exit $EXIT_CODE
            fi
            if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then
                echo "Error: Retry download failed or did not produce a file."
                exit 1
            fi
            echo "SUCCESS: Retry download complete. Final file at: $FINAL_FILENAME"
            # Re-probe
            echo "Probing redownloaded file: $FINAL_FILENAME"
            if ! ffmpeg -v error -i "$FINAL_FILENAME" -f null - ; then
                echo "Error: ffmpeg probe check failed again for '$FINAL_FILENAME'. Failing with exit code 2."
                exit 2
            fi
        else
            echo "Failing with exit code 2 due to probe failure (retries disabled)."
            exit 2
        fi
    fi
    echo "SUCCESS: Probe confirmed valid media file."
    # Push the final filename for the success_task
    echo "$FINAL_FILENAME"
    """,
    retries=0,
    retry_delay=timedelta(minutes=1),
)
# --- Finalization Tasks ---
# Note: this assignment rebinds the name 'mark_url_as_success' from the callable
# to the operator. The right-hand side is evaluated first, so the callable is
# still the one passed as python_callable.
mark_url_as_success = PythonOperator(
    python_callable=mark_url_as_success,
    task_id='mark_url_as_success',
)

# 'one_failed': starts as soon as at least one upstream task has failed.
# NOTE(review): trigger rules look at direct upstream tasks only -- confirm this
# task still runs when it is reached purely through a branch decision and none
# of its direct upstreams has failed.
handle_generic_failure = PythonOperator(
    python_callable=handle_failure_callable,
    task_id='handle_generic_failure',
    trigger_rule='one_failed',
)

# The variable is 'decide_next_step' but the task is registered under the id
# 'decide_what_to_do_next'. 'all_done' runs it once every upstream has finished,
# in whatever state.
decide_next_step = BranchPythonOperator(
    python_callable=decide_what_to_do_next_callable,
    task_id='decide_what_to_do_next',
    trigger_rule='all_done',
)

trigger_self_run = PythonOperator(
    python_callable=trigger_self_run_callable,
    task_id='trigger_self_run',
)

# Terminal markers: 'stop_loop' is a no-op, 'fail_loop' always exits non-zero.
stop_loop = DummyOperator(task_id='stop_loop')
fail_loop = BashOperator(
    bash_command='exit 1',
    task_id='fail_loop',
)
# --- Define Task Dependencies ---
# First attempt, ending in the branch that inspects the token result.
pull_url_from_redis >> assign_account
assign_account >> get_token
get_token >> handle_bannable_error_branch

# Direct downstream candidates of the first branch: the success path, the retry
# path, the stop-on-failure path, and the generic failure handler.
handle_bannable_error_branch >> [
    setup_download_and_probe,
    ban_account_and_prepare_for_retry,
    ban_account_and_fail,
    handle_generic_failure,
]

# Retry path, ending in its own branch.
ban_account_and_prepare_for_retry >> assign_new_account_for_retry
assign_new_account_for_retry >> retry_get_token
retry_get_token >> handle_retry_failure_branch

# Direct downstream candidates of the retry branch: the success path or the
# final ban step.
handle_retry_failure_branch >> [
    setup_download_and_probe,
    ban_second_account_and_proxy,
]

# Success path, reachable from either the first attempt or the retry.
setup_download_and_probe >> download_and_probe
download_and_probe >> mark_url_as_success

# Tasks feeding the generic failure handler directly. get_token has no direct
# edge to it; from the first attempt the handler is only reachable through
# handle_bannable_error_branch.
[
    download_and_probe,
    ban_account_and_fail,
    ban_second_account_and_proxy,
] >> handle_generic_failure

# Final decision point, downstream of both the success marker and the failure
# handler.
[mark_url_as_success, handle_generic_failure] >> decide_next_step
decide_next_step >> [trigger_self_run, stop_loop, fail_loop]