yt-dlp-dags/dags/ytdlp_ops_worker_per_url.py

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
DAG for processing a single YouTube URL passed via DAG run configuration.
This is the "Worker" part of a Sensor/Worker pattern.
"""
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import BaseOperator, Variable
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from thrift.transport.TTransport import TTransportException
import json
import logging
import os
import redis
import socket
import time
import traceback
# Import utility functions
from utils.redis_utils import _get_redis_client
# Configure logging
logger = logging.getLogger(__name__)
# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_MAX_URLS = 1
DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds
# --- Helper Functions ---
def _extract_video_id(url):
"""Extracts YouTube video ID from URL."""
if not url or not isinstance(url, str):
logger.debug("URL is empty or not a string, cannot extract video ID.")
return None
try:
video_id = None
if 'youtube.com/watch?v=' in url:
video_id = url.split('v=')[1].split('&')[0]
elif 'youtu.be/' in url:
video_id = url.split('youtu.be/')[1].split('?')[0]
if video_id and len(video_id) >= 11:
video_id = video_id[:11] # Standard ID length
logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
return video_id
else:
logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
return None
except Exception as e:
logger.error(f"Failed to extract video ID from URL '{url}'. Error: {e}")
return None
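# Illustrative behaviour of _extract_video_id, based on the parsing above:
#   _extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=42")   -> "dQw4w9WgXcQ"
#   _extract_video_id("https://youtu.be/dQw4w9WgXcQ?feature=share")         -> "dQw4w9WgXcQ"
#   _extract_video_id("https://example.com/not-a-youtube-url")              -> None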
# --- Queue Management Callables (for success/failure reporting) ---
def mark_proxy_banned_callable(**context):
"""Makes a Thrift call to ban a proxy if the get_token task failed with a bannable error."""
ti = context['task_instance']
proxy_to_ban = ti.xcom_pull(task_ids='get_token', key='proxy_to_ban')
if not proxy_to_ban:
logger.info("No proxy to ban was pushed to XCom. Skipping task.")
raise AirflowSkipException("No proxy to ban was identified in the upstream failure.")
server_identity = ti.xcom_pull(task_ids='get_token', key='server_identity_for_ban')
host = ti.xcom_pull(task_ids='get_token', key='service_host_for_ban')
port = ti.xcom_pull(task_ids='get_token', key='service_port_for_ban')
if not all([server_identity, host, port]):
logger.error("Missing connection details (identity, host, or port) from XCom. Cannot ban proxy.")
raise AirflowException("Missing connection details to ban proxy.")
logger.warning(f"Attempting to ban proxy '{proxy_to_ban}' for server '{server_identity}' at {host}:{port}.")
transport = None
try:
socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET)
socket_conn.setTimeout(15 * 1000) # 15s timeout for ban call
transport = TTransport.TFramedTransport(socket_conn)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = YTTokenOpService.Client(protocol)
transport.open()
client.banProxy(proxyUrl=proxy_to_ban, serverIdentity=server_identity)
logger.info(f"Successfully sent request to ban proxy '{proxy_to_ban}'.")
except Exception as ban_exc:
logger.error(f"Failed to send ban request for proxy '{proxy_to_ban}': {ban_exc}", exc_info=True)
# We should fail the task if the ban call fails, as it's an important side-effect.
raise AirflowException(f"Failed to ban proxy: {ban_exc}")
finally:
if transport and transport.isOpen():
transport.close()
def mark_url_as_success(**context):
"""Moves URL from progress to result hash on success."""
ti = context['task_instance']
params = context['params']
url = params.get('url') # Get URL from params, not XCom
if not url:
logger.warning("mark_url_as_success called but no URL found in DAG run parameters.")
return
queue_name = params['queue_name']
result_queue = f"{queue_name}_result"
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
# Pull results from previous tasks
info_json_path = ti.xcom_pull(task_ids='get_token', key='info_json_path')
socks_proxy = ti.xcom_pull(task_ids='get_token', key='socks_proxy')
ytdlp_command = ti.xcom_pull(task_ids='get_token', key='ytdlp_command')
downloaded_file_path = ti.xcom_pull(task_ids='download_and_probe')
logger.info(f"Handling success for URL: {url}")
logger.info(f" Downloaded File Path: {downloaded_file_path}")
result_data = {
'status': 'success',
'end_time': time.time(),
'info_json_path': info_json_path,
'socks_proxy': socks_proxy,
'ytdlp_command': ytdlp_command,
'downloaded_file_path': downloaded_file_path,
'url': url,
'dag_run_id': context['dag_run'].run_id,
}
try:
# In the worker pattern, there's no "progress" hash to remove from.
# We just add the result to the success hash.
client = _get_redis_client(redis_conn_id)
client.hset(result_queue, url, json.dumps(result_data))
logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
except Exception as e:
logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
# Log error but don't fail the task, as the main work succeeded.
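# Sketch of a stored success entry (field names mirror result_data above; values are illustrative):
#   redis-cli HGET video_queue_result "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
#   {"status": "success", "end_time": 1700000000.0,
#    "info_json_path": "/opt/airflow/downloadfiles/info_dQw4w9WgXcQ_default_account_1700000000.json",
#    "socks_proxy": "socks5://...", "ytdlp_command": "...",
#    "downloaded_file_path": "...", "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
#    "dag_run_id": "manual__2024-01-01T00:00:00+00:00"}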
def mark_url_as_failed(**context):
"""
Handles failed processing. Records detailed error information to the fail hash
and, if stop_on_failure is True, fails the task to make the DAG run failure visible.
"""
ti = context['task_instance']
params = context['params']
url = params.get('url') # Get URL from params
if not url:
logger.error("mark_url_as_failed called but no URL found in DAG run parameters.")
return
queue_name = params['queue_name']
fail_queue = f"{queue_name}_fail"
inbox_queue = f"{queue_name}_inbox"
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
stop_on_failure = params.get('stop_on_failure', True)
# Determine if we should requeue based on various parameters
should_requeue = params.get('requeue_on_failure', False)
requeue_on_bannable_error = params.get('requeue_on_bannable_error', False)
requeue_on_ffprobe_failure = params.get('requeue_on_ffprobe_failure', False)
# --- Extract Detailed Error Information ---
exception = context.get('exception')
# Find the specific task that failed to pull its XComs
dag_run = context['dag_run']
failed_task_id = "unknown"
upstream_tasks = ti.task.get_direct_relatives(upstream=True)
for task in upstream_tasks:
upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
if upstream_ti and upstream_ti.state == 'failed':
failed_task_id = task.task_id
break
error_details = None
if failed_task_id != "unknown":
error_details = ti.xcom_pull(task_ids=failed_task_id, key='error_details')
if error_details:
error_message = error_details.get('error_message', 'Unknown error from XCom')
error_type = error_details.get('error_type', 'Unknown type from XCom')
tb_str = error_details.get('traceback', 'No traceback in XCom.')
else:
error_message = str(exception) if exception else "Unknown error"
error_type = type(exception).__name__ if exception else "Unknown"
tb_str = "".join(traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)) if exception else "No traceback available."
logger.info(f"Handling failure for URL: {url}")
logger.error(f" Failed Task: {failed_task_id}")
logger.error(f" Failure Type: {error_type}")
logger.error(f" Failure Reason: {error_message}")
logger.debug(f" Traceback:\n{tb_str}")
# --- Check for specific requeue conditions ---
if not should_requeue: # Only check specific conditions if the general one is false
if requeue_on_bannable_error and isinstance(exception, PBServiceException):
bannable_error_codes = [
"BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED",
"SOCKS5_CONNECTION_FAILED", "CLIENT_TIMEOUT", "GLOBAL_TIMEOUT"
]
if hasattr(exception, 'errorCode') and exception.errorCode in bannable_error_codes:
should_requeue = True
logger.info(f"Bannable error '{exception.errorCode}' detected. Re-queuing URL as per 'requeue_on_bannable_error' param.")
if requeue_on_ffprobe_failure and isinstance(exception, AirflowException) and "Bash command failed" in str(exception):
# Check for the specific exit code for probe failure
if "exit code 2" in str(exception):
should_requeue = True
logger.info("Probe failure detected (exit code 2). Re-queuing URL as per 'requeue_on_ffprobe_failure' param.")
try:
client = _get_redis_client(redis_conn_id)
if should_requeue:
client.rpush(inbox_queue, url)
logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
else:
fail_data = {
'status': 'failed',
'end_time': time.time(),
'failed_task': failed_task_id,
'error_type': error_type,
'error_message': error_message,
'traceback': tb_str,
'url': url,
'dag_run_id': context['dag_run'].run_id,
}
client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
except Exception as e:
logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
# This is a critical error in the failure handling logic itself.
raise AirflowException(f"Could not handle failure in Redis: {e}")
# If stop_on_failure is True, we should fail this task to make the DAG run fail.
# The loop is already stopped by the DAG structure, but this makes the failure visible.
if stop_on_failure:
logger.error("stop_on_failure is True. Failing this task to mark the DAG run as failed.")
# Re-raise the original exception to fail the task instance.
if exception:
raise exception
else:
# If we got details from XCom, we don't have the original exception object.
# So, we raise a new AirflowException with the details we have.
raise AirflowException(f"Failing task as per stop_on_failure=True. Upstream error: [{error_type}] {error_message}")
# --- YtdlpOpsOperator ---
class YtdlpOpsOperator(BaseOperator):
"""
Custom Airflow operator to interact with YTDLP Thrift service.
Processes a single URL passed via DAG run configuration.
"""
template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')
@apply_defaults
def __init__(self,
service_ip=None,
service_port=None,
account_id=None,
info_json_dir=None,
timeout=DEFAULT_TIMEOUT,
*args, **kwargs):
super().__init__(*args, **kwargs)
logger.info(f"Initializing YtdlpOpsOperator (Worker Version) with parameters: "
f"service_ip={service_ip}, service_port={service_port}, "
f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
if not service_ip or not service_port:
raise ValueError("Both service_ip and service_port must be specified.")
if not account_id:
logger.warning("No account_id provided. Ensure it's set in DAG params or operator config.")
self.service_ip = service_ip
self.service_port = service_port
self.account_id = account_id
self.info_json_dir = info_json_dir
self.timeout = timeout
def execute(self, context):
logger.info("Executing YtdlpOpsOperator (Worker Version)")
transport = None
ti = context['task_instance']
# Define connection parameters outside the try block to be available in except blocks
params = context['params']
url = params.get('url')
if not url:
raise AirflowException("DAG was triggered without a 'url' in its configuration.")
service_ip = self.render_template(self.service_ip, context)
service_port_rendered = self.render_template(self.service_port, context)
account_id = self.render_template(self.account_id, context)
timeout_rendered = self.render_template(self.timeout, context)
info_json_dir = self.render_template(self.info_json_dir, context)
host = params.get('service_ip', service_ip)
port_str = params.get('service_port', service_port_rendered)
account_id = params.get('account_id', account_id)
clients = params.get('clients')
logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
if not host or not port_str:
raise ValueError("Direct connection requires service_ip and service_port")
try:
port = int(port_str)
except (ValueError, TypeError):
raise ValueError(f"Invalid service_port value: {port_str}")
try:
timeout = int(timeout_rendered)
if timeout <= 0: raise ValueError("Timeout must be positive")
except (ValueError, TypeError):
timeout = DEFAULT_TIMEOUT
try:
logger.info(f"Processing URL from DAG run config: {url}")
socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET)
socket_conn.setTimeout(timeout * 1000)
transport = TTransport.TFramedTransport(socket_conn)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = YTTokenOpService.Client(protocol)
transport.open()
logger.info("Successfully connected to Thrift server.")
client.ping()
logger.info("Server ping successful.")
token_data = client.getOrRefreshToken(
accountId=account_id,
updateType=TokenUpdateMode.AUTO,
url=url,
clients=clients
)
logger.info("Successfully retrieved token data from service.")
info_json_path = None
info_json = self._get_info_json(token_data)
if info_json and self._is_valid_json(info_json):
info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
if info_json_path:
ti.xcom_push(key='info_json_path', value=info_json_path)
else:
ti.xcom_push(key='info_json_path', value=None)
else:
ti.xcom_push(key='info_json_path', value=None)
socks_proxy = None
proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
if proxy_attr:
socks_proxy = getattr(token_data, proxy_attr)
ti.xcom_push(key='socks_proxy', value=socks_proxy)
ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
except (PBServiceException, TTransportException) as e:
# Enhanced logging to make failures clear in Airflow logs.
logger.error(f"Thrift call failed for URL '{url}' with account '{account_id}'.")
logger.error(f"Exception Type: {type(e).__name__}")
logger.error(f"Exception Message: {getattr(e, 'message', str(e))}")
if isinstance(e, PBServiceException):
logger.error(f"Service Error Code: {getattr(e, 'errorCode', 'N/A')}")
if hasattr(e, 'context') and e.context:
logger.error(f"Service Context: {e.context}")
# Use exc_info=True to get the full traceback in the logs
logger.error("Full exception traceback:", exc_info=True)
# Push exception details to XCom for the failure handler
error_details = {
'error_message': getattr(e, 'message', str(e)),
'error_type': type(e).__name__,
'traceback': traceback.format_exc()
}
ti.xcom_push(key='error_details', value=error_details)
proxy_to_ban = None
if isinstance(e, PBServiceException) and hasattr(e, 'context') and e.context:
# Assuming server adds 'proxy_url' to context on failure
proxy_to_ban = e.context.get('proxy_url')
bannable_error_codes = [
"BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED",
"SOCKS5_CONNECTION_FAILED", "CLIENT_TIMEOUT", "GLOBAL_TIMEOUT"
]
                if getattr(e, 'errorCode', None) not in bannable_error_codes:
proxy_to_ban = None
if proxy_to_ban:
logger.info(f"Found proxy to ban: {proxy_to_ban}. Pushing to XCom for 'mark_proxy_banned' task.")
ti.xcom_push(key='proxy_to_ban', value=proxy_to_ban)
ti.xcom_push(key='server_identity_for_ban', value=account_id)
ti.xcom_push(key='service_host_for_ban', value=host)
ti.xcom_push(key='service_port_for_ban', value=port)
else:
logger.info("No specific proxy to ban based on the error context.")
# Push None explicitly so the downstream task knows not to run
ti.xcom_push(key='proxy_to_ban', value=None)
# Re-raise the original exception to fail the Airflow task
raise e
except Exception as e:
logger.error(f"YtdlpOpsOperator (Worker) failed with an unexpected exception: {e}", exc_info=True)
raise AirflowException(f"Task failed with unexpected error: {e}")
finally:
if transport and transport.isOpen():
transport.close()
def _get_info_json(self, token_data):
return getattr(token_data, 'infoJson', None)
def _is_valid_json(self, json_str):
if not json_str or not isinstance(json_str, str): return False
try:
json.loads(json_str)
return True
except json.JSONDecodeError:
return False
def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
try:
video_id = _extract_video_id(url)
save_dir = rendered_info_json_dir or "."
os.makedirs(save_dir, exist_ok=True)
timestamp = int(time.time())
base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
info_json_path = os.path.join(save_dir, base_filename)
with open(info_json_path, 'w', encoding='utf-8') as f:
f.write(info_json)
return info_json_path
except Exception as e:
logger.error(f"Failed to save info.json: {e}", exc_info=True)
return None
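    # Example of a path produced by _save_info_json (directory comes from the rendered info_json_dir;
    # the trailing number is epoch seconds):
    #   /opt/airflow/downloadfiles/info_dQw4w9WgXcQ_default_account_1700000000.json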
# =============================================================================
# DAG Definition
# =============================================================================
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'start_date': days_ago(1),
}
with DAG(
dag_id='ytdlp_ops_worker_per_url',
default_args=default_args,
schedule_interval=None,
catchup=False,
description='Processes a single YouTube URL passed via configuration.',
tags=['ytdlp', 'thrift', 'client', 'worker'],
params={
'url': Param(None, type=["string", "null"], description="The YouTube URL to process. This is set by the triggering DAG."),
# Sensor params (passed through to re-trigger the sensor, with defaults for standalone runs)
'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Sensor param: Base name for Redis queues."),
'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Sensor param: Airflow Redis connection ID."),
'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="integer", description="Sensor param: Maximum number of URLs to process in one batch."),
# Worker-specific params
'service_ip': Param('89.253.221.173', type="string", description="Service IP."),
'service_port': Param(9090, type="integer", description="Service port."),
'account_id': Param('default_account', type="string", description="Account ID for the API call."),
'clients': Param('ios', type="string", description="Comma-separated list of clients to use for token generation (e.g., 'ios,android,mweb')."),
'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
'download_format': Param('ba[ext=m4a]/bestaudio/best', type="string", description="yt-dlp format selection string."),
'output_path_template': Param("%(title)s [%(id)s].%(ext)s", type="string", description="yt-dlp output filename template."),
'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json."),
'requeue_on_failure': Param(False, type="boolean", description="If True, re-adds the URL to the inbox on failure instead of moving to the fail hash."),
'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."),
'retry_on_probe_failure': Param(False, type="boolean", description="If True, attempts to re-download and probe a file if the initial probe fails."),
'requeue_on_bannable_error': Param(False, type="boolean", description="If True, re-queues the URL if a bannable error (proxy, bot detection) occurs."),
'requeue_on_ffprobe_failure': Param(False, type="boolean", description="If True, re-queues the URL if the ffmpeg/ffprobe check fails."),
}
) as dag:
get_token = YtdlpOpsOperator(
task_id='get_token',
service_ip="{{ params.service_ip }}",
service_port="{{ params.service_port }}",
account_id="{{ params.account_id }}",
timeout="{{ params.timeout }}",
info_json_dir="{{ params.info_json_dir }}",
)
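    # XComs produced by get_token (see YtdlpOpsOperator.execute above):
    #   success path: 'info_json_path', 'socks_proxy', 'ytdlp_command'
    #   failure path: 'error_details', 'proxy_to_ban', plus 'server_identity_for_ban',
    #                 'service_host_for_ban' and 'service_port_for_ban' when a ban is warranted.
    # download_and_probe, mark_proxy_banned and mark_url_as_failed all rely on these keys.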
download_and_probe = BashOperator(
task_id='download_and_probe',
bash_command="""
set -e
INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') }}"
PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') }}"
FORMAT="{{ params.download_format }}"
DOWNLOAD_DIR="{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles/video') }}"
FILENAME_TEMPLATE="{{ params.output_path_template }}"
FULL_OUTPUT_PATH="$DOWNLOAD_DIR/$FILENAME_TEMPLATE"
echo "--- Starting Download Step ---"
echo "Info JSON Path: $INFO_JSON_PATH"
echo "Proxy: $PROXY"
echo "Format: $FORMAT"
echo "Download Directory: $DOWNLOAD_DIR"
echo "Full Output Path: $FULL_OUTPUT_PATH"
if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then
echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)."
exit 1
fi
CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH")
if [ -n "$PROXY" ]; then
CMD_ARRAY+=(--proxy "$PROXY")
fi
CMD_ARRAY+=(-f "$FORMAT" -o "$FULL_OUTPUT_PATH" --print filename)
CMD_ARRAY+=(--continue --no-progress --no-simulate --no-write-info-json --ignore-errors --no-playlist)
echo "Executing: $(printf "%q " "${CMD_ARRAY[@]}")"
# Temporarily disable 'set -e' so a non-zero yt-dlp exit code can be captured and logged below.
set +e
FINAL_FILENAME=$("${CMD_ARRAY[@]}")
EXIT_CODE=$?
set -e
echo "yt-dlp exited with code: $EXIT_CODE"
if [ $EXIT_CODE -ne 0 ]; then
echo "Error: yt-dlp command failed."
exit $EXIT_CODE
fi
if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then
echo "Error: Download failed or did not produce a file."
exit 1
fi
echo "SUCCESS: Download complete. Final file at: $FINAL_FILENAME"
echo "--- Starting Probe Step ---"
echo "Probing downloaded file: $FINAL_FILENAME"
if ! ffmpeg -v error -i "$FINAL_FILENAME" -f null - ; then
echo "Error: ffmpeg probe check failed for '$FINAL_FILENAME'. The file might be corrupt."
if [ "{{ params.retry_on_probe_failure }}" == "True" ]; then
echo "Attempting one retry on probe failure..."
echo "Renaming to .part to attempt resuming download."
mv -f "$FINAL_FILENAME" "$FINAL_FILENAME.part"
# Re-run download command
echo "Re-executing: $(printf "%q " "${CMD_ARRAY[@]}")"
# Capture the retry's exit code as well, without letting 'set -e' abort the script first.
set +e
FINAL_FILENAME=$("${CMD_ARRAY[@]}")
EXIT_CODE=$?
set -e
echo "yt-dlp retry exited with code: $EXIT_CODE"
if [ $EXIT_CODE -ne 0 ]; then
echo "Error: yt-dlp retry command failed."
exit $EXIT_CODE
fi
if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then
echo "Error: Retry download failed or did not produce a file."
exit 1
fi
echo "SUCCESS: Retry download complete. Final file at: $FINAL_FILENAME"
# Re-probe
echo "Probing redownloaded file: $FINAL_FILENAME"
if ! ffmpeg -v error -i "$FINAL_FILENAME" -f null - ; then
echo "Error: ffmpeg probe check failed again for '$FINAL_FILENAME'. Failing with exit code 2."
exit 2
fi
else
echo "Failing with exit code 2 due to probe failure (retries disabled)."
exit 2
fi
fi
echo "SUCCESS: Probe confirmed valid media file."
# Push the final filename for the success_task
echo "$FINAL_FILENAME"
""",
retries=0, # Retries are now handled inside the script based on a DAG param
retry_delay=timedelta(minutes=1),
)
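    # For reference, CMD_ARRAY in the script above expands to roughly the following command
    # (values are illustrative; the real ones come from XCom and params at run time):
    #   yt-dlp --load-info-json /path/to/info.json --proxy socks5://host:1080 \
    #       -f 'ba[ext=m4a]/bestaudio/best' \
    #       -o '/opt/airflow/downloadfiles/video/%(title)s [%(id)s].%(ext)s' \
    #       --print filename --continue --no-progress --no-simulate \
    #       --no-write-info-json --ignore-errors --no-playlist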
mark_proxy_banned = PythonOperator(
task_id='mark_proxy_banned',
python_callable=mark_proxy_banned_callable,
trigger_rule='one_failed', # Run only if get_token fails
)
# This task triggers the sensor DAG to check for more work as soon as this worker is done.
trigger_sensor_for_next_batch = TriggerDagRunOperator(
task_id='trigger_sensor_for_next_batch',
trigger_dag_id='ytdlp_ops_sensor_queue',
# Pass only the sensor's needed parameters back to it.
# These values were originally passed from the sensor to this worker.
# The values are templated and will be passed as strings to the triggered DAG.
conf={
"queue_name": "{{ params.queue_name }}",
"redis_conn_id": "{{ params.redis_conn_id }}",
"max_urls_per_run": "{{ params.max_urls_per_run }}",
},
# This task will only run on the success path, so it inherits the default
# trigger_rule='all_success'.
wait_for_completion=False,
)
trigger_sensor_for_next_batch.doc_md = """
### Trigger Sensor for Next Batch
Triggers a new run of the `ytdlp_ops_sensor_queue` DAG to create a continuous processing loop.
This task **only runs on the success path** after a URL has been fully processed.
This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
"""
# Define success and failure handling tasks
success_task = PythonOperator(
task_id='mark_url_as_success',
python_callable=mark_url_as_success,
trigger_rule='all_success', # Run only if upstream tasks succeeded
)
failure_task = PythonOperator(
task_id='mark_url_as_failed',
python_callable=mark_url_as_failed,
trigger_rule='one_failed', # Run if any upstream task failed
)
# --- Define Task Dependencies ---
# The main success flow
get_token >> download_and_probe >> success_task >> trigger_sensor_for_next_batch
# The failure path for get_token, which includes the explicit ban task
get_token >> mark_proxy_banned
# The main failure handler, which listens to the primary tasks.
# If get_token or download_and_probe fails, it will trigger failure_task.
[get_token, download_and_probe] >> failure_task