yt-dlp-dags/airflow/dags/ytdlp_ops_v01_orchestrator.py

421 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
DAG to orchestrate ytdlp_ops_dispatcher DAG runs based on a defined policy.
It fetches URLs from a Redis queue and launches dispatchers in controlled bunches,
which in turn trigger workers with affinity.
"""
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.utils.dates import days_ago
from airflow.api.common.trigger_dag import trigger_dag
from airflow.models.dagrun import DagRun
from airflow.models.dag import DagModel
from datetime import timedelta
import logging
import random
import time
import json
# Import utility functions
from utils.redis_utils import _get_redis_client
# Import Thrift modules for proxy status check
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
# Configure logging
logger = logging.getLogger(__name__)
DEFAULT_REQUEST_PARAMS_JSON = """{
"context_reuse_policy": {
"enabled": true,
"max_age_seconds": 86400,
"reuse_visitor_id": true,
"reuse_cookies": true
},
"token_generation_strategy": {
"youtubei_js": {
"generate_po_token": true,
"generate_gvs_token": true
}
},
"ytdlp_params": {
"use_curl_prefetch": false,
"token_supplement_strategy": {
"youtubepot_bgutilhttp_extractor": {
"enabled": true
}
},
"visitor_id_override": {
"enabled": true
}
},
"session_params": {
"lang": "en-US",
"location": "US",
"deviceCategory": "MOBILE",
"user_agents": {
"youtubei_js": "Mozilla/5.0 (iPad; CPU OS 16_7_10 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1,gzip(gfe)",
"yt_dlp": "Mozilla/5.0 (iPad; CPU OS 16_7_10 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1,gzip(gfe)"
}
}
}"""
# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TOTAL_WORKERS = 8
DEFAULT_WORKERS_PER_BUNCH = 1
DEFAULT_WORKER_DELAY_S = 1
DEFAULT_BUNCH_DELAY_S = 1
DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="172.17.0.1")
DEFAULT_YT_AUTH_SERVICE_PORT = Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080)
# --- Helper Functions ---
def _check_application_queue(redis_client, queue_base_name: str) -> int:
"""Checks and logs the length of the application's inbox queue."""
inbox_queue_name = f"{queue_base_name}_inbox"
logger.info(f"--- Checking Application Work Queue ---")
try:
q_len = redis_client.llen(inbox_queue_name)
logger.info(f"Application work queue '{inbox_queue_name}' has {q_len} item(s).")
return q_len
except Exception as e:
logger.error(f"Failed to check application queue '{inbox_queue_name}': {e}", exc_info=True)
return -1 # Indicate an error
def _inspect_celery_queues(redis_client, queue_names: list):
"""Inspects Celery queues in Redis and logs their status."""
logger.info("--- Inspecting Celery Queues in Redis ---")
for queue_name in queue_names:
try:
q_len = redis_client.llen(queue_name)
logger.info(f"Queue '{queue_name}': Length = {q_len}")
if q_len > 0:
logger.info(f"Showing up to 10 tasks in '{queue_name}':")
# Fetch up to 10 items from the start of the list (queue)
items_bytes = redis_client.lrange(queue_name, 0, 9)
for i, item_bytes in enumerate(items_bytes):
try:
# Celery tasks are JSON-encoded strings
task_data = json.loads(item_bytes.decode('utf-8'))
# Pretty print for readability in logs
pretty_task_data = json.dumps(task_data, indent=2)
logger.info(f" Task {i+1}:\n{pretty_task_data}")
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f" Task {i+1}: Could not decode/parse task data. Error: {e}. Raw: {item_bytes!r}")
except Exception as e:
logger.error(f"Failed to inspect queue '{queue_name}': {e}", exc_info=True)
logger.info("--- End of Queue Inspection ---")
# --- Main Orchestration Callable ---
def orchestrate_workers_ignition_callable(**context):
"""
Main orchestration logic. Triggers a specified number of dispatcher DAGs
to initiate self-sustaining processing loops.
"""
params = context['params']
ti = context['task_instance']
logger.info(f"Orchestrator task '{ti.task_id}' running on queue '{ti.queue}'.")
logger.info("Starting dispatcher ignition sequence.")
dispatcher_dag_id = 'ytdlp_ops_v01_dispatcher'
worker_queue = 'queue-dl'
app_queue_name = params['queue_name']
logger.info(f"Running in v1 (monolithic) mode. Dispatcher DAG: '{dispatcher_dag_id}', Worker Queue: '{worker_queue}'")
dag_model = DagModel.get_dagmodel(dispatcher_dag_id)
if dag_model and dag_model.is_paused:
logger.warning(f"Dispatcher DAG '{dispatcher_dag_id}' is paused. Skipping dispatcher ignition.")
raise AirflowSkipException(f"Dispatcher DAG '{dispatcher_dag_id}' is paused.")
total_workers = int(params['total_workers'])
workers_per_bunch = int(params['workers_per_bunch'])
# --- Input Validation ---
if total_workers <= 0:
logger.warning(f"'total_workers' is {total_workers}. No workers will be started. Skipping ignition.")
raise AirflowSkipException(f"No workers to start (total_workers={total_workers}).")
if workers_per_bunch <= 0:
logger.error(f"'workers_per_bunch' must be a positive integer, but got {workers_per_bunch}. Aborting.")
raise AirflowException(f"'workers_per_bunch' must be a positive integer, but got {workers_per_bunch}.")
# --- End Input Validation ---
worker_delay = int(params['delay_between_workers_s'])
bunch_delay = int(params['delay_between_bunches_s'])
# Create a list of worker numbers to trigger
worker_indices = list(range(total_workers))
bunches = [worker_indices[i:i + workers_per_bunch] for i in range(0, len(worker_indices), workers_per_bunch)]
# --- Inspect Queues before starting ---
try:
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
redis_client = _get_redis_client(redis_conn_id)
# First, check the application queue for work
app_queue_len = _check_application_queue(redis_client, app_queue_name)
if params.get('skip_if_queue_empty') and app_queue_len == 0:
logger.info("'skip_if_queue_empty' is True and application queue is empty. Skipping worker ignition.")
raise AirflowSkipException("Application work queue is empty.")
# Then, inspect the target Celery queue for debugging
_inspect_celery_queues(redis_client, [worker_queue])
except AirflowSkipException:
raise # Re-raise to let Airflow handle the skip
except Exception as e:
logger.error(f"Could not inspect queues due to an error: {e}. Continuing with ignition sequence.")
# --- End of Inspection ---
logger.info(f"Plan: Triggering {total_workers} total dispatcher runs in {len(bunches)} bunches. Each run will attempt to process one URL.")
dag_run_id = context['dag_run'].run_id
total_triggered = 0
for i, bunch in enumerate(bunches):
logger.info(f"--- Triggering Bunch {i+1}/{len(bunches)} (contains {len(bunch)} dispatcher(s)) ---")
for j, _ in enumerate(bunch):
# Create a unique run_id for each dispatcher run
run_id = f"dispatched_{dag_run_id}_{total_triggered}"
# Pass all orchestrator params to the dispatcher, which will then pass them to the worker.
conf_to_pass = {p: params[p] for p in params}
logger.info(f"Triggering dispatcher {j+1}/{len(bunch)} in bunch {i+1} (run {total_triggered + 1}/{total_workers}) (Run ID: {run_id})")
logger.debug(f"Full conf for dispatcher run {run_id}: {conf_to_pass}")
trigger_dag(
dag_id=dispatcher_dag_id,
run_id=run_id,
conf=conf_to_pass,
replace_microseconds=False
)
total_triggered += 1
# Delay between dispatches in a bunch
if j < len(bunch) - 1:
logger.info(f"Waiting {worker_delay}s before next dispatcher in bunch...")
time.sleep(worker_delay)
# Delay between bunches
if i < len(bunches) - 1:
logger.info(f"--- Bunch {i+1} triggered. Waiting {bunch_delay}s before next bunch... ---")
time.sleep(bunch_delay)
logger.info(f"--- Ignition sequence complete. Total dispatcher runs triggered: {total_triggered}. ---")
# --- Final Queue Inspection ---
final_check_delay = 30 # seconds
logger.info(f"Waiting {final_check_delay}s for a final queue status check to see if workers picked up tasks...")
time.sleep(final_check_delay)
try:
redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
redis_client = _get_redis_client(redis_conn_id)
# Log connection details for debugging broker mismatch issues
conn_kwargs = redis_client.connection_pool.connection_kwargs
logger.info(f"Final check using Redis connection '{redis_conn_id}': "
f"host={conn_kwargs.get('host')}, "
f"port={conn_kwargs.get('port')}, "
f"db={conn_kwargs.get('db')}")
_inspect_celery_queues(redis_client, [worker_queue])
logger.info("Final queue inspection complete. If queues are not empty, workers have not picked up tasks yet. "
"If queues are empty, workers have started processing.")
except Exception as e:
logger.error(f"Could not perform final queue inspection: {e}. This does not affect worker ignition.")
# =============================================================================
# DAG Definition
# =============================================================================
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'start_date': days_ago(1),
}
with DAG(
dag_id='ytdlp_ops_v01_orchestrator',
default_args=default_args,
schedule=None, # This DAG runs only when triggered.
max_active_runs=1, # Only one ignition process should run at a time.
catchup=False,
description='Ignition system for ytdlp_ops_v01_dispatcher DAGs. Starts self-sustaining worker loops via dispatchers.',
doc_md="""
### YT-DLP v1 (Monolithic) Worker Ignition System
This DAG acts as an "ignition system" to start one or more self-sustaining worker loops for the **v1 monolithic worker**.
It does **not** process URLs itself. Its only job is to trigger a specified number of `ytdlp_ops_v01_dispatcher` DAGs,
which in turn pull URLs and trigger `ytdlp_ops_v01_worker_per_url` with worker affinity.
#### How it Works:
1. **Manual Trigger:** You manually trigger this DAG with parameters defining how many dispatcher loops to start (`total_workers`), in what configuration (`workers_per_bunch`, delays).
2. **Ignition:** The orchestrator triggers the initial set of dispatcher DAGs in a "fire-and-forget" manner, passing all its configuration parameters to them.
3. **Completion:** Once all initial dispatchers have been triggered, the orchestrator's job is complete.
The dispatchers then take over, each pulling a URL, determining affinity, and triggering a worker DAG.
#### Client Selection (`clients` parameter):
The `clients` parameter determines which YouTube client persona is used for token generation. Different clients have different capabilities and requirements.
**Supported Clients:**
| Client | Visitor ID | Player poToken | GVS poToken | Cookies Support | Notes |
| ---------------- | ------------ | -------------- | ------------ | --------------- | ------------------------------------------------------------------ |
| `tv` | Required | Not Required | Not Required | Supported | All formats may have DRM if you request too much. |
| `web_safari` | Required | Required | Required* | Supported | *Provides HLS (m3u8) formats which may not require a GVS token. |
| `mweb` | Required | Required | Required | Supported | |
| `web_camoufox` | Required | Required | Required | Supported | Camoufox variant of `web`. |
**Untested / Not Recommended Clients:**
| Client | Visitor ID | Player poToken | GVS poToken | Cookies Support | Notes |
| ---------------- | ------------ | -------------- | ------------ | --------------- | ------------------------------------------------------------------ |
| `web` | Required | Required | Required | Supported | Only SABR formats available. |
| `tv_simply` | Required | Not Required | Not Required | Not Supported | |
| `tv_embedded` | Required | Not Required | Not Required | Supported | Requires account cookies for most videos. |
| `web_embedded` | Required | Not Required | Not Required | Supported | Only for embeddable videos. |
| `web_music` | Required | Required | Required | Supported | |
| `web_creator` | Required | Required | Required | Supported | Requires account cookies. |
| `android` | Required | Required | Required | Not Supported | |
| `android_vr` | Required | Not Required | Not Required | Not Supported | YouTube Kids videos are not available. |
| `ios` | Required | Required | Required | Not Supported | |
Other `_camoufox` variants are also available but untested.
""",
tags=['ytdlp', 'mgmt', 'master'],
params={
# --- Ignition Control Parameters ---
'total_workers': Param(DEFAULT_TOTAL_WORKERS, type="integer", description="Total number of dispatcher loops to start."),
'workers_per_bunch': Param(DEFAULT_WORKERS_PER_BUNCH, type="integer", description="Number of dispatchers to start in each bunch."),
'delay_between_workers_s': Param(DEFAULT_WORKER_DELAY_S, type="integer", description="Delay in seconds between starting each dispatcher within a bunch."),
'delay_between_bunches_s': Param(DEFAULT_BUNCH_DELAY_S, type="integer", description="Delay in seconds between starting each bunch."),
'skip_if_queue_empty': Param(False, type="boolean", title="[Ignition Control] Skip if Queue Empty", description="If True, the orchestrator will not start any dispatchers if the application's work queue is empty."),
# --- Worker Passthrough Parameters ---
'on_auth_failure': Param(
'proceed_loop_under_manual_inspection',
type="string",
enum=['stop_loop', 'retry_with_new_account', 'retry_without_ban', 'proceed_loop_under_manual_inspection'],
title="[Worker Param] On Authentication Failure Policy",
description="Policy for a worker when a bannable authentication error occurs. "
"'stop_loop': Ban the account, mark URL as failed, and stop the worker's loop. "
"'retry_with_new_account': (Default) Ban the failed account, retry ONCE with a new account. If retry fails, ban the second account and stop."
"'retry_without_ban': If a connection error (e.g. SOCKS timeout) occurs, retry with a new account but do NOT ban the first account/proxy. If retry fails, stop the loop without banning."
"'proceed_loop_under_manual_inspection': **BEWARE: MANUAL SUPERVISION REQUIRED.** Marks the URL as failed but continues the processing loop. Use this only when you can manually intervene."
),
'on_download_failure': Param(
'proceed_loop',
type="string",
enum=['stop_loop', 'proceed_loop', 'retry_with_new_token'],
title="[Worker Param] On Download Failure Policy",
description="Policy for a worker when a download or probe error occurs. "
"'stop_loop': Mark URL as failed and stop the worker's loop. "
"'proceed_loop': (Default) Mark URL as failed but continue the processing loop with a new URL. "
"'retry_with_new_token': Attempt to get a new token with a new account and retry the download once. If it fails again, proceed loop."
),
'request_params_json': Param('{}', type="string", title="[Worker Param] Request Params JSON", description="JSON string with request parameters for the token service."),
'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="[Worker Param] Base name for Redis queues."),
'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="[Worker Param] Airflow Redis connection ID."),
'clients': Param(
'tv_simply',
type="string",
enum=[
'tv_simply',
'mweb',
'tv',
'custom',
],
title="[Worker Param] Clients",
description="[Worker Param] Comma-separated list of clients for token generation. Full list: web, web_safari, web_embedded, web_music, web_creator, mweb, web_camoufox, web_safari_camoufox, web_embedded_camoufox, web_music_camoufox, web_creator_camoufox, mweb_camoufox, android, android_music, android_creator, android_vr, ios, ios_music, ios_creator, tv, tv_simply, tv_embedded. See DAG documentation for details."
),
'account_pool': Param('ytdlp_account', type="string", description="[Worker Param] Account pool prefix or comma-separated list."),
'account_pool_size': Param(10, type=["integer", "null"], description="[Worker Param] If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."),
'prepend_client_to_account': Param(True, type="boolean", title="[Worker Param] Prepend Client to Account", description="If True, prepends client and timestamp to account names in prefix mode. Format: prefix_YYYYMMDDHHMMSS_client_XX."),
'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="[Worker Param] IP of the ytdlp-ops-server. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."),
'service_port': Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer", description="[Worker Param] Port of the Envoy load balancer. Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."),
'machine_id': Param("ytdlp-ops-airflow-service", type="string", description="[Worker Param] Identifier for the client machine."),
'assigned_proxy_url': Param(None, type=["string", "null"], title="[Worker Param] Assigned Proxy URL", description="A specific proxy URL to use for the request, overriding the server's proxy pool logic."),
'auto_create_new_accounts_on_exhaustion': Param(True, type="boolean", description="[Worker Param] If True and all accounts in a prefix-based pool are exhausted, create a new one automatically."),
# --- Download Control Parameters ---
'delay_between_formats_s': Param(15, type="integer", title="[Worker Param] Delay Between Formats (s)", description="Delay in seconds between downloading each format when multiple formats are specified. A 22s wait may be effective for batch downloads, while 6-12s may suffice if cookies are refreshed regularly."),
'yt_dlp_test_mode': Param(False, type="boolean", title="[Worker Param] yt-dlp Test Mode", description="If True, runs yt-dlp with --test flag (dry run without downloading)."),
'skip_probe': Param(True, type="boolean", title="[Worker Param] Skip Probe", description="If True, skips the ffmpeg probe of downloaded files."),
'yt_dlp_cleanup_mode': Param(True, type="boolean", title="[Worker Param] yt-dlp Cleanup Mode", description="If True, creates a .empty file and deletes the original media file after successful download and probe."),
'fragment_retries': Param(2, type="integer", title="[Worker Param] Fragment Retries", description="Number of retries for a fragment before giving up."),
'limit_rate': Param('5M', type=["string", "null"], title="[Worker Param] Limit Rate", description="Download speed limit (e.g., 50K, 4.2M)."),
'socket_timeout': Param(15, type="integer", title="[Worker Param] Socket Timeout", description="Timeout in seconds for socket operations."),
'min_sleep_interval': Param(5, type="integer", title="[Worker Param] Min Sleep Interval", description="Minimum time to sleep between downloads (seconds)."),
'max_sleep_interval': Param(10, type="integer", title="[Worker Param] Max Sleep Interval", description="Maximum time to sleep between downloads (seconds)."),
'download_format_preset': Param(
'formats_2',
type="string",
enum=['best_audio', 'formats_0', 'formats_2', 'formats_3', 'custom'],
title="[Worker Param] Download Format Preset",
description="Select a predefined format string or choose 'custom' to use the value from 'Custom Download Format'.\nformats_0: 18,140\nformats_2: 18,140-dashy,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy\nformats_3: 18,599,139,140,141,160/269,133/229,134/230,135/231,136/232,137/270,298/311,299/318"
),
'download_format_custom': Param(
'18,140-dashy,299-dashy/298-dashy/137-dashy/136-dashy/135-dashy/134-dashy/133-dashy',
type="string",
title="[Worker Param] Custom Download Format",
description="Custom yt-dlp format string. Used when preset is 'custom'. E.g., 'ba[ext=m4a]/bestaudio/best'."
),
'downloader': Param(
'py',
type="string",
enum=['py', 'aria-rpc', 'cli'],
title="[Worker Param] Download Tool",
description="Choose the download tool to use: 'py' (native python, recommended), 'aria-rpc' (send to aria2c daemon), 'cli' (legacy yt-dlp wrapper)."
),
'aria_host': Param('172.17.0.1', type="string", title="[Worker Param] Aria2c Host", description="For 'aria-rpc' downloader: Host of the aria2c RPC server. Can be set via Airflow Variable 'YTDLP_ARIA_HOST'."),
'aria_port': Param(6800, type="integer", title="[Worker Param] Aria2c Port", description="For 'aria-rpc' downloader: Port of the aria2c RPC server. Can be set via Airflow Variable 'YTDLP_ARIA_PORT'."),
'aria_secret': Param('SQGCQPLVFQIASMPNPOJYLVGJYLMIDIXDXAIXOTX', type="string", title="[Worker Param] Aria2c Secret", description="For 'aria-rpc' downloader: Secret token. Can be set via Airflow Variable 'YTDLP_ARIA_SECRET'."),
'yt_dlp_extra_args': Param(
'--restrict-filenames',
type=["string", "null"],
title="[Worker Param] Extra yt-dlp arguments",
description="Extra command-line arguments for yt-dlp during download."
),
}
) as dag:
orchestrate_task = PythonOperator(
task_id='start_worker_loops',
python_callable=orchestrate_workers_ignition_callable,
)
orchestrate_task.doc_md = """
### Start Worker Loops
This is the main task that executes the ignition policy.
- It triggers `ytdlp_ops_dispatcher` DAGs according to the batch settings.
- It passes all its parameters down to the dispatchers, which will use them to trigger workers.
"""