yt-dlp-dags/dags/ytdlp_worker_per_url.py
2025-07-18 17:17:19 +03:00

487 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
DAG for processing a single YouTube URL passed via DAG run configuration.
This is the "Worker" part of a Sensor/Worker pattern.
"""
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import BaseOperator, Variable
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from thrift.transport.TTransport import TTransportException
import json
import logging
import os
import redis
import socket
import time
import traceback
# Import utility functions
from utils.redis_utils import _get_redis_client
# Module-level logger, named after this module; used by every callable and operator in this file.
logger = logging.getLogger(__name__)
# Fallback values used for DAG params and operator arguments in this file.
DEFAULT_QUEUE_NAME = 'video_queue'  # Base queue name; handlers append "_result", "_fail" and "_inbox".
DEFAULT_REDIS_CONN_ID = 'redis_default'  # Airflow Redis connection ID used when the run params give none.
DEFAULT_MAX_URLS = 1  # Default of the 'max_urls_per_run' param that is passed back to the sensor DAG.
DEFAULT_TIMEOUT = 30 # Default Thrift timeout in seconds
# --- Helper Functions ---
def _extract_video_id(url):
    """Extract the 11-character YouTube video ID from a URL.

    Recognised URL shapes:
      * ``youtube.com/watch?...v=<id>`` (``v`` may follow other query parameters)
      * ``youtu.be/<id>``
      * ``youtube.com/shorts/<id>``, ``/embed/<id>``, ``/live/<id>``, ``/v/<id>``

    Only the characters ``A-Z a-z 0-9 _ -`` are accepted, so the returned value
    can never contain path separators or dots. This matters because the ID is
    embedded in a file name by ``_save_info_json``.

    Args:
        url: The URL to inspect. Anything that is not a non-empty string yields None.

    Returns:
        The first 11 valid ID characters found after one of the known markers,
        or None when the URL does not contain a recognisable video ID.
    """
    import re  # local import: `re` is not part of this module's top-level imports

    if not url or not isinstance(url, str):
        logger.debug("URL is empty or not a string, cannot extract video ID.")
        return None

    id_group = r'([A-Za-z0-9_-]{11})'
    patterns = (
        # Watch page: allow arbitrary query parameters before `v=`, but never
        # look past the fragment marker.
        r'youtube\.com/watch\?(?:[^#]*?&)?v=' + id_group,
        # Short share links.
        r'youtu\.be/' + id_group,
        # Path-style URLs.
        r'youtube\.com/(?:shorts|embed|live|v)/' + id_group,
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            video_id = match.group(1)
            logger.debug(f"Extracted video ID '{video_id}' from URL: {url}")
            return video_id

    logger.debug(f"Could not extract a standard video ID pattern from URL: {url}")
    return None
# --- Queue Management Callables (for success/failure reporting) ---
def handle_success(**context):
    """Store a success record for the processed URL in the ``<queue_name>_result`` hash.

    The URL is read from the DAG run params. The record combines the XCom
    values pushed by ``get_token`` with the return value of ``download_video``
    and is written as JSON under the URL key. Redis errors are logged and
    swallowed, because the actual download already succeeded.

    Args:
        **context: Airflow task context. Reads ``task_instance``, ``params``
            and ``dag_run``.
    """
    task_instance = context['task_instance']
    run_params = context['params']

    url = run_params.get('url')  # The URL comes from the run params, not from XCom.
    if not url:
        logger.warning("handle_success called but no URL found in DAG run parameters.")
        return

    result_queue = f"{run_params['queue_name']}_result"
    redis_conn_id = run_params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)

    # Collect what the upstream tasks published.
    token_values = {
        xcom_key: task_instance.xcom_pull(task_ids='get_token', key=xcom_key)
        for xcom_key in ('info_json_path', 'socks_proxy', 'ytdlp_command')
    }
    downloaded_file_path = task_instance.xcom_pull(task_ids='download_video')

    logger.info(f"Handling success for URL: {url}")
    logger.info(f" Downloaded File Path: {downloaded_file_path}")

    # Key order is kept stable because the record is serialised with json.dumps.
    result_data = {'status': 'success', 'end_time': time.time()}
    result_data.update(token_values)
    result_data['downloaded_file_path'] = downloaded_file_path
    result_data['url'] = url
    result_data['dag_run_id'] = context['dag_run'].run_id

    try:
        # The worker pattern has no "progress" hash to clean up; the result is
        # simply added to the success hash.
        redis_client = _get_redis_client(redis_conn_id)
        redis_client.hset(result_queue, url, json.dumps(result_data))
        logger.info(f"Stored success result for URL '{url}' in result hash '{result_queue}'.")
    except Exception as e:
        # Deliberately not re-raised: the main work has already succeeded.
        logger.error(f"Error handling success in Redis for URL '{url}': {e}", exc_info=True)
def handle_failure(**context):
    """
    Record a failed run for the URL and optionally fail this task as well.

    Depending on the ``requeue_on_failure`` param, the URL is either pushed back
    onto the ``<queue_name>_inbox`` list or a JSON failure record is written to
    the ``<queue_name>_fail`` hash under the URL key. When ``stop_on_failure``
    is true the task then raises, which makes the failure visible on the DAG run.

    Args:
        **context: Airflow task context. Reads ``task_instance``, ``params``
            and ``dag_run``, plus ``exception`` and ``task`` when present.

    Raises:
        AirflowException: If Redis cannot be updated, or if ``stop_on_failure``
            is set and no original exception is available in the context.
        Exception: The original exception from the context, re-raised when
            ``stop_on_failure`` is set.
    """
    ti = context['task_instance']
    params = context['params']
    url = params.get('url')  # Get URL from params
    if not url:
        logger.error("handle_failure called but no URL found in DAG run parameters.")
        return

    queue_name = params['queue_name']
    fail_queue = f"{queue_name}_fail"
    inbox_queue = f"{queue_name}_inbox"
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    requeue_on_failure = params.get('requeue_on_failure', False)
    stop_on_failure = params.get('stop_on_failure', True)

    # --- Extract Detailed Error Information ---
    # NOTE(review): 'exception' is only read if the context carries it; confirm
    # that it is populated when this callable runs as a regular task rather
    # than as a failure callback.
    exception = context.get('exception')
    if exception:
        error_message = str(exception)
        error_type = type(exception).__name__
        # Positional arguments work on every Python 3 version; the `etype=`
        # keyword was removed in Python 3.10 and raises TypeError there.
        tb_str = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )
    else:
        error_message = "Unknown error"
        error_type = "Unknown"
        tb_str = "No traceback available."

    # Find the specific task that failed by looking at the direct upstream
    # tasks of the current task ('handle_failure'). The relatives are defined
    # on the task (operator) object, not on the task instance.
    dag_run = context['dag_run']
    failed_task_id = "unknown"
    current_task = context.get('task')
    if current_task is None:
        current_task = getattr(ti, 'task', None)
    upstream_tasks = (
        current_task.get_direct_relatives(upstream=True) if current_task is not None else []
    )
    for task in upstream_tasks:
        upstream_ti = dag_run.get_task_instance(task_id=task.task_id)
        if upstream_ti and upstream_ti.state == 'failed':
            failed_task_id = task.task_id
            break

    logger.info(f"Handling failure for URL: {url}")
    logger.error(f" Failed Task: {failed_task_id}")
    logger.error(f" Failure Type: {error_type}")
    logger.error(f" Failure Reason: {error_message}")
    logger.debug(f" Traceback:\n{tb_str}")

    try:
        client = _get_redis_client(redis_conn_id)
        if requeue_on_failure:
            client.rpush(inbox_queue, url)
            logger.info(f"Re-queued failed URL '{url}' to inbox '{inbox_queue}' for retry.")
        else:
            fail_data = {
                'status': 'failed',
                'end_time': time.time(),
                'failed_task': failed_task_id,
                'error_type': error_type,
                'error_message': error_message,
                'traceback': tb_str,
                'url': url,
                'dag_run_id': context['dag_run'].run_id,
            }
            client.hset(fail_queue, url, json.dumps(fail_data, indent=2))
            logger.info(f"Stored detailed failure info for URL '{url}' in fail hash '{fail_queue}'.")
    except Exception as e:
        logger.error(f"Critical error during failure handling in Redis for URL '{url}': {e}", exc_info=True)
        # This is a critical error in the failure handling logic itself; chain
        # the cause so the Redis traceback is not lost.
        raise AirflowException(f"Could not handle failure in Redis: {e}") from e

    # If stop_on_failure is True, fail this task to make the DAG run fail.
    # The loop is already stopped by the DAG structure, but this makes the failure visible.
    if stop_on_failure:
        logger.error("stop_on_failure is True. Failing this task to mark the DAG run as failed.")
        if exception:
            # Re-raise the original exception to fail the task instance.
            raise exception
        # No original exception available: fail explicitly.
        raise AirflowException("Failing task as per stop_on_failure=True, but original exception was not found.")
# --- YtdlpOpsOperator ---
class YtdlpOpsOperator(BaseOperator):
    """
    Custom Airflow operator to interact with YTDLP Thrift service.
    Processes a single URL passed via DAG run configuration.

    On success the operator pushes three XCom values: ``info_json_path``
    (path of the saved info.json, or None), ``socks_proxy`` and
    ``ytdlp_command``.
    """

    template_fields = ('service_ip', 'service_port', 'account_id', 'timeout', 'info_json_dir')

    @apply_defaults
    def __init__(self,
                 service_ip=None,
                 service_port=None,
                 account_id=None,
                 info_json_dir=None,
                 timeout=DEFAULT_TIMEOUT,
                 *args, **kwargs):
        """
        Args:
            service_ip: Host of the Thrift service (templated). Required.
            service_port: Port of the Thrift service (templated). Required.
            account_id: Account ID sent with the token request (templated).
            info_json_dir: Directory where info.json is written (templated);
                the current directory is used when empty.
            timeout: Thrift socket timeout in seconds (templated).

        Raises:
            ValueError: If service_ip or service_port is missing.
        """
        super().__init__(*args, **kwargs)
        logger.info(f"Initializing YtdlpOpsOperator (Worker Version) with parameters: "
                    f"service_ip={service_ip}, service_port={service_port}, "
                    f"account_id={account_id}, info_json_dir={info_json_dir}, timeout={timeout}")
        if not service_ip or not service_port:
            raise ValueError("Both service_ip and service_port must be specified.")
        if not account_id:
            logger.warning("No account_id provided. Ensure it's set in DAG params or operator config.")
        self.service_ip = service_ip
        self.service_port = service_port
        self.account_id = account_id
        self.info_json_dir = info_json_dir
        self.timeout = timeout

    def execute(self, context):
        """Request token data for the run's URL and publish the results via XCom.

        Connects to the Thrift service, pings it, calls ``getOrRefreshToken``
        for the URL from the DAG run params and saves the returned info.json
        when it is valid JSON. The transport is always closed afterwards.

        Args:
            context: Airflow task context. Reads ``task_instance`` and ``params``.

        Raises:
            AirflowException: For any failure; the original error is chained
                as ``__cause__``.
        """
        logger.info("Executing YtdlpOpsOperator (Worker Version)")
        transport = None
        ti = context['task_instance']
        try:
            params = context['params']
            url = params.get('url')
            if not url:
                raise AirflowException("DAG was triggered without a 'url' in its configuration.")
            logger.info(f"Processing URL from DAG run config: {url}")

            # Render explicitly; presumably this resolves values that are
            # themselves templates (e.g. a param whose value contains "{{ ... }}").
            service_ip = self.render_template(self.service_ip, context)
            service_port_rendered = self.render_template(self.service_port, context)
            account_id = self.render_template(self.account_id, context)
            timeout_rendered = self.render_template(self.timeout, context)
            info_json_dir = self.render_template(self.info_json_dir, context)

            # Run params win over the operator arguments.
            host = params.get('service_ip', service_ip)
            port_str = params.get('service_port', service_port_rendered)
            account_id = params.get('account_id', account_id)
            logger.info(f"Using direct connection settings: service_ip={host}, service_port={port_str}")
            if not host or not port_str:
                raise ValueError("Direct connection requires service_ip and service_port")
            try:
                port = int(port_str)
            except (ValueError, TypeError):
                raise ValueError(f"Invalid service_port value: {port_str}")

            # An unparsable or non-positive timeout falls back to the default;
            # the fallback is logged so that a misconfiguration is visible.
            try:
                timeout = int(timeout_rendered)
            except (ValueError, TypeError):
                timeout = 0
            if timeout <= 0:
                logger.warning(f"Invalid timeout value {timeout_rendered!r}; using default of {DEFAULT_TIMEOUT}s.")
                timeout = DEFAULT_TIMEOUT

            socket_conn = TSocket.TSocket(host, port, socket_family=socket.AF_INET)
            socket_conn.setTimeout(timeout * 1000)  # seconds -> milliseconds
            transport = TTransport.TFramedTransport(socket_conn)
            protocol = TBinaryProtocol.TBinaryProtocol(transport)
            client = YTTokenOpService.Client(protocol)
            transport.open()
            logger.info("Successfully connected to Thrift server.")
            client.ping()
            logger.info("Server ping successful.")
            token_data = client.getOrRefreshToken(
                accountId=account_id,
                updateType=TokenUpdateMode.AUTO,
                url=url
            )
            logger.info("Successfully retrieved token data from service.")

            # info_json_path stays None unless valid JSON was returned and saved.
            info_json_path = None
            info_json = self._get_info_json(token_data)
            if info_json and self._is_valid_json(info_json):
                info_json_path = self._save_info_json(context, info_json, url, account_id, info_json_dir)
            ti.xcom_push(key='info_json_path', value=info_json_path or None)

            # Use the first proxy attribute that exists on the response object.
            socks_proxy = None
            proxy_attr = next((attr for attr in ['socks5Proxy', 'socksProxy', 'socks'] if hasattr(token_data, attr)), None)
            if proxy_attr:
                socks_proxy = getattr(token_data, proxy_attr)
            ti.xcom_push(key='socks_proxy', value=socks_proxy)

            ytdlp_cmd = getattr(token_data, 'ytdlpCommand', None)
            ti.xcom_push(key='ytdlp_command', value=ytdlp_cmd)
        except Exception as e:
            logger.error(f"YtdlpOpsOperator (Worker) failed: {e}", exc_info=True)
            # Chain the cause so the original traceback is preserved.
            raise AirflowException(f"Task failed: {e}") from e
        finally:
            if transport and transport.isOpen():
                transport.close()

    def _get_info_json(self, token_data):
        """Return the ``infoJson`` attribute of the token data, or None if absent."""
        return getattr(token_data, 'infoJson', None)

    def _is_valid_json(self, json_str):
        """Return True only when json_str is a non-empty string that parses as JSON."""
        if not json_str or not isinstance(json_str, str):
            return False
        try:
            json.loads(json_str)
            return True
        except json.JSONDecodeError:
            return False

    def _save_info_json(self, context, info_json, url, account_id, rendered_info_json_dir):
        """Write info_json to ``info_<video_id>_<account_id>_<timestamp>.json``.

        Args:
            context: Airflow task context (not used by this method).
            info_json: JSON text to write.
            url: Source URL; its video ID becomes part of the file name
                ("unknown" when no ID can be extracted).
            account_id: Account ID that becomes part of the file name.
            rendered_info_json_dir: Target directory, created if missing;
                the current directory is used when empty.

        Returns:
            The path of the written file, or None when saving failed
            (the error is logged, not raised).
        """
        try:
            video_id = _extract_video_id(url)
            save_dir = rendered_info_json_dir or "."
            os.makedirs(save_dir, exist_ok=True)
            timestamp = int(time.time())
            base_filename = f"info_{video_id or 'unknown'}_{account_id}_{timestamp}.json"
            info_json_path = os.path.join(save_dir, base_filename)
            with open(info_json_path, 'w', encoding='utf-8') as f:
                f.write(info_json)
            return info_json_path
        except Exception as e:
            logger.error(f"Failed to save info.json: {e}", exc_info=True)
            return None
# =============================================================================
# DAG Definition
# =============================================================================
# Arguments applied to every task of the DAG unless a task overrides them.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,  # one retry, one minute after the failure
    retry_delay=timedelta(minutes=1),
    start_date=days_ago(1),
)
# Worker DAG: get_token -> download_video -> handle_success -> trigger_sensor_for_next_batch,
# with handle_failure fanning in from get_token and download_video.
with DAG(
    dag_id='ytdlp_worker_per_url',
    default_args=default_args,
    schedule_interval=None,  # only runs when triggered with a configuration
    catchup=False,
    description='Processes a single YouTube URL passed via configuration.',
    tags=['ytdlp', 'thrift', 'client', 'worker'],
    params={
        'url': Param(None, type=["string", "null"], description="The YouTube URL to process. This is set by the triggering DAG."),
        # Sensor params (passed through to re-trigger the sensor, with defaults for standalone runs)
        'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="Sensor param: Base name for Redis queues."),
        'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="Sensor param: Airflow Redis connection ID."),
        'max_urls_per_run': Param(DEFAULT_MAX_URLS, type="integer", description="Sensor param: Maximum number of URLs to process in one batch."),
        # Worker-specific params
        'service_ip': Param('89.253.221.173', type="string", description="Service IP."),
        'service_port': Param(9090, type="integer", description="Service port."),
        'account_id': Param('default_account', type="string", description="Account ID for the API call."),
        'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
        'download_format': Param('ba[ext=m4a]/bestaudio/best', type="string", description="yt-dlp format selection string."),
        'output_path_template': Param("%(title)s [%(id)s].%(ext)s", type="string", description="yt-dlp output filename template."),
        'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json."),
        'requeue_on_failure': Param(False, type="boolean", description="If True, re-adds the URL to the inbox on failure instead of moving to the fail hash."),
        'stop_on_failure': Param(True, type="boolean", description="If True, a worker failure will stop the entire processing loop."),
    }
) as dag:
    # Fetches token data for the URL and publishes info_json_path, socks_proxy
    # and ytdlp_command via XCom. Not retried.
    get_token = YtdlpOpsOperator(
        task_id='get_token',
        service_ip="{{ params.service_ip }}",
        service_port="{{ params.service_port }}",
        account_id="{{ params.account_id }}",
        timeout="{{ params.timeout }}",
        info_json_dir="{{ params.info_json_dir }}",
        retries=0,
    )

    # Downloads the media with yt-dlp from the saved info.json. The script
    # echoes the final file name as its last output line.
    download_video = BashOperator(
        task_id='download_video',
        # The XCom lookups end with "or ''" because Jinja renders a missing
        # (None) value as the literal text "None". Without it the non-empty
        # check on $PROXY succeeds and yt-dlp is started with "--proxy None".
        bash_command="""
INFO_JSON_PATH="{{ ti.xcom_pull(task_ids='get_token', key='info_json_path') or '' }}"
PROXY="{{ ti.xcom_pull(task_ids='get_token', key='socks_proxy') or '' }}"
FORMAT="{{ params.download_format }}"
DOWNLOAD_DIR="{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles/video') }}"
FILENAME_TEMPLATE="{{ params.output_path_template }}"
FULL_OUTPUT_PATH="$DOWNLOAD_DIR/$FILENAME_TEMPLATE"
echo "Starting download..."
echo "Info JSON Path: $INFO_JSON_PATH"
echo "Proxy: $PROXY"
echo "Format: $FORMAT"
echo "Download Directory: $DOWNLOAD_DIR"
echo "Full Output Path: $FULL_OUTPUT_PATH"
if [ -z "$INFO_JSON_PATH" ] || [ ! -f "$INFO_JSON_PATH" ]; then
echo "Error: info.json path is missing or file does not exist ($INFO_JSON_PATH)."
exit 1
fi
CMD_ARRAY=(yt-dlp --load-info-json "$INFO_JSON_PATH")
if [ -n "$PROXY" ]; then
CMD_ARRAY+=(--proxy "$PROXY")
fi
CMD_ARRAY+=(-f "$FORMAT" -o "$FULL_OUTPUT_PATH" --print filename)
CMD_ARRAY+=(--no-progress --no-simulate --no-write-info-json --ignore-errors --no-playlist)
printf "Executing: %q " "${CMD_ARRAY[@]}"
echo ""
FINAL_FILENAME=$("${CMD_ARRAY[@]}")
EXIT_CODE=$?
echo "yt-dlp exited with code: $EXIT_CODE"
if [ $EXIT_CODE -ne 0 ]; then
echo "Error: yt-dlp command failed."
exit $EXIT_CODE
fi
if [ -z "$FINAL_FILENAME" ] || [ ! -f "$FINAL_FILENAME" ]; then
echo "Error: Download failed or did not produce a file."
exit 1
fi
echo "SUCCESS: Final file confirmed at: $FINAL_FILENAME"
echo "$FINAL_FILENAME"
""",
        retries=3,
        retry_delay=timedelta(minutes=2),
    )

    # This task triggers the sensor DAG to check for more work as soon as this worker is done.
    trigger_sensor_for_next_batch = TriggerDagRunOperator(
        task_id='trigger_sensor_for_next_batch',
        trigger_dag_id='ytdlp_sensor_redis_queue',
        # Pass only the sensor's needed parameters back to it.
        # These values were originally passed from the sensor to this worker.
        # The values are templated and will be passed as strings to the triggered DAG.
        conf={
            "queue_name": "{{ params.queue_name }}",
            "redis_conn_id": "{{ params.redis_conn_id }}",
            "max_urls_per_run": "{{ params.max_urls_per_run }}",
        },
        # This task will only run on the success path, so it inherits the default
        # trigger_rule='all_success'.
        wait_for_completion=False,
    )
    trigger_sensor_for_next_batch.doc_md = """
### Trigger Sensor for Next Batch
Triggers a new run of the `ytdlp_sensor_redis_queue` DAG to create a continuous processing loop.
This task **only runs on the success path** after a URL has been fully processed.
This ensures that the system immediately checks for more URLs to process, but stops the loop on failure.
"""

    # Define success and failure handling tasks
    success_task = PythonOperator(
        task_id='handle_success',
        python_callable=handle_success,
        trigger_rule='all_success',  # Run only if upstream tasks succeeded
    )
    failure_task = PythonOperator(
        task_id='handle_failure',
        python_callable=handle_failure,
        trigger_rule='one_failed',  # Run if any upstream task failed
        # handle_failure raises on purpose when stop_on_failure is set. With
        # the inherited retries=1 it would run a second time and repeat its
        # Redis side effects (re-queue the URL again / rewrite the fail record).
        retries=0,
    )

    # --- Define Task Dependencies ---
    # The main processing flow
    get_token >> download_video
    # The success path: if download_video succeeds, run success_task, then trigger the next sensor run.
    download_video >> success_task >> trigger_sensor_for_next_batch
    # The failure path: if get_token OR download_video fails, run the failure_task.
    # This is a "fan-in" dependency.
    [get_token, download_video] >> failure_task