import logging import os import sys from copy import deepcopy from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG logger = logging.getLogger(__name__) # Add the config directory to the path to allow for local imports like `custom_task_hooks`. # This is necessary because this file is executed by the Airflow scheduler in a context # where the config directory is not automatically on the Python path. config_dir = os.path.dirname(os.path.abspath(__file__)) if config_dir not in sys.path: sys.path.insert(0, config_dir) logger.info(f"Added '{config_dir}' to sys.path for local imports.") LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) # The task_instance_mutation_hook is now self-registering to be robust # against different loading configurations. See custom_task_hooks.py for details. try: import custom_task_hooks logger.info(f"Successfully imported custom_task_hooks module (Version: {getattr(custom_task_hooks, '__version__', 'unknown')}).") except ImportError as e: logger.warning(f"Could not import custom_task_hooks: {e}. Worker pinning will not function.", exc_info=True)