27 lines
1.1 KiB
Python
27 lines
1.1 KiB
Python
import logging
|
|
import os
|
|
import sys
|
|
from copy import deepcopy
|
|
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
|
|
|
|
logger = logging.getLogger(__name__)

# Add the directory containing this file to sys.path to allow local imports
# such as `custom_task_hooks`. This is necessary because this file is executed
# by the Airflow scheduler in a context where the config directory is not
# automatically on the Python path.
config_dir = os.path.dirname(os.path.abspath(__file__))
if config_dir not in sys.path:
    # Prepend (index 0) so this directory is searched before every other
    # sys.path entry; the membership check above avoids duplicate entries
    # when the module is loaded more than once.
    sys.path.insert(0, config_dir)
    logger.info("Added '%s' to sys.path for local imports.", config_dir)
# Work on a deep copy so that the DEFAULT_LOGGING_CONFIG object imported from
# Airflow is never mutated through LOGGING_CONFIG.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# The task_instance_mutation_hook is now self-registering to be robust
# against different loading configurations. See custom_task_hooks.py for details.
try:
    import custom_task_hooks
except ImportError as e:
    # An unavailable hooks module must not stop this settings file from
    # loading: record the failure (with traceback) and continue.
    logger.warning(
        "Could not import custom_task_hooks: %s. Worker pinning will not function.",
        e,
        exc_info=True,
    )
else:
    # Only reached when the import succeeded; `__version__` is optional on
    # the module, hence the 'unknown' fallback.
    logger.info(
        "Successfully imported custom_task_hooks module (Version: %s).",
        getattr(custom_task_hooks, "__version__", "unknown"),
    )