# -*- coding: utf-8 -*- # vim:fenc=utf-8 # # Copyright © 2024 rl # # Distributed under terms of the MIT license. """ Redis utility functions for Airflow DAGs. """ from airflow.exceptions import AirflowException from airflow.providers.redis.hooks.redis import RedisHook import logging import redis logger = logging.getLogger(__name__) def _get_redis_client(redis_conn_id): """Gets a Redis client connection using RedisHook.""" try: hook = RedisHook(redis_conn_id=redis_conn_id) client = hook.get_conn() client.ping() logger.info(f"Successfully connected to Redis using connection '{redis_conn_id}'.") return client except redis.exceptions.AuthenticationError: logger.error(f"Redis authentication failed for connection '{redis_conn_id}'. Check password.") raise AirflowException(f"Redis authentication failed for '{redis_conn_id}'.") except Exception as e: logger.error(f"Failed to get Redis client for connection '{redis_conn_id}': {e}") raise AirflowException(f"Redis connection failed for '{redis_conn_id}': {e}")