yt-dlp-dags/airflow/dags/ytdlp_ops_orchestrator.py
2025-08-26 18:00:55 +03:00

288 lines
14 KiB
Python

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl <rl@rlmbp>
#
# Distributed under terms of the MIT license.
"""
DAG to orchestrate ytdlp_ops_dispatcher DAG runs based on a defined policy.
It does not fetch or process URLs itself: it checks the length of the Redis
inbox queue and launches dispatchers in controlled bunches. The dispatchers
in turn pull URLs and trigger workers with affinity.
"""
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.utils.dates import days_ago
from airflow.api.common.trigger_dag import trigger_dag
from airflow.models.dagrun import DagRun
from airflow.models.dag import DagModel
from datetime import timedelta
import logging
import random
import time
import json
# Import utility functions
from utils.redis_utils import _get_redis_client
# Import Thrift modules for proxy status check
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
# Configure logging
logger = logging.getLogger(__name__)

# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TOTAL_WORKERS = 3
DEFAULT_WORKERS_PER_BUNCH = 1
DEFAULT_WORKER_DELAY_S = 5
DEFAULT_BUNCH_DELAY_S = 20
# NOTE(review): Variable.get() at module level is evaluated every time this file
# is parsed. Kept as-is because the values are used as Param defaults below.
DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="172.17.0.1")
# Airflow Variables are stored as strings, so a configured port comes back as
# e.g. "9080" while the fallback is the int 9080. Coerce to int so the value is
# always valid for the integer-typed 'service_port' Param.
DEFAULT_YT_AUTH_SERVICE_PORT = int(Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080))
# --- Helper Functions ---
def _check_application_queue(redis_client, queue_base_name: str) -> int:
    """Report how many items are waiting in the application's inbox queue.

    The inbox queue is the Redis list named ``<queue_base_name>_inbox``.

    Args:
        redis_client: Client object exposing ``llen``.
        queue_base_name: Base queue name; ``_inbox`` is appended to it.

    Returns:
        The length reported by ``llen``, or ``-1`` if the lookup raised.
    """
    inbox_key = f"{queue_base_name}_inbox"
    logger.info("--- Checking Application Work Queue ---")
    try:
        pending = redis_client.llen(inbox_key)
    except Exception as err:
        logger.error(f"Failed to check application queue '{inbox_key}': {err}", exc_info=True)
        # Sentinel: callers compare against 0, so -1 never reads as "empty".
        return -1
    else:
        logger.info(f"Application work queue '{inbox_key}' has {pending} item(s).")
        return pending
def _inspect_celery_queues(redis_client, queue_names: list):
    """Inspects Celery queues in Redis and logs their status.

    Purely diagnostic: for every queue it logs the list length and a
    pretty-printed preview of up to the first 10 entries. Nothing is removed
    from the queues (only ``llen`` and ``lrange`` are called).

    Args:
        redis_client: Client object exposing ``llen`` and ``lrange``.
        queue_names: Names of the Redis lists to inspect.

    Returns:
        None. A failure on one queue is logged and the remaining queues are
        still inspected.
    """
    logger.info("--- Inspecting Celery Queues in Redis ---")
    for queue_name in queue_names:
        try:
            q_len = redis_client.llen(queue_name)
            logger.info(f"Queue '{queue_name}': Length = {q_len}")
            if q_len > 0:
                logger.info(f"Showing up to 10 tasks in '{queue_name}':")
                # Fetch up to 10 items from the start of the list (queue)
                raw_items = redis_client.lrange(queue_name, 0, 9)
                for task_no, raw_item in enumerate(raw_items, start=1):
                    try:
                        # A client created with decode_responses=True returns str
                        # instead of bytes; only decode when we really have bytes.
                        if isinstance(raw_item, (bytes, bytearray)):
                            text = raw_item.decode('utf-8')
                        else:
                            text = raw_item
                        # Celery tasks are JSON-encoded strings
                        task_data = json.loads(text)
                        # Pretty print for readability in logs
                        pretty_task_data = json.dumps(task_data, indent=2)
                        logger.info(f" Task {task_no}:\n{pretty_task_data}")
                    except (json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
                        logger.warning(f" Task {task_no}: Could not decode/parse task data. Error: {e}. Raw: {raw_item!r}")
        except Exception as e:
            logger.error(f"Failed to inspect queue '{queue_name}': {e}", exc_info=True)
    logger.info("--- End of Queue Inspection ---")
# --- Main Orchestration Callable ---
def orchestrate_workers_ignition_callable(**context):
    """
    Main orchestration logic. Triggers a specified number of dispatcher DAGs
    to initiate self-sustaining processing loops.

    Steps:
      1. Refuse to run if the dispatcher DAG is paused.
      2. Validate ``total_workers`` / ``workers_per_bunch`` and normalise delays.
      3. Best-effort check of the application inbox queue (optionally skipping
         when it is empty) and a diagnostic dump of the worker Celery queue.
      4. Trigger the dispatcher runs bunch by bunch, sleeping between
         dispatchers and between bunches.
      5. After a fixed wait, log a final diagnostic dump of the worker queue.

    Args:
        **context: Airflow task context. Reads ``context['params']`` and
            ``context['dag_run'].run_id``.

    Raises:
        AirflowException: If the dispatcher DAG is paused or
            ``workers_per_bunch`` is not positive.
        AirflowSkipException: If ``total_workers`` is not positive, or if
            ``skip_if_queue_empty`` is set and the inbox queue is empty.
    """
    # Local import: keeps this function self-contained without touching the
    # file-level import block.
    from airflow.exceptions import DagRunAlreadyExists

    params = context['params']
    logger.info("Starting dispatcher ignition sequence.")
    dispatcher_dag_id = 'ytdlp_ops_dispatcher'
    dag_model = DagModel.get_dagmodel(dispatcher_dag_id)
    if dag_model and dag_model.is_paused:
        raise AirflowException(f"Dispatcher DAG '{dispatcher_dag_id}' is paused. Cannot start dispatcher loops.")

    total_workers = int(params['total_workers'])
    workers_per_bunch = int(params['workers_per_bunch'])

    # --- Input Validation ---
    if total_workers <= 0:
        logger.warning(f"'total_workers' is {total_workers}. No workers will be started. Skipping ignition.")
        raise AirflowSkipException(f"No workers to start (total_workers={total_workers}).")
    if workers_per_bunch <= 0:
        logger.error(f"'workers_per_bunch' must be a positive integer, but got {workers_per_bunch}. Aborting.")
        raise AirflowException(f"'workers_per_bunch' must be a positive integer, but got {workers_per_bunch}.")

    worker_delay = int(params['delay_between_workers_s'])
    bunch_delay = int(params['delay_between_bunches_s'])
    # time.sleep() raises ValueError for negative values, which would abort the
    # ignition after some dispatchers were already triggered. Treat a negative
    # delay as "no delay" instead.
    if worker_delay < 0:
        logger.warning(f"'delay_between_workers_s' is negative ({worker_delay}). Using 0 instead.")
        worker_delay = 0
    if bunch_delay < 0:
        logger.warning(f"'delay_between_bunches_s' is negative ({bunch_delay}). Using 0 instead.")
        bunch_delay = 0
    # --- End Input Validation ---

    # Split the worker indices 0..total_workers-1 into consecutive bunches.
    worker_indices = list(range(total_workers))
    bunches = [worker_indices[i:i + workers_per_bunch] for i in range(0, len(worker_indices), workers_per_bunch)]

    # --- Inspect Queues before starting ---
    worker_queue = 'queue-dl'  # The static queue the worker DAG uses.
    redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
    try:
        redis_client = _get_redis_client(redis_conn_id)
        # First, check the application queue for work.
        # A return of -1 means "lookup failed" and never triggers the skip.
        app_queue_len = _check_application_queue(redis_client, params['queue_name'])
        if params.get('skip_if_queue_empty') and app_queue_len == 0:
            logger.info("'skip_if_queue_empty' is True and application queue is empty. Skipping worker ignition.")
            raise AirflowSkipException("Application work queue is empty.")
        # Then, inspect the target Celery queue for debugging
        _inspect_celery_queues(redis_client, [worker_queue])
    except AirflowSkipException:
        raise  # Re-raise to let Airflow handle the skip
    except Exception as e:
        # Inspection is diagnostic only; a Redis problem must not block ignition.
        logger.error(f"Could not inspect queues due to an error: {e}. Continuing with ignition sequence.")
    # --- End of Inspection ---

    logger.info(f"Plan: Triggering {total_workers} total dispatcher runs in {len(bunches)} bunches. Each run will attempt to process one URL.")
    dag_run_id = context['dag_run'].run_id
    # Pass all orchestrator params to the dispatcher, which will then pass them
    # to the worker. The mapping is the same for every run, so build it once.
    conf_to_pass = {p: params[p] for p in params}

    total_triggered = 0
    already_existing = 0
    for i, bunch in enumerate(bunches):
        logger.info(f"--- Triggering Bunch {i+1}/{len(bunches)} (contains {len(bunch)} dispatcher(s)) ---")
        for j, worker_index in enumerate(bunch):
            # Unique and deterministic run_id per dispatcher: a retry of this
            # task produces the same ids and can detect runs it already made.
            run_id = f"dispatched_{dag_run_id}_{worker_index}"
            logger.info(f"Triggering dispatcher {j+1}/{len(bunch)} in bunch {i+1} (run {worker_index + 1}/{total_workers}) (Run ID: {run_id})")
            logger.debug(f"Full conf for dispatcher run {run_id}: {conf_to_pass}")
            try:
                trigger_dag(
                    dag_id=dispatcher_dag_id,
                    run_id=run_id,
                    conf=dict(conf_to_pass),  # fresh copy per run
                    replace_microseconds=False
                )
                total_triggered += 1
            except DagRunAlreadyExists:
                # Happens when a previous attempt of this task failed part-way
                # through. Skip so the retry can finish the remaining runs.
                already_existing += 1
                logger.warning(f"Dispatcher run '{run_id}' already exists (likely from a previous attempt). Skipping it.")

            # Delay between dispatches in a bunch
            if j < len(bunch) - 1:
                logger.info(f"Waiting {worker_delay}s before next dispatcher in bunch...")
                time.sleep(worker_delay)

        # Delay between bunches
        if i < len(bunches) - 1:
            logger.info(f"--- Bunch {i+1} triggered. Waiting {bunch_delay}s before next bunch... ---")
            time.sleep(bunch_delay)

    logger.info(f"--- Ignition sequence complete. Total dispatcher runs triggered: {total_triggered}. ---")
    if already_existing:
        logger.info(f"{already_existing} dispatcher run(s) already existed and were skipped.")

    # --- Final Queue Inspection ---
    final_check_delay = 30  # seconds
    logger.info(f"Waiting {final_check_delay}s for a final queue status check to see if workers picked up tasks...")
    time.sleep(final_check_delay)
    try:
        # A fresh client is requested because the earlier one may have failed.
        redis_client = _get_redis_client(redis_conn_id)
        # Log connection details for debugging broker mismatch issues
        conn_kwargs = redis_client.connection_pool.connection_kwargs
        logger.info(f"Final check using Redis connection '{redis_conn_id}': "
                    f"host={conn_kwargs.get('host')}, "
                    f"port={conn_kwargs.get('port')}, "
                    f"db={conn_kwargs.get('db')}")
        _inspect_celery_queues(redis_client, [worker_queue])
        logger.info("Final queue inspection complete. If queues are not empty, workers have not picked up tasks yet. "
                    "If queues are empty, workers have started processing.")
    except Exception as e:
        logger.error(f"Could not perform final queue inspection: {e}. This does not affect worker ignition.")
# =============================================================================
# DAG Definition
# =============================================================================
# Task-level defaults handed to the DAG below via ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # One retry, one minute after a failure.
    retries=1,
    retry_delay=timedelta(minutes=1),
    start_date=days_ago(1),
)
# The orchestrator DAG: a single PythonOperator task that runs
# orchestrate_workers_ignition_callable with the params declared here.
with DAG(
    dag_id='ytdlp_ops_orchestrator',
    default_args=default_args,
    schedule_interval=None,  # This DAG runs only when triggered.
    max_active_runs=1,  # Only one ignition process should run at a time.
    catchup=False,
    description='Ignition system for ytdlp_ops_dispatcher DAGs. Starts self-sustaining worker loops via dispatchers.',
    doc_md="""
### YT-DLP Worker Ignition System
This DAG acts as an "ignition system" to start one or more self-sustaining worker loops.
It does **not** process URLs itself. Its only job is to trigger a specified number of `ytdlp_ops_dispatcher` DAGs,
which in turn pull URLs and trigger `ytdlp_ops_worker_per_url` with worker affinity.
#### How it Works:
1. **Manual Trigger:** You manually trigger this DAG with parameters defining how many dispatcher loops to start (`total_workers`), in what configuration (`workers_per_bunch`, delays).
2. **Ignition:** The orchestrator triggers the initial set of dispatcher DAGs in a "fire-and-forget" manner, passing all its configuration parameters to them.
3. **Completion:** Once all initial dispatchers have been triggered, the orchestrator's job is complete.
The dispatchers then take over, each pulling a URL, determining affinity, and triggering a worker DAG.
""",
    tags=['ytdlp', 'mgmt', 'master'],
    params={
        # --- Ignition Control Parameters ---
        'total_workers': Param(DEFAULT_TOTAL_WORKERS, type="integer", description="Total number of dispatcher loops to start."),
        'workers_per_bunch': Param(DEFAULT_WORKERS_PER_BUNCH, type="integer", description="Number of dispatchers to start in each bunch."),
        'delay_between_workers_s': Param(DEFAULT_WORKER_DELAY_S, type="integer", description="Delay in seconds between starting each dispatcher within a bunch."),
        'delay_between_bunches_s': Param(DEFAULT_BUNCH_DELAY_S, type="integer", description="Delay in seconds between starting each bunch."),
        'skip_if_queue_empty': Param(False, type="boolean", title="[Ignition Control] Skip if Queue Empty", description="If True, the orchestrator will not start any dispatchers if the application's work queue is empty."),
        # --- Worker Passthrough Parameters ---
        # These are not interpreted by the orchestrator (except 'queue_name' and
        # 'redis_conn_id'); they are forwarded unchanged in the dispatcher conf.
        'on_bannable_failure': Param(
            'stop_loop',
            type="string",
            enum=['stop_loop', 'retry_with_new_account', 'retry_without_ban', 'retry_and_ban_account_only', 'retry_on_connection_error'],
            title="[Worker Param] On Bannable Failure Policy",
            # TODO(review): 'retry_without_ban' and 'retry_and_ban_account_only'
            # are accepted values but are not described below.
            description="Policy for a worker when a bannable error occurs. "
                        "'stop_loop': Ban the account, mark URL as failed, and stop the worker's loop. "
                        "'retry_with_new_account': Ban the failed account, retry ONCE with a new account. If retry fails, ban the second account and proxy, then stop. "
                        "'retry_on_connection_error': If a connection error (e.g. SOCKS timeout) occurs, retry with a new account but do NOT ban the first account/proxy. If retry fails, stop the loop without banning."
        ),
        'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="[Worker Param] Base name for Redis queues."),
        'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="[Worker Param] Airflow Redis connection ID."),
        'clients': Param('web', type="string", description="[Worker Param] Comma-separated list of clients for token generation. Full list: web, mweb, ios, android, web_safari, web_embedded, web_music, web_creator"),
        'account_pool': Param('ytdlp_account', type="string", description="[Worker Param] Account pool prefix or comma-separated list."),
        'account_pool_size': Param(10, type=["integer", "null"], description="[Worker Param] If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."),
        'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="[Worker Param] IP of the ytdlp-ops-server. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."),
        # int(): the default may originate from an Airflow Variable, which is
        # stored as a string, while this Param is declared as an integer.
        'service_port': Param(int(DEFAULT_YT_AUTH_SERVICE_PORT), type="integer", description="[Worker Param] Port of the Envoy load balancer. Default is from Airflow variable YT_AUTH_SERVICE_PORT or hardcoded."),
        'machine_id': Param("ytdlp-ops-airflow-service", type="string", description="[Worker Param] Identifier for the client machine."),
        'auto_create_new_accounts_on_exhaustion': Param(True, type="boolean", description="[Worker Param] If True and all accounts in a prefix-based pool are exhausted, create a new one automatically."),
        'retrigger_delay_on_empty_s': Param(60, type="integer", description="[Worker Param] Delay in seconds before a worker re-triggers itself if the queue is empty. Set to -1 to stop the loop."),
    }
) as dag:
    orchestrate_task = PythonOperator(
        task_id='start_worker_loops',
        python_callable=orchestrate_workers_ignition_callable,
    )
    orchestrate_task.doc_md = """
### Start Worker Loops
This is the main task that executes the ignition policy.
- It triggers `ytdlp_ops_dispatcher` DAGs according to the batch settings.
- It passes all its parameters down to the dispatchers, which will use them to trigger workers.
"""