# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 rl
#
# Distributed under terms of the MIT license.

"""
DAG to orchestrate ytdlp_ops_dispatcher DAG runs based on a defined policy.

It fetches URLs from a Redis queue and launches dispatchers in controlled
bunches, which in turn trigger workers with affinity.
"""

from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.utils.dates import days_ago
from airflow.api.common.trigger_dag import trigger_dag
from airflow.models.dagrun import DagRun
from airflow.models.dag import DagModel
from datetime import timedelta
import logging
import random
import time
import json

# Import utility functions
from utils.redis_utils import _get_redis_client

# Import Thrift modules for proxy status check
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

# Configure logging
logger = logging.getLogger(__name__)

# Default settings
DEFAULT_QUEUE_NAME = 'video_queue'
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_TOTAL_WORKERS = 3
DEFAULT_WORKERS_PER_BUNCH = 1
DEFAULT_WORKER_DELAY_S = 5
DEFAULT_BUNCH_DELAY_S = 20
DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="172.17.0.1")
# Variable.get returns a string when the variable is set, so coerce to int for the integer Param below.
DEFAULT_YT_AUTH_SERVICE_PORT = int(Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080))


# --- Helper Functions ---

def _check_application_queue(redis_client, queue_base_name: str) -> int:
    """Checks and logs the length of the application's inbox queue."""
    inbox_queue_name = f"{queue_base_name}_inbox"
    logger.info("--- Checking Application Work Queue ---")
    try:
        q_len = redis_client.llen(inbox_queue_name)
        logger.info(f"Application work queue '{inbox_queue_name}' has {q_len} item(s).")
        return q_len
    except Exception as e:
        logger.error(f"Failed to check application queue '{inbox_queue_name}': {e}", exc_info=True)
        return -1  # Indicate an error


def _inspect_celery_queues(redis_client, queue_names: list):
    """Inspects Celery queues in Redis and logs their status."""
    logger.info("--- Inspecting Celery Queues in Redis ---")
    for queue_name in queue_names:
        try:
            q_len = redis_client.llen(queue_name)
            logger.info(f"Queue '{queue_name}': Length = {q_len}")
            if q_len > 0:
                logger.info(f"Showing up to 10 tasks in '{queue_name}':")
                # Fetch up to 10 items from the start of the list (queue)
                items_bytes = redis_client.lrange(queue_name, 0, 9)
                for i, item_bytes in enumerate(items_bytes):
                    try:
                        # Celery tasks are JSON-encoded strings
                        task_data = json.loads(item_bytes.decode('utf-8'))
                        # Pretty-print for readability in logs
                        pretty_task_data = json.dumps(task_data, indent=2)
                        logger.info(f" Task {i+1}:\n{pretty_task_data}")
                    except (json.JSONDecodeError, UnicodeDecodeError) as e:
                        logger.warning(f" Task {i+1}: Could not decode/parse task data. Error: {e}. Raw: {item_bytes!r}")
        except Exception as e:
            logger.error(f"Failed to inspect queue '{queue_name}': {e}", exc_info=True)
    logger.info("--- End of Queue Inspection ---")


# --- Main Orchestration Callable ---

def orchestrate_workers_ignition_callable(**context):
    """
    Main orchestration logic.

    Triggers a specified number of dispatcher DAGs to initiate self-sustaining
    processing loops.
    """
    params = context['params']
    logger.info("Starting dispatcher ignition sequence.")

    dispatcher_dag_id = 'ytdlp_ops_dispatcher'
    dag_model = DagModel.get_dagmodel(dispatcher_dag_id)
    if dag_model and dag_model.is_paused:
        raise AirflowException(f"Dispatcher DAG '{dispatcher_dag_id}' is paused. Cannot start dispatcher loops.")

    total_workers = int(params['total_workers'])
    workers_per_bunch = int(params['workers_per_bunch'])

    # --- Input Validation ---
    if total_workers <= 0:
        logger.warning(f"'total_workers' is {total_workers}. No workers will be started. Skipping ignition.")
        raise AirflowSkipException(f"No workers to start (total_workers={total_workers}).")
    if workers_per_bunch <= 0:
        logger.error(f"'workers_per_bunch' must be a positive integer, but got {workers_per_bunch}. Aborting.")
        raise AirflowException(f"'workers_per_bunch' must be a positive integer, but got {workers_per_bunch}.")
    # --- End Input Validation ---

    worker_delay = int(params['delay_between_workers_s'])
    bunch_delay = int(params['delay_between_bunches_s'])

    # Create a list of worker numbers to trigger, split into bunches
    worker_indices = list(range(total_workers))
    bunches = [worker_indices[i:i + workers_per_bunch] for i in range(0, len(worker_indices), workers_per_bunch)]

    # --- Inspect Queues before starting ---
    worker_queue = 'queue-dl'  # The static queue the worker DAG uses.
    try:
        redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
        redis_client = _get_redis_client(redis_conn_id)

        # First, check the application queue for work
        app_queue_len = _check_application_queue(redis_client, params['queue_name'])
        if params.get('skip_if_queue_empty') and app_queue_len == 0:
            logger.info("'skip_if_queue_empty' is True and the application queue is empty. Skipping worker ignition.")
            raise AirflowSkipException("Application work queue is empty.")

        # Then, inspect the target Celery queue for debugging
        _inspect_celery_queues(redis_client, [worker_queue])
    except AirflowSkipException:
        raise  # Re-raise to let Airflow handle the skip
    except Exception as e:
        logger.error(f"Could not inspect queues due to an error: {e}. Continuing with ignition sequence.")
    # --- End of Inspection ---

    logger.info(f"Plan: Triggering {total_workers} total dispatcher runs in {len(bunches)} bunches. "
                "Each run will attempt to process one URL.")

    dag_run_id = context['dag_run'].run_id
    total_triggered = 0
    for i, bunch in enumerate(bunches):
        logger.info(f"--- Triggering Bunch {i+1}/{len(bunches)} (contains {len(bunch)} dispatcher(s)) ---")
        for j, _ in enumerate(bunch):
            # Create a unique run_id for each dispatcher run
            run_id = f"dispatched_{dag_run_id}_{total_triggered}"
            # Pass all orchestrator params to the dispatcher, which will then pass them to the worker.
            conf_to_pass = {p: params[p] for p in params}

            logger.info(f"Triggering dispatcher {j+1}/{len(bunch)} in bunch {i+1} "
                        f"(run {total_triggered + 1}/{total_workers}) (Run ID: {run_id})")
            logger.debug(f"Full conf for dispatcher run {run_id}: {conf_to_pass}")
            trigger_dag(
                dag_id=dispatcher_dag_id,
                run_id=run_id,
                conf=conf_to_pass,
                replace_microseconds=False
            )
            total_triggered += 1

            # Delay between dispatches in a bunch
            if j < len(bunch) - 1:
                logger.info(f"Waiting {worker_delay}s before next dispatcher in bunch...")
                time.sleep(worker_delay)

        # Delay between bunches
        if i < len(bunches) - 1:
            logger.info(f"--- Bunch {i+1} triggered. Waiting {bunch_delay}s before next bunch... ---")
            time.sleep(bunch_delay)

    logger.info(f"--- Ignition sequence complete. Total dispatcher runs triggered: {total_triggered}. ---")
    # --- Final Queue Inspection ---
    final_check_delay = 30  # seconds
    logger.info(f"Waiting {final_check_delay}s for a final queue status check to see if workers picked up tasks...")
    time.sleep(final_check_delay)
    try:
        redis_conn_id = params.get('redis_conn_id', DEFAULT_REDIS_CONN_ID)
        redis_client = _get_redis_client(redis_conn_id)
        # Log connection details for debugging broker mismatch issues
        conn_kwargs = redis_client.connection_pool.connection_kwargs
        logger.info(f"Final check using Redis connection '{redis_conn_id}': "
                    f"host={conn_kwargs.get('host')}, "
                    f"port={conn_kwargs.get('port')}, "
                    f"db={conn_kwargs.get('db')}")
        _inspect_celery_queues(redis_client, [worker_queue])
        logger.info("Final queue inspection complete. If queues are not empty, workers have not picked up tasks yet. "
                    "If queues are empty, workers have started processing.")
    except Exception as e:
        logger.error(f"Could not perform final queue inspection: {e}. This does not affect worker ignition.")


# =============================================================================
# DAG Definition
# =============================================================================
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': days_ago(1),
}

with DAG(
    dag_id='ytdlp_ops_orchestrator',
    default_args=default_args,
    schedule_interval=None,  # This DAG runs only when triggered.
    max_active_runs=1,  # Only one ignition process should run at a time.
    catchup=False,
    description='Ignition system for ytdlp_ops_dispatcher DAGs. Starts self-sustaining worker loops via dispatchers.',
    doc_md="""
    ### YT-DLP Worker Ignition System

    This DAG acts as an "ignition system" to start one or more self-sustaining worker loops.
    It does **not** process URLs itself. Its only job is to trigger a specified number of
    `ytdlp_ops_dispatcher` DAGs, which in turn pull URLs and trigger `ytdlp_ops_worker_per_url`
    with worker affinity.

    #### How it Works:
    1. **Manual Trigger:** You manually trigger this DAG with parameters defining how many
       dispatcher loops to start (`total_workers`), in what configuration (`workers_per_bunch`, delays).
    2. **Ignition:** The orchestrator triggers the initial set of dispatcher DAGs in a
       "fire-and-forget" manner, passing all its configuration parameters to them.
    3. **Completion:** Once all initial dispatchers have been triggered, the orchestrator's job is
       complete. The dispatchers then take over, each pulling a URL, determining affinity, and
       triggering a worker DAG.
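
    #### Example: Triggering from the CLI
    A minimal sketch of starting four dispatcher loops in bunches of two (the conf values are
    illustrative; any key omitted from `--conf` falls back to the defaults defined in `params`):

    ```bash
    airflow dags trigger ytdlp_ops_orchestrator \
        --conf '{"total_workers": 4, "workers_per_bunch": 2, "delay_between_bunches_s": 30, "skip_if_queue_empty": true}'
    ```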
    """,
    tags=['ytdlp', 'mgmt', 'master'],
    params={
        # --- Ignition Control Parameters ---
        'total_workers': Param(DEFAULT_TOTAL_WORKERS, type="integer",
                               description="Total number of dispatcher loops to start."),
        'workers_per_bunch': Param(DEFAULT_WORKERS_PER_BUNCH, type="integer",
                                   description="Number of dispatchers to start in each bunch."),
        'delay_between_workers_s': Param(DEFAULT_WORKER_DELAY_S, type="integer",
                                         description="Delay in seconds between starting each dispatcher within a bunch."),
        'delay_between_bunches_s': Param(DEFAULT_BUNCH_DELAY_S, type="integer",
                                         description="Delay in seconds between starting each bunch."),
        'skip_if_queue_empty': Param(False, type="boolean",
                                     title="[Ignition Control] Skip if Queue Empty",
                                     description="If True, the orchestrator will not start any dispatchers if the application's work queue is empty."),

        # --- Worker Passthrough Parameters ---
        'on_bannable_failure': Param(
            'stop_loop',
            type="string",
            enum=['stop_loop', 'retry_with_new_account', 'retry_without_ban',
                  'retry_and_ban_account_only', 'retry_on_connection_error'],
            title="[Worker Param] On Bannable Failure Policy",
            description="Policy for a worker when a bannable error occurs. "
                        "'stop_loop': Ban the account, mark the URL as failed, and stop the worker's loop. "
                        "'retry_with_new_account': Ban the failed account and retry ONCE with a new account. If the retry fails, ban the second account and proxy, then stop. "
                        "'retry_on_connection_error': If a connection error (e.g. a SOCKS timeout) occurs, retry with a new account but do NOT ban the first account/proxy. If the retry fails, stop the loop without banning."
        ),
        'queue_name': Param(DEFAULT_QUEUE_NAME, type="string",
                            description="[Worker Param] Base name for Redis queues."),
        'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string",
                               description="[Worker Param] Airflow Redis connection ID."),
        'clients': Param('web', type="string",
                         description="[Worker Param] Comma-separated list of clients for token generation. "
                                     "Full list: web, web_safari, web_embedded, web_music, web_creator, mweb, "
                                     "web_camoufox, web_safari_camoufox, web_embedded_camoufox, web_music_camoufox, "
                                     "web_creator_camoufox, mweb_camoufox, android, android_music, android_creator, "
                                     "android_vr, ios, ios_music, ios_creator, tv, tv_simply, tv_sample, tv_embedded"),
        'account_pool': Param('ytdlp_account', type="string",
                              description="[Worker Param] Account pool prefix or comma-separated list."),
        'account_pool_size': Param(10, type=["integer", "null"],
                                   description="[Worker Param] If using a prefix for 'account_pool', this specifies the number of accounts to generate "
                                               "(e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."),
        'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string",
                            description="[Worker Param] IP of the ytdlp-ops-server. Defaults to the Airflow variable YT_AUTH_SERVICE_IP, or the hardcoded fallback."),
        'service_port': Param(DEFAULT_YT_AUTH_SERVICE_PORT, type="integer",
                              description="[Worker Param] Port of the Envoy load balancer. Defaults to the Airflow variable YT_AUTH_SERVICE_PORT, or the hardcoded fallback."),
        'machine_id': Param("ytdlp-ops-airflow-service", type="string",
                            description="[Worker Param] Identifier for the client machine."),
        'auto_create_new_accounts_on_exhaustion': Param(True, type="boolean",
                                                        description="[Worker Param] If True and all accounts in a prefix-based pool are exhausted, create a new one automatically."),
        'retrigger_delay_on_empty_s': Param(60, type="integer",
                                            description="[Worker Param] Delay in seconds before a worker re-triggers itself if the queue is empty. Set to -1 to stop the loop."),
    },
) as dag:

    orchestrate_task = PythonOperator(
        task_id='start_worker_loops',
        python_callable=orchestrate_workers_ignition_callable,
    )

    orchestrate_task.doc_md = """
    ### Start Worker Loops

    This is the main task that executes the ignition policy.
    - It triggers `ytdlp_ops_dispatcher` DAGs according to the batch settings.
    - It passes all of its parameters down to the dispatchers, which will use them to trigger workers.
    """
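

# Optional local debugging hook (a sketch, assuming Airflow 2.5+, where `DAG.test()` is available).
# Running this module directly executes the ignition task once, outside the scheduler, using the
# default `params` defined above. Note that it still connects to Redis and triggers real
# 'ytdlp_ops_dispatcher' runs, so only use it against a test environment.
if __name__ == "__main__":
    dag.test()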