diff --git a/.gitignore b/.gitignore
index fda177f..b0ac3ed 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1 @@
 .aider*
-*/.DS_Store
diff --git a/VERSION b/VERSION
index 9575d51..406729f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.6.1
+3.10.1-exp
diff --git a/airflow/config/airflow_local_settings.py b/airflow/config/airflow_local_settings.py
index 240a8ea..c963cb9 100644
--- a/airflow/config/airflow_local_settings.py
+++ b/airflow/config/airflow_local_settings.py
@@ -1,7 +1,26 @@
 import logging
+import os
+import sys
 from copy import deepcopy
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 
 logger = logging.getLogger(__name__)
 
+# Add the config directory to the path to allow for local imports like `custom_task_hooks`.
+# This is necessary because this file is executed by the Airflow scheduler in a context
+# where the config directory is not automatically on the Python path.
+config_dir = os.path.dirname(os.path.abspath(__file__))
+if config_dir not in sys.path:
+    sys.path.insert(0, config_dir)
+    logger.info(f"Added '{config_dir}' to sys.path for local imports.")
+
 LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
+
+# The task_instance_mutation_hook is now self-registering to be robust
+# against different loading configurations. See custom_task_hooks.py for details.
+try:
+    import custom_task_hooks
+    logger.info(f"Successfully imported custom_task_hooks module (Version: {getattr(custom_task_hooks, '__version__', 'unknown')}).")
+except ImportError as e:
+    logger.warning(f"Could not import custom_task_hooks: {e}. Worker pinning will not function.", exc_info=True)
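Aside (not part of the patch): to confirm that the self-registration wired up above actually took effect inside a scheduler or worker container, one option is to inspect airflow.settings from a Python shell. This is a minimal sketch under the assumption that the container loads the local settings when Airflow is imported; it only reads the attribute the patch assigns.

import sys
from airflow import settings

hook = getattr(settings, "task_instance_mutation_hook", None)
if hook is None:
    print("task_instance_mutation_hook is not registered")
else:
    # When self-registration worked, the hook's module is custom_task_hooks and
    # its __version__ should match the version string at the top of that file.
    mod = sys.modules.get(getattr(hook, "__module__", ""), None)
    print(f"registered hook: {getattr(hook, '__qualname__', hook)} "
          f"(module version: {getattr(mod, '__version__', 'unknown')})")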
diff --git a/airflow/config/custom_task_hooks.py b/airflow/config/custom_task_hooks.py
index fdb8176..47c2a97 100644
--- a/airflow/config/custom_task_hooks.py
+++ b/airflow/config/custom_task_hooks.py
@@ -1,28 +1,55 @@
-# Version: 2025-08-20-02
+# Version: 2025-09-22-08
+__version__ = "2025-09-22-08"
 # This file contains custom hooks for the Airflow environment.
+import logging
 from airflow import settings
-from airflow.models.dagrun import DagRun
-from airflow.utils.session import provide_session
+from airflow.configuration import conf
 
+logger = logging.getLogger(__name__)
 
-@provide_session
-def task_instance_mutation_hook(ti, session=None):
+def task_instance_mutation_hook(ti):
+    """
+    This hook modifies the task instance queue at runtime for worker pinning.
+    It relies exclusively on parsing the queue from the run_id, which is guaranteed
+    to be set by the dispatcher DAG. This avoids database race conditions.
+    """
+    logger.debug(f"MUTATION HOOK: Running for dag '{ti.dag_id}', task '{ti.task_id}'.")
     if ti.dag_id == 'ytdlp_ops_worker_per_url':
-        # Query the DagRun from the DB using run_id to reliably get the conf.
-        # The ti.dag_run attribute is not always populated when the hook is called.
-        dag_run = session.query(DagRun).filter(DagRun.run_id == ti.run_id).first()
-        conf = dag_run.conf if dag_run else {}
-        worker_queue = conf.get('worker_queue')
+        # If the run_id isn't populated yet, just return. The hook may be called again.
+        if not ti.run_id:
+            logger.debug(f"MUTATION HOOK: run_id not yet available for task '{ti.task_id}'. Skipping this invocation.")
+            return
+
+        logger.debug(f"MUTATION HOOK: Matched DAG '{ti.dag_id}'. Attempting to pin task '{ti.task_id}' for run_id '{ti.run_id}'.")
+        worker_queue = None
+        # The dispatcher embeds the queue in the run_id like: ..._q_queue-dl-worker-hostname
+        if ti.run_id and '_q_' in ti.run_id:
+            try:
+                parsed_queue = ti.run_id.split('_q_')[-1]
+                if parsed_queue.startswith('queue-dl-'):
+                    worker_queue = parsed_queue
+            except Exception as e:
+                logger.error(f"MUTATION HOOK: CRITICAL: Error parsing queue from run_id '{ti.run_id}': {e}.", exc_info=True)
 
         if worker_queue:
-            print(f"MUTATION HOOK: For dag '{ti.dag_id}', pinning task '{ti.task_id}' (run_id: {ti.run_id}) to queue '{worker_queue}'.")
+            logger.debug(f"MUTATION HOOK: Pinning task '{ti.task_id}' (run_id: {ti.run_id}) to queue '{worker_queue}' from run_id.")
             ti.queue = worker_queue
         else:
-            print(f"MUTATION HOOK: For dag '{ti.dag_id}', no 'worker_queue' in conf for run_id '{ti.run_id}'. Falling back to 'queue-dl'.")
+            # If the queue is not found, it's a critical failure in the dispatching logic.
+            # We fall back to the default queue but log it as a high-severity warning.
+            logger.warning(f"MUTATION HOOK: Could not find worker queue in run_id '{ti.run_id}'. Falling back to 'queue-dl'. Pinning will fail.")
             ti.queue = 'queue-dl'
 
-# Register the hook only in appropriate contexts
-# This hook can cause issues with the Triggerer, which does not have a `dag_run` context
-# when it runs its own maintenance tasks.
-if not settings.CONFIG.get('core', 'executor').lower().startswith('debug'):
-    settings.task_instance_mutation_hook = task_instance_mutation_hook
+
+# --- Hook Registration ---
+# This registration logic is placed here to work around environments where this file
+# might be loaded directly as the local settings file via AIRFLOW__CORE__LOCAL_SETTINGS_PATH.
+try:
+    if not conf.get('core', 'executor').lower().startswith('debug'):
+        settings.task_instance_mutation_hook = task_instance_mutation_hook
+        logger.info(f"Successfully self-registered task_instance_mutation_hook (Version: {__version__}) for worker pinning.")
+    else:
+        logger.info("Skipping self-registration of task_instance_mutation_hook due to DebugExecutor.")
+except Exception as e:
+    logger.warning(f"Could not self-register custom_task_hooks: {e}. Worker pinning may not function.", exc_info=True)
+
diff --git a/airflow/dags/ytdlp_mgmt_queues.py b/airflow/dags/ytdlp_mgmt_queues.py
index be3b573..fef9f76 100644
--- a/airflow/dags/ytdlp_mgmt_queues.py
+++ b/airflow/dags/ytdlp_mgmt_queues.py
@@ -251,6 +251,8 @@ def dump_redis_data_to_csv(redis_client, dump_dir, patterns):
 def clear_queue_callable(**context):
     """Dumps Redis data to CSV and/or clears specified Redis keys based on selection."""
     params = context['params']
+    ti = context['task_instance']
+    logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
     redis_conn_id = params['redis_conn_id']
     queue_base_name = params['queue_base_name']
     queues_to_clear_options = params.get('queues_to_clear_options', [])
@@ -302,6 +304,8 @@ def clear_queue_callable(**context):
 def list_contents_callable(**context):
     """Lists the contents of the specified Redis key(s) (list or hash)."""
     params = context['params']
+    ti = context['task_instance']
+    logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
     redis_conn_id = params['redis_conn_id']
     queues_to_list_str = params.get('queue_to_list')
     max_items = params.get('max_items', 10)
@@ -379,6 +383,8 @@ def list_contents_callable(**context):
 def check_status_callable(**context):
     """Checks the status (type and size) of all standard Redis queues for a given base name."""
     params = context['params']
+    ti = context['task_instance']
+    logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
     redis_conn_id = params['redis_conn_id']
     queue_name = params.get('queue_base_name', DEFAULT_QUEUE_NAME)
     queue_suffixes = ['_inbox', '_progress', '_result', '_fail']
@@ -414,6 +420,8 @@ def requeue_failed_callable(**context):
     Copies all URLs from the fail hash to the inbox list and optionally clears the fail hash.
     """
     params = context['params']
+    ti = context['task_instance']
+    logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
     redis_conn_id = params['redis_conn_id']
     queue_name = params['queue_base_name']
     clear_fail_queue = params['clear_fail_queue_after_requeue']
@@ -468,6 +476,8 @@ def add_videos_to_queue_callable(**context):
     normalizes them to URLs, and adds them to a Redis queue.
     """
     params = context["params"]
+    ti = context['task_instance']
+    logger.info(f"Task '{ti.task_id}' running on queue '{ti.queue}'.")
     queue_name = params["queue_base_name"]
     redis_conn_id = params["redis_conn_id"]
     dry_run = params["dry_run"]
diff --git a/airflow/dags/ytdlp_ops_dispatcher.py b/airflow/dags/ytdlp_ops_dispatcher.py
index 82daaa1..a6f50fa 100644
--- a/airflow/dags/ytdlp_ops_dispatcher.py
+++ b/airflow/dags/ytdlp_ops_dispatcher.py
@@ -30,6 +30,9 @@ def dispatch_url_to_worker(**context):
     Pulls one URL from Redis, determines the current worker's dedicated queue,
     and triggers the main worker DAG to process the URL on that specific queue.
     """
+    ti = context['task_instance']
+    logger.info(f"Dispatcher task '{ti.task_id}' running on queue '{ti.queue}'.")
+
     # --- Check for worker pause lock file ---
     # This path must be consistent with the Ansible playbook.
     lock_file_path = '/opt/airflow/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile'
@@ -65,7 +68,9 @@ def dispatch_url_to_worker(**context):
     # We add the specific URL and the determined worker queue to the configuration.
     conf_to_pass = {**params, 'url_to_process': url_to_process, 'worker_queue': worker_queue}
 
-    run_id = f"worker_run_{context['dag_run'].run_id}_{context['ts_nodash']}"
+    # Embed the worker queue in the run_id to avoid DB race conditions in the mutation hook.
+    # The hook will parse the queue name from the run_id itself.
+    run_id = f"worker_run_{context['dag_run'].run_id}_{context['ts_nodash']}_q_{worker_queue}"
 
     logger.info(f"Triggering 'ytdlp_ops_worker_per_url' with run_id '{run_id}'")
     trigger_dag(
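Aside (not part of the patch): the run_id is now the only contract between the dispatcher and the mutation hook. The sketch below restates that round trip as two standalone functions; the queue name 'queue-dl-worker-host01' and the parent run_id are assumed examples. The '_q_' marker has to remain the last segment because the hook recovers the queue with split('_q_')[-1] and then checks the 'queue-dl-' prefix.

def build_run_id(parent_run_id, ts_nodash, worker_queue):
    # Mirrors the dispatcher: the queue goes last so split('_q_')[-1] recovers it intact.
    return f"worker_run_{parent_run_id}_{ts_nodash}_q_{worker_queue}"

def parse_worker_queue(run_id):
    # Mirrors the hook: no '_q_' marker or an unexpected prefix means "fall back to 'queue-dl'".
    if not run_id or '_q_' not in run_id:
        return None
    candidate = run_id.split('_q_')[-1]
    return candidate if candidate.startswith('queue-dl-') else None

run_id = build_run_id("manual__2025-09-22T00:00:00+00:00", "20250922T000000", "queue-dl-worker-host01")
assert parse_worker_queue(run_id) == "queue-dl-worker-host01"
assert parse_worker_queue("manual__2025-09-22T00:00:00+00:00") is None  # hook falls back to 'queue-dl'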
diff --git a/airflow/dags/ytdlp_ops_orchestrator.py b/airflow/dags/ytdlp_ops_orchestrator.py
index 526130c..b7feff1 100644
--- a/airflow/dags/ytdlp_ops_orchestrator.py
+++ b/airflow/dags/ytdlp_ops_orchestrator.py
@@ -96,12 +96,15 @@ def orchestrate_workers_ignition_callable(**context):
     to initiate self-sustaining processing loops.
     """
     params = context['params']
+    ti = context['task_instance']
+    logger.info(f"Orchestrator task '{ti.task_id}' running on queue '{ti.queue}'.")
     logger.info("Starting dispatcher ignition sequence.")
 
     dispatcher_dag_id = 'ytdlp_ops_dispatcher'
     dag_model = DagModel.get_dagmodel(dispatcher_dag_id)
     if dag_model and dag_model.is_paused:
-        raise AirflowException(f"Dispatcher DAG '{dispatcher_dag_id}' is paused. Cannot start dispatcher loops.")
+        logger.warning(f"Dispatcher DAG '{dispatcher_dag_id}' is paused. Skipping dispatcher ignition.")
+        raise AirflowSkipException(f"Dispatcher DAG '{dispatcher_dag_id}' is paused.")
 
     total_workers = int(params['total_workers'])
     workers_per_bunch = int(params['workers_per_bunch'])
@@ -264,7 +267,7 @@ with DAG(
         ),
         'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="[Worker Param] Base name for Redis queues."),
         'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="[Worker Param] Airflow Redis connection ID."),
-        'clients': Param('web', type="string", description="[Worker Param] Comma-separated list of clients for token generation. Full list: web, web_safari, web_embedded, web_music, web_creator, mweb, web_camoufox, web_safari_camoufox, web_embedded_camoufox, web_music_camoufox, web_creator_camoufox, mweb_camoufox, android, android_music, android_creator, android_vr, ios, ios_music, ios_creator, tv, tv_simply, tv_sample, tv_embedded"),
+        'clients': Param('tv_sample,mweb,web_camoufox', type="string", description="[Worker Param] Comma-separated list of clients for token generation. Full list: web, web_safari, web_embedded, web_music, web_creator, mweb, web_camoufox, web_safari_camoufox, web_embedded_camoufox, web_music_camoufox, web_creator_camoufox, mweb_camoufox, android, android_music, android_creator, android_vr, ios, ios_music, ios_creator, tv, tv_simply, tv_sample, tv_embedded"),
         'account_pool': Param('ytdlp_account', type="string", description="[Worker Param] Account pool prefix or comma-separated list."),
         'account_pool_size': Param(10, type=["integer", "null"], description="[Worker Param] If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."),
         'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="[Worker Param] IP of the ytdlp-ops-server. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."),
diff --git a/airflow/dags/ytdlp_ops_worker_per_url.py b/airflow/dags/ytdlp_ops_worker_per_url.py
index d931522..050ef1a 100644
--- a/airflow/dags/ytdlp_ops_worker_per_url.py
+++ b/airflow/dags/ytdlp_ops_worker_per_url.py
@@ -55,8 +55,8 @@ DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="172
 DEFAULT_YT_AUTH_SERVICE_PORT = Variable.get("YT_AUTH_SERVICE_PORT", default_var=9080)
 
 # The queue is set to a fallback here. The actual worker-specific queue is
-# assigned just-in-time by the task_instance_mutation_hook in airflow_local_settings.py,
-# which reads the 'worker_queue' from the DAG run configuration.
+# assigned just-in-time by the task_instance_mutation_hook (see: airflow/config/custom_task_hooks.py),
+# which parses the target queue from the DAG run_id.
 DEFAULT_ARGS = {
     'owner': 'airflow',
     'retries': 0,
@@ -151,6 +151,32 @@ def get_url_and_assign_account(**context):
     This is the first task in the pinned-worker DAG.
     """
     params = context['params']
+    ti = context['task_instance']
+
+    # --- Worker Pinning Verification ---
+    # This is a safeguard against a known Airflow issue where clearing a task
+    # can cause the task_instance_mutation_hook to be skipped, breaking pinning.
+    # See: https://github.com/apache/airflow/issues/20143
+    expected_queue = None
+    if ti.run_id and '_q_' in ti.run_id:
+        expected_queue = ti.run_id.split('_q_')[-1]
+
+    if not expected_queue:
+        # Fallback to conf if run_id parsing fails for some reason
+        expected_queue = params.get('worker_queue')
+
+    if expected_queue and ti.queue != expected_queue:
+        error_msg = (
+            f"WORKER PINNING FAILURE: Task is running on queue '{ti.queue}' but was expected on '{expected_queue}'. "
+            "This usually happens after manually clearing a task, which is not the recommended recovery method for this DAG. "
+            "To recover a failed URL, let the DAG run fail, use the 'ytdlp_mgmt_queues' DAG to requeue the URL, "
+            "and use the 'ytdlp_ops_orchestrator' to start a new worker loop if needed."
+        )
+        logger.error(error_msg)
+        raise AirflowException(error_msg)
+    elif expected_queue:
+        logger.info(f"Worker pinning verified. Task is correctly running on queue '{ti.queue}'.")
+    # --- End Verification ---
 
     # The URL is passed by the dispatcher DAG.
     url_to_process = params.get('url_to_process')
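Aside (not part of the patch): the verification block above derives the expected queue from the run_id first and falls back to the 'worker_queue' conf entry only when parsing fails. A throwaway pytest-style sketch of that precedence, using a hypothetical helper name, is:

def expected_queue_for(run_id, params):
    # run_id wins; the 'worker_queue' conf entry is only a fallback.
    if run_id and '_q_' in run_id:
        return run_id.split('_q_')[-1]
    return params.get('worker_queue')

def test_run_id_takes_precedence():
    run_id = "worker_run_x_20250922T000000_q_queue-dl-worker-a"
    assert expected_queue_for(run_id, {"worker_queue": "queue-dl-worker-b"}) == "queue-dl-worker-a"

def test_conf_is_the_fallback():
    assert expected_queue_for("manual__2025-09-22", {"worker_queue": "queue-dl-worker-b"}) == "queue-dl-worker-b"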
diff --git a/ansible/playbook-dags.yml b/ansible/playbook-dags.yml
index 92b7b1a..473c8a5 100644
--- a/ansible/playbook-dags.yml
+++ b/ansible/playbook-dags.yml
@@ -3,7 +3,8 @@
   hosts: airflow_master
   gather_facts: no
   vars_files:
-    - group_vars/all.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   tasks:
     - name: Sync DAGs to MASTER server
       ansible.posix.synchronize:
@@ -33,7 +34,8 @@
   hosts: airflow_workers
   gather_facts: no
   vars_files:
-    - group_vars/all.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   tasks:
     - name: Sync DAGs to WORKER server
       ansible.posix.synchronize:
diff --git a/ansible/playbook-dl.yml b/ansible/playbook-dl.yml
new file mode 100644
index 0000000..e69de29
diff --git a/ansible/playbook-full.yml b/ansible/playbook-full.yml
index 6123cb5..d9ec167 100644
--- a/ansible/playbook-full.yml
+++ b/ansible/playbook-full.yml
@@ -3,8 +3,8 @@
   hosts: all
   gather_facts: true
   vars_files:
-    - group_vars/all.yml
-    - group_vars/all/vault.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   pre_tasks:
     - name: Announce fast deploy mode if enabled
       debug:
diff --git a/ansible/playbook-hook.yml b/ansible/playbook-hook.yml
new file mode 100644
index 0000000..25629b7
--- /dev/null
+++ b/ansible/playbook-hook.yml
@@ -0,0 +1,53 @@
+---
+- name: Deploy and Reload Airflow Task Hook
+  hosts: all
+  gather_facts: no
+  vars_files:
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
+  tasks:
+    - name: Sync custom_task_hooks.py to MASTER server
+      when: inventory_hostname in groups['airflow_master']
+      synchronize:
+        src: "../airflow/config/custom_task_hooks.py"
+        dest: "{{ airflow_master_dir }}/config/"
+        archive: yes
+        rsync_path: "sudo rsync"
+
+    - name: Sync airflow_local_settings.py to MASTER server
+      when: inventory_hostname in groups['airflow_master']
+      synchronize:
+        src: "../airflow/config/airflow_local_settings.py"
+        dest: "{{ airflow_master_dir }}/config/"
+        archive: yes
+        rsync_path: "sudo rsync"
+
+    - name: Sync custom_task_hooks.py to WORKER server
+      when: inventory_hostname in groups['airflow_workers']
+      synchronize:
+        src: "../airflow/config/custom_task_hooks.py"
+        dest: "{{ airflow_worker_dir }}/config/"
+        archive: yes
+        rsync_path: "sudo rsync"
+
+    - name: Sync airflow_local_settings.py to WORKER server
+      when: inventory_hostname in groups['airflow_workers']
+      synchronize:
+        src: "../airflow/config/airflow_local_settings.py"
+        dest: "{{ airflow_worker_dir }}/config/"
+        archive: yes
+        rsync_path: "sudo rsync"
+
+    - name: Restart Airflow services on MASTER
+      when: inventory_hostname in groups['airflow_master']
+      ansible.builtin.command:
+        cmd: "docker compose restart airflow-scheduler airflow-webserver airflow-master-worker airflow-triggerer"
+        chdir: "{{ airflow_master_dir }}"
+      become: yes
+
+    - name: Restart Airflow worker on WORKER
+      when: inventory_hostname in groups['airflow_workers']
+      ansible.builtin.command:
+        cmd: "docker compose restart airflow-worker"
+        chdir: "{{ airflow_worker_dir }}"
+      become: yes
diff --git a/ansible/playbook-master.yml b/ansible/playbook-master.yml
index cae26f9..0ed92f2 100644
--- a/ansible/playbook-master.yml
+++ b/ansible/playbook-master.yml
@@ -3,8 +3,8 @@
   hosts: airflow_master
   gather_facts: yes
   vars_files:
-    - group_vars/all.yml
-    - group_vars/all/vault.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   pre_tasks:
     - name: Announce master deployment
       debug:
diff --git a/ansible/playbook-worker.yml b/ansible/playbook-worker.yml
index 23dcd80..d8fe17b 100644
--- a/ansible/playbook-worker.yml
+++ b/ansible/playbook-worker.yml
@@ -3,8 +3,8 @@
   hosts: airflow_workers
   gather_facts: yes
   vars_files:
-    - group_vars/all.yml
-    - group_vars/all/vault.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   pre_tasks:
     - name: Announce worker deployment
       debug:
diff --git a/ansible/playbooks/pause_worker.yml b/ansible/playbooks/pause_worker.yml
index 5235379..27dca8a 100644
--- a/ansible/playbooks/pause_worker.yml
+++ b/ansible/playbooks/pause_worker.yml
@@ -2,7 +2,8 @@
 - hosts: airflow_workers
   gather_facts: no
   vars_files:
-    - ../group_vars/all.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   tasks:
     - name: "Create lock file to pause worker"
       file:
diff --git a/ansible/playbooks/resume_worker.yml b/ansible/playbooks/resume_worker.yml
index 8b0b7ce..0ac778f 100644
--- a/ansible/playbooks/resume_worker.yml
+++ b/ansible/playbooks/resume_worker.yml
@@ -2,7 +2,8 @@
 - hosts: airflow_workers
   gather_facts: yes
   vars_files:
-    - ../group_vars/all.yml
+    - "{{ inventory_dir }}/group_vars/all/generated_vars.yml"
+    - "{{ inventory_dir }}/group_vars/all/vault.yml"
   tasks:
     - name: "Archive lock file to resume worker"
       command: >
diff --git a/deploy.sh b/deploy.sh
new file mode 100644
index 0000000..4a728fb
--- /dev/null
+++ b/deploy.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+set -e
+
+echo "Generating inventory..."
+./tools/generate-inventory.py cluster.test.yml
+
+echo "Deploying full cluster..."
+cd ansible
+ansible-playbook playbook-full.yml