25 lines
1.1 KiB
Python
25 lines
1.1 KiB
Python
# Version: 2025-08-20-02
|
|
# This file contains custom hooks for the Airflow environment.
|
|
from airflow import settings
|
|
|
|
def task_instance_mutation_hook(ti):
|
|
if ti.dag_id == 'ytdlp_ops_worker_per_url':
|
|
# Safely access dag_run and conf. The ti.dag_run attribute may not be populated
|
|
# when the hook is called during TaskInstance creation.
|
|
dag_run = getattr(ti, 'dag_run', None)
|
|
conf = getattr(dag_run, 'conf', {}) if dag_run else {}
|
|
worker_queue = conf.get('worker_queue')
|
|
|
|
if worker_queue:
|
|
print(f"Mutating queue for task {ti.task_id} to {worker_queue} based on dag_run.conf")
|
|
ti.queue = worker_queue
|
|
else:
|
|
print(f"No worker_queue in conf for {ti.dag_id}. Falling back to 'queue-dl'")
|
|
ti.queue = 'queue-dl'
|
|
|
|
# Register the hook only in appropriate contexts
|
|
# This hook can cause issues with the Triggerer, which does not have a `dag_run` context
|
|
# when it runs its own maintenance tasks.
|
|
if not settings.CONFIG.get('core', 'executor').lower().startswith('debug'):
|
|
settings.task_instance_mutation_hook = task_instance_mutation_hook
|