From 1f092d6f808ee1eb782e86d628657544a36136bf Mon Sep 17 00:00:00 2001
From: aperez
Date: Sun, 6 Apr 2025 11:55:53 +0300
Subject: [PATCH] Update on --probe and add experimental service dags

---
 Dockerfile                    |   2 +-
 dags/ytdlp_client_dag_v2.1.py |  18 +-
 dags/ytdlp_service_dag.py     | 966 ++++++++++++++++++++++++++++++++++
 docker-compose-ytdlp-ops.yaml |   5 +-
 4 files changed, 979 insertions(+), 12 deletions(-)
 create mode 100644 dags/ytdlp_service_dag.py

diff --git a/Dockerfile b/Dockerfile
index 8ba5c05..9f01c2f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -36,7 +36,7 @@ USER airflow
 
 # Install Python dependencies and ensure ffprobe3 is installed correctly
 RUN pip install --no-cache-dir \
-    "apache-airflow==${AIRFLOW_VERSION}" && \
+    "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-providers-docker apache-airflow-providers-http && \
     pip install --no-cache-dir -r /app/requirements.txt && \
     pip install --no-cache-dir ffprobe3 python-ffmpeg
 
diff --git a/dags/ytdlp_client_dag_v2.1.py b/dags/ytdlp_client_dag_v2.1.py
index f47d721..9782a59 100644
--- a/dags/ytdlp_client_dag_v2.1.py
+++ b/dags/ytdlp_client_dag_v2.1.py
@@ -396,17 +396,17 @@ class YtdlpOpsOperator(BaseOperator):
             # preferably use the explicitly pushed 'ytdlp_command' key for clarity.
             return ytdlp_cmd # Return the original command
 
-        # Keep specific exception handling from original block if needed, but AirflowException is often sufficient
-        except AirflowException as e: # Catch AirflowExceptions raised earlier
+        except AirflowException as e: # Catch AirflowExceptions raised explicitly in the code above
             logger.error(f"Operation failed due to AirflowException: {e}")
-            raise # Re-raise AirflowExceptions
+            raise # Re-raise AirflowExceptions to ensure task failure
         except (TTransportException, PBServiceException) as e: # Catch specific Thrift/Service errors not already wrapped
-            logger.error(f"Unhandled Thrift/Service error: {e}")
-            raise AirflowException(f"Unhandled YTDLP service error: {e}")
-        except Exception as e: # General catch-all
+            logger.error(f"Unhandled Thrift/Service error: {e}", exc_info=True) # Add traceback for context
+            raise AirflowException(f"Unhandled YTDLP service error: {e}") # Wrap in AirflowException
+        except Exception as e: # General catch-all for truly unexpected errors
             # Log with traceback for unexpected errors
-            logger.error(f"Unexpected error in YtdlpOpsOperator: {e}", exc_info=True)
-            raise AirflowException(f"Unexpected error in YtdlpOpsOperator: {e}")
+            logger.error(f"Caught unexpected error in YtdlpOpsOperator: {e}", exc_info=True)
+            # Ensure any unexpected error explicitly fails the task with AirflowException
+            raise AirflowException(f"Unexpected error caused task failure: {e}")
         finally:
             if transport and transport.isOpen(): # Check if transport exists and is open before closing
                 logger.info("Closing Thrift transport.")
@@ -820,7 +820,7 @@ with DAG(
         'redis_enabled': Param(False, type="boolean", description="Use Redis for service discovery? If False, uses service_ip/port."), # Default to direct connection
         'service_ip': Param('85.192.30.55', type="string", description="Service IP if redis_enabled=False."), # Default service IP
         'service_port': Param(9090, type="integer", description="Service port if redis_enabled=False."), # Default service port
-        'account_id': Param('accoutns_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09', type="string", description="Account ID for Redis lookup or direct call."), # Updated default account_id
+        'account_id': Param('account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09', type="string", description="Account ID for Redis lookup or direct call."), # Updated default account_id
         'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
         # Use Airflow Variable for downloads directory, matching reference DAG structure
         'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json. Uses Airflow Variable 'DOWNLOADS_TEMP' or default.")
diff --git a/dags/ytdlp_service_dag.py b/dags/ytdlp_service_dag.py
new file mode 100644
index 0000000..5f1e5ae
--- /dev/null
+++ b/dags/ytdlp_service_dag.py
@@ -0,0 +1,966 @@
+"""
+DAG to deploy and manage YTDLP token service.
+
+This DAG handles the deployment, monitoring, and cleanup of a YTDLP token service
+for a given account. It supports both Redis-based service discovery and direct
+connection via a manually specified host and port.
+
+Configuration Options:
+- account_id: (Required) The account ID for which the service is being deployed.
+- proxy: (Optional) The proxy to use for the service.
+- redis_enabled: (Optional, default=False) Whether to use Redis for service discovery.
+  If False, `host` and `port` can be provided manually; otherwise defaults are used.
+- host: (Optional) The host IP of the service. Defaults to '0.0.0.0' when
+  `redis_enabled=False` and no value is provided.
+- port: (Optional) The port of the service. A free port is assigned automatically
+  when no value is provided.
+
+Usage:
+1. Redis-based service discovery:
+   - Set `redis_enabled=True`.
+   - Ensure Redis is configured in Airflow connections.
+   - The DAG will automatically discover the service IP and port from Redis.
+
+2. Manual host and port (the default):
+   - Set `redis_enabled=False`.
+   - Provide `host` and `port` manually in the DAG configuration.
+   - Example: {"host": "192.168.1.100", "port": 9090}.
+
+Example Trigger Configuration:
+{
+    "account_id": "test_account",
+    "proxy": "socks5://proxy.example.com:1080",
+    "redis_enabled": false,
+    "host": "192.168.1.100",
+    "port": 9090
+}
+"""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
+# HttpSensor is no longer used
+# from airflow.providers.http.sensors.http import HttpSensor
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.hooks.base import BaseHook
+from airflow.exceptions import AirflowException
+from datetime import datetime, timedelta
+from airflow.utils.dates import days_ago
+import uuid
+import os
+import logging
+import shutil
+import docker
+import redis
+import requests
+import socket
+import time
+from airflow.configuration import conf
+
+# Import and apply Thrift exceptions patch
+try:
+    # Always apply the patch, regardless of environment
+    from thrift_exceptions_patch import patch_thrift_exceptions
+    patch_thrift_exceptions()
+    logging.info("Applied Thrift exceptions patch for Airflow compatibility")
+
+    # Verify the patch was applied correctly
+    try:
+        from pangramia.yt.exceptions.ttypes import PBServiceException
+        test_exception = PBServiceException(message="Test")
+        # Try to modify attributes to verify patch works
+        test_exception.args = ("Test",)
+        test_exception.message = "Modified test"
+        logging.info("Verified Thrift exception patch is working correctly")
+    except Exception as verify_error:
+        logging.error(f"Thrift exception patch verification failed: {verify_error}")
+        logging.error("This may cause 'immutable instance' errors during error handling")
+except ImportError as e:
+    logging.warning(f"Could not import thrift_exceptions_patch: {e}")
+    logging.warning("Airflow compatibility will be affected - expect 'immutable instance' errors")
+except Exception as e:
+    logging.error(f"Error applying Thrift exceptions patch: {e}")
+
+# Default arguments for the DAG
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 0, # Disable retries for all tasks in this DAG
+    'retry_delay': timedelta(minutes=5),
+    # Removed 'queue': 'auth_queue' to use the default queue
+    # Optional: Further filter workers by tags if using CeleryExecutor
+    'executor_config': {"CeleryExecutor": {"tags": ["auth_node"]}},
+}
+
+def get_redis_connection(redis_host=None, redis_port=None):
+    """Get a Redis connection using Airflow's Redis connection or manually specified host/port."""
+    if redis_host and redis_port:
+        # Use manually specified host and port
+        return redis.Redis(
+            host=redis_host,
+            port=redis_port,
+            db=0,
+            decode_responses=True
+        )
+    else:
+        # Use Airflow's Redis connection
+        redis_conn = BaseHook.get_connection("redis_default")
+        # Use the password from the connection if available, otherwise use 'airflow' as default
+        password = redis_conn.password or 'airflow'
+        return redis.Redis(
+            host=redis_conn.host,  # 'redis' (service name in docker-compose)
+            port=redis_conn.port,  # 6379
+            password=password,
+            db=0,
+            decode_responses=True
+        )
+
+def get_free_port():
+    """Find and return a free port."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(('0.0.0.0', 0))
+        return s.getsockname()[1]
+
+def is_port_free(p):
+    """Check if a port is free to use."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        try:
+            s.bind(('0.0.0.0', p))
+            return True
+        except OSError:
+            return False
+
+def store_account_metadata(account_id, ip, port, proxy=None, health_port=None, container_id=None):
+    """Store account metadata in Redis."""
+    redis_client = get_redis_connection()
+    try:
+        # Verify Redis connection
+        if not redis_client.ping():
+            raise ConnectionError("Failed to connect to Redis")
+
+        # Store main account metadata
+        mapping = {
+            "ip": ip,
+            "port": str(port),
+            "status": "running",
+            "start_time": str(time.time())
+        }
+        if proxy:
+            mapping["proxy"] = proxy
+        if health_port:
+            mapping["health_port"] = str(health_port)
+        if container_id:
+            mapping["container_id"] = container_id
+
+        # Use pipeline for atomic operations
+        with redis_client.pipeline() as pipe:
+            # Store main metadata
+            pipe.hset(f"ytdlp:{account_id}", mapping=mapping)
+            # Set expiration (1 week)
+            pipe.expire(f"ytdlp:{account_id}", 604800)
+            # Add to account list
+            pipe.sadd("ytdlp:accounts", account_id)
+            # Execute all commands
+            results = pipe.execute()
+
+        # Verify all commands succeeded
+        if not all(results):
+            raise RuntimeError(f"Failed to store metadata for {account_id}. Pipeline results: {results}")
+
+        # Verify the data was actually stored
+        stored_data = redis_client.hgetall(f"ytdlp:{account_id}")
+        if not stored_data:
+            raise RuntimeError(f"Failed to verify stored data for {account_id}")
+
+        logging.info(f"Successfully stored account metadata for {account_id} in Redis: {stored_data}")
+        return True
+    except Exception as e:
+        logging.error(f"Failed to store account metadata for {account_id}: {e}", exc_info=True)
+        # Attempt cleanup if storage failed
+        try:
+            redis_client = get_redis_connection() # Ensure client is available
+            redis_client.delete(f"ytdlp:{account_id}")
+            redis_client.srem("ytdlp:accounts", account_id)
+        except Exception as cleanup_error:
+            logging.error(f"Failed to cleanup failed storage for {account_id}: {cleanup_error}")
+        raise
+
+# Removed get_account_metadata function as the service now handles Redis registration checks.
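For orientation, since `get_account_metadata` was removed and the service now registers itself: a client consuming this registry only needs to read the `ytdlp:{account_id}` hash that `store_account_metadata` writes. A minimal discovery sketch assuming that key layout — the `discover_service` helper and its connection defaults are illustrative, not part of this patch:

```python
import redis

def discover_service(account_id: str,
                     redis_host: str = "redis",
                     redis_port: int = 6379,
                     password: str = "airflow") -> tuple[str, int]:
    """Return (ip, port) for a registered ytdlp service. Hypothetical client-side helper."""
    r = redis.Redis(host=redis_host, port=redis_port, password=password,
                    db=0, decode_responses=True)
    # Fields written by store_account_metadata: ip, port, status, start_time,
    # and optionally proxy, health_port, container_id.
    meta = r.hgetall(f"ytdlp:{account_id}")
    if not meta or meta.get("status") != "running":
        raise LookupError(f"No running ytdlp service registered for '{account_id}'")
    return meta["ip"], int(meta["port"])

# Usage sketch:
# ip, port = discover_service("test_account")
```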
+ +def prepare_and_deploy_service(**context): + """Prepare deployment and deploy the Docker service.""" + # Retrieve account_id, proxy, clients, and other parameters from DAG run configuration (conf) + # Set default values for account_id, proxy, and redis_enabled + account_id = context['dag_run'].conf.get('account_id') or context['params'].get('account_id', 'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09') + proxy = context['dag_run'].conf.get('proxy') or context['params'].get('proxy', 'socks5://sslocal-rust-1084:1084') + clients = context['dag_run'].conf.get('clients') or context['params'].get('clients', 'ios,android,mweb') + redis_enabled = context['dag_run'].conf.get('redis_enabled', False) # Default to False + host_param = context['dag_run'].conf.get('host') # Host parameter from config + port_param = context['dag_run'].conf.get('port') # Port parameter from config + docker_network = context['dag_run'].conf.get('docker_network') or context['params'].get('docker_network', 'airflow_prod_proxynet') + host_external_ip_env = os.getenv('HOST_EXTERNAL_IP') # Explicit external IP from environment + + if not account_id: + raise ValueError("Account ID is missing.") + + # --- Port Determination --- + # Assign a free port if not provided, or validate the provided one + if not port_param: + port = get_free_port() + if not is_port_free(port): + raise ValueError(f"Assigned port {port} is already in use") + logging.info(f"No port provided, assigned free port: {port}") + else: + port = int(port_param) + if not is_port_free(port): + raise ValueError(f"Provided port {port} is already in use") + logging.info(f"Using provided port: {port}") + + # Determine health port + health_port = port + 1 + if not is_port_free(health_port): + raise ValueError(f"Health port {health_port} (derived from port {port}) is already in use") + logging.info(f"Using health port: {health_port}") + + + # --- Host Determination --- + # host_for_registration: IP/Host for client discovery (Redis/Logs) + # host_for_sensor: Hostname/IP for Airflow HttpSensor health check + + host_for_registration = host_param # Start with the parameter value + + if redis_enabled: + # If Redis is enabled, registration host should ideally be externally reachable + if not host_for_registration: + host_for_registration = host_external_ip_env # Use external IP from env var if available + if not host_for_registration: + # If no env var, try fetching external IP using requests + try: + logging.info("HOST_EXTERNAL_IP not set. Attempting to fetch external IP from api.ipify.org...") + response = requests.get('https://api.ipify.org', timeout=10) # 10 second timeout + response.raise_for_status() # Raise exception for bad status codes + host_for_registration = response.text.strip() + if not host_for_registration: # Check if response was empty + raise ValueError("Received empty response from api.ipify.org") + logging.info(f"Successfully fetched external IP: {host_for_registration}") + except requests.exceptions.RequestException as e: + logging.warning(f"Failed to fetch external IP: {e}. Falling back to Docker bridge IP.") + # Fallback to default Docker bridge IP if fetching fails + host_for_registration = "172.17.0.1" + logging.warning(f"Defaulting registration host to Docker bridge IP: {host_for_registration}. Ensure clients can reach this IP.") + except Exception as e: + logging.error(f"Unexpected error fetching external IP: {e}. 
Falling back to Docker bridge IP.") + host_for_registration = "172.17.0.1" + logging.warning(f"Defaulting registration host to Docker bridge IP: {host_for_registration}. Ensure clients can reach this IP.") + else: + logging.info(f"Redis enabled. Using HOST_EXTERNAL_IP environment variable for registration: {host_for_registration}") + else: + logging.info(f"Redis enabled. Using provided host parameter for registration: {host_for_registration}") + else: # Redis disabled + # If Redis is disabled, registration host defaults to 0.0.0.0 if not provided + if not host_for_registration: + host_for_registration = "0.0.0.0" + logging.warning(f"Redis disabled and no host param provided. Defaulting registration host to {host_for_registration}.") + else: + logging.info(f"Redis disabled. Using provided host parameter for registration: {host_for_registration}") + + # host_for_sensor determination will happen *after* container creation, using container name. + + logging.info(f"Preparing deployment for account {account_id}. Registration Host: {host_for_registration}, Port: {port}, Health Port: {health_port}") + + # Generate unique work ID and context directory + work_id = str(uuid.uuid4()) + context['task_instance'].xcom_push(key='work_id', value=work_id) + + context_dir = os.path.join(os.getenv('AIRFLOW_HOME', '/tmp'), 'service-data', work_id, 'context-data') + os.makedirs(context_dir, exist_ok=True, mode=0o777) + os.chmod(context_dir, 0o777) + + # Push context directory and account details to XCom + context['task_instance'].xcom_push(key='context_dir', value=context_dir) + context['task_instance'].xcom_push(key='account_id', value=account_id) + + # Deploy the Docker service + # The 'host_for_registration' variable here represents the externally accessible IP for registration/XCom. + # The service inside the container will listen on 0.0.0.0. + logging.info(f"Deploying service for account {account_id}. Registration Host: {host_for_registration}, Port: {port}") + + # Get Redis connection details ONLY if redis_enabled (for the container to register itself) + redis_host_for_container = '' + redis_port_for_container = '' + if redis_enabled: + try: + # Get connection details to pass to the container environment + redis_conn_details = get_redis_connection().connection_pool.connection_kwargs + redis_host_for_container = os.getenv('REDIS_HOST', redis_conn_details.get('host', 'redis')) + redis_port_for_container = str(os.getenv('REDIS_PORT', redis_conn_details.get('port', 6379))) + logging.info(f"Redis enabled. Passing REDIS_HOST={redis_host_for_container}, REDIS_PORT={redis_port_for_container} to container.") + except Exception as e: + logging.error(f"Failed to get Redis connection details for container environment: {e}") + logging.warning("Proceeding without Redis details in container environment due to error.") + # Depending on container requirements, you might want to raise an error here instead + else: + logging.info("Redis disabled. Not passing REDIS_HOST/REDIS_PORT to container environment.") + + + # Get Docker connection details from Airflow + try: + secrets_backend = conf.get('secrets', 'backend', fallback='None') + logging.info(f"Attempting to get 'docker_hub' connection. 
Configured secrets backend: {secrets_backend}") + docker_conn = BaseHook.get_connection("docker_hub") + docker_username = docker_conn.login + docker_password = docker_conn.password + logging.info("Successfully retrieved 'docker_hub' connection.") + except Exception as e: + logging.error(f"Failed to retrieve 'docker_hub' connection: {e}") + # Log details about potential secrets backend issues + secrets_backend_kwargs = conf.get('secrets', 'backend_kwargs', fallback='{}') + logging.error(f"Secrets backend details: backend={secrets_backend}, kwargs={secrets_backend_kwargs}") + # Re-raise the exception to fail the task + raise + + try: + # Initialize Docker client to connect to docker-socket-proxy + client = docker.DockerClient(base_url='tcp://docker-socket-proxy:2375') + + # Authenticate with Docker Hub + client.login( + username=docker_username, + password=docker_password, + registry=docker_conn.host # Typically "https://index.docker.io/v1/" + ) + + # Generate a unique container name + container_name = f"ytdlp_service_{account_id}_{uuid.uuid4().hex[:8]}" + + # Pull the Docker image (if not already present) + client.images.pull('pangramia/ytdlp-ops-server:latest') + + # Use the configured network name (from params or default) + network_name = docker_network # Use the retrieved parameter + logging.info(f"Attempting to run container on network: {network_name}") + + # Determine if --probe flag should be added based on DAG param + exit_on_proxy_fail = context['dag_run'].conf.get('exit_on_proxy_fail', True) # Default to True if not set + command_args = [ + '--script-dir', '/app/scripts', + '--context-dir', '/app/context-data', # Use the bind mount target inside container + '--port', str(port), + '--health-port', str(health_port), + '--clients', clients, + '--timeout', '120', + '--proxy', proxy if proxy else '' + ] + if exit_on_proxy_fail: + command_args.append('--probe') + logging.info("Adding --probe flag to container command as exit_on_proxy_fail=True") + else: + logging.info("Not adding --probe flag to container command as exit_on_proxy_fail=False") + + # Run the Docker container with health port + container = client.containers.run( + image='pangramia/ytdlp-ops-server:latest', + command=command_args, # Use the constructed command list + environment={ + 'PYTHONUNBUFFERED': '1', # Ensure logs are not buffered + 'SERVER_PORT': str(port), # Port the service listens on *inside* the container + 'SERVER_HOST': '0.0.0.0', # Service should listen on all interfaces *inside* the container + 'ACCOUNT_ID': account_id, + # Pass Redis details *if enabled* for the service to register itself + 'REDIS_HOST': redis_host_for_container, + 'REDIS_PORT': redis_port_for_container, + # Pass PROXY_URL for health check access + 'PROXY_URL': proxy if proxy else '', + }, + ports={ + f"{port}/tcp": port, + f"{health_port}/tcp": health_port + }, + volumes={ + context_dir: {'bind': '/app/context-data', 'mode': 'rw'} + }, + network_mode=network_name, # Use the specified network variable + auto_remove=False, # Do not auto-remove the container + name=container_name, # Use a unique name + detach=True, + tty=True, + shm_size='256m', + # Updated healthcheck to test external connectivity via proxy + healthcheck={ + # Use CMD-SHELL to allow conditional logic based on PROXY_URL env var + 'test': [ + 'CMD-SHELL', + # Script checks if PROXY_URL is set, uses it with curl if yes, otherwise curls directly. 
+ # -f: Fail silently (exit non-zero on error) + # --connect-timeout 10: Timeout for connection phase + # > /dev/null: Discard output, we only care about exit code + 'if [ -n "$PROXY_URL" ]; then ' + 'curl -f --connect-timeout 10 -x "$PROXY_URL" https://ifconfig.co > /dev/null; ' + 'else ' + 'curl -f --connect-timeout 10 https://ifconfig.co > /dev/null; ' + 'fi' + ], + 'interval': 30 * 1000000000, # Check every 30 seconds (30 * 1e9 nanoseconds) + 'timeout': 15 * 1000000000, # Timeout after 15 seconds (15 * 1e9 nanoseconds) + 'retries': 5, # Retry 5 times on failure + 'start_period': 15 * 1000000000 # Grace period of 15 seconds after start + }, + # Add labels for better identification + labels={ + 'service': 'ytdlp', + 'account_id': account_id + } + ) + + # Wait for container to be running (skip health check verification) + start_time = time.time() + while True: + container.reload() + if container.status == 'running': + break + if time.time() - start_time > 10: # 10 second timeout + raise TimeoutError("Container failed to start within 10 seconds") + time.sleep(1) + + logging.info(f"Container started: {container.id} (health check verification skipped)") + # Push container details immediately after creation using simplified keys + context['task_instance'].xcom_push(key='container_id', value=container.id) + context['task_instance'].xcom_push(key='container_name', value=container_name) + logging.info(f"Pushed container_id={container.id} and container_name={container_name} to XCom.") + + # --- Determine Host for Sensor --- + # Get the container's IP address on the specified network for the HttpSensor + try: + container.reload() # Refresh container attributes + network_settings = container.attrs.get('NetworkSettings', {}).get('Networks', {}) + if network_name in network_settings: + host_for_sensor = network_settings[network_name].get('IPAddress') + if not host_for_sensor: + raise ValueError(f"Container {container.id} has no IPAddress on network '{network_name}'") + logging.info(f"Using container IP '{host_for_sensor}' on network '{network_name}' for HttpSensor.") + else: + # Fallback or error if container not on expected network + logging.error(f"Container {container.id} is not attached to the expected network '{network_name}'. Network settings: {network_settings}") + # Option 1: Fallback to container name (might fail as observed) + # host_for_sensor = container_name + # logging.warning(f"Falling back to container name '{host_for_sensor}' for sensor.") + # Option 2: Raise error + raise ValueError(f"Container {container.id} not found on network '{network_name}'. Cannot determine IP for sensor.") + + except Exception as e: + logging.error(f"Failed to get container IP address: {e}", exc_info=True) + raise AirflowException(f"Failed to determine IP address for HttpSensor: {e}") + + # Ensure we don't use 0.0.0.0 or empty string for the sensor + if not host_for_sensor or host_for_sensor == "0.0.0.0": + raise ValueError(f"Determined host_for_sensor is invalid ('{host_for_sensor}'). 
Check container network attachment and IP assignment.") + + # --- Add extra logging before pushing --- + logging.info(f"FINAL CHECK before XCom push:") + logging.info(f" Account ID: {account_id}") + logging.info(f" Host for Sensor (IP Address): {host_for_sensor}") + logging.info(f" Host for Registration: {host_for_registration}") + logging.info(f" Service Port: {port}") + logging.info(f" Health Port: {health_port}") + logging.info(f" Pushing to XCom key: service_host with value: {host_for_sensor}") + # --- End extra logging --- + + # Push distinct service connection details using simplified keys + context['task_instance'].xcom_push(key='service_host_registration', value=host_for_registration) # For client discovery (e.g., Redis) + context['task_instance'].xcom_push(key='service_host', value=host_for_sensor) # IP Address for HttpSensor + context['task_instance'].xcom_push(key='service_port', value=port) # Port is the same + context['task_instance'].xcom_push(key='service_health_port', value=health_port) # Health port is the same + logging.info(f"Pushed host_for_sensor (IP Address)={host_for_sensor} to XCom key 'service_host'") + logging.info(f"Pushed host_for_registration={host_for_registration} to XCom key 'service_host_registration'") + + + # Store account metadata in Redis only if redis_enabled is True + # This uses the 'host_for_registration' for client discovery + if redis_enabled: + store_account_metadata(account_id, host_for_registration, port, proxy, health_port, container.id) + + # If we reach here, deployment is considered successful for now + logging.info("Deployment preparation successful.") + # Return values are implicitly pushed to XCom (but we pushed explicitly above) + return context_dir, host_for_registration, port + + except Exception as e: + logging.error(f"Error during service deployment: {e}", exc_info=True) + # Attempt to cleanup the container if it was created before the error + try: + if 'container' in locals() and container and container.id: + logging.warning(f"Attempting to stop and remove container {container.id} due to deployment error.") + container.stop(timeout=5) + container.remove(force=True) + logging.info(f"Successfully stopped and removed container {container.id} after error.") + elif 'container_name' in locals() and container_name: + # Try finding by name if ID wasn't captured + containers = client.containers.list(filters={'name': container_name}) + if containers: + logging.warning(f"Attempting to stop and remove container {containers[0].name} by name due to deployment error.") + containers[0].stop(timeout=5) + containers[0].remove(force=True) + logging.info(f"Successfully stopped and removed container {containers[0].name} after error.") + except Exception as cleanup_err: + logging.error(f"Failed during post-error container cleanup: {cleanup_err}") + raise # Re-raise the original exception to fail the task + +# Removed the old monitor_health PythonOperator + +# stop_service and cleanup_service are now defined directly in the DAG below. + +def check_service_health(ti=None, **context): + """ + Periodically checks the service's /health endpoint using requests. + Acts as a long-running sentinel task. Fails if the health check fails + repeatedly or times out. 
+ """ + # Get parameters from XCom + host_reg = ti.xcom_pull(task_ids='prepare_and_deploy', key='service_host_registration') + host_svc = ti.xcom_pull(task_ids='prepare_and_deploy', key='service_host') + health_port = ti.xcom_pull(task_ids='prepare_and_deploy', key='service_health_port') + + # Determine the host to use (prioritize registration host) + host = host_reg if host_reg and host_reg != '0.0.0.0' else host_svc + if not host or not health_port: + raise AirflowException("Could not retrieve host or health_port from XCom for health check.") + + health_url = f"http://{host}:{health_port}/health" + logging.info(f"Starting health check for: {health_url}") + + # Get configuration for polling + # Use task's execution_timeout if available, otherwise default to 1 year + task_timeout = ti.task.execution_timeout or timedelta(days=365) + poke_interval = 60 # Check every 60 seconds (adjust as needed) + start_time = time.monotonic() + timeout_seconds = task_timeout.total_seconds() + consecutive_error_start_time = None # Track start time of consecutive connection errors + error_retry_window = 10 # Seconds to retry connection errors before failing + + while True: + current_time = time.monotonic() + if current_time - start_time > timeout_seconds: + raise AirflowException(f"Health check timed out after {timeout_seconds} seconds for {health_url}") + + try: + # Use a reasonable timeout for the individual request + response = requests.get(health_url, timeout=15) # 15 second request timeout + response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx) + + # Check response content if needed (optional) + # Example: Check for specific JSON content + # try: + # data = response.json() + # if data.get("status") == "healthy": + # logging.info(f"Health check successful: Status {response.status_code}") + # else: + # logging.warning(f"Health check OK (Status {response.status_code}), but content unexpected: {data}") + # except requests.exceptions.JSONDecodeError: + # logging.warning(f"Health check OK (Status {response.status_code}), but response is not valid JSON.") + + # If we got a 2xx status, log success and reset error timer if needed + if consecutive_error_start_time is not None: + logging.info(f"Connection to {health_url} recovered.") + consecutive_error_start_time = None + logging.info(f"Health check successful: Status {response.status_code} for {health_url}") + + except requests.exceptions.Timeout: + current_monotonic_time = time.monotonic() + if consecutive_error_start_time is None: + consecutive_error_start_time = current_monotonic_time + logging.warning(f"Health check request timed out for {health_url}. Starting {error_retry_window}s retry window...") + else: + elapsed_error_time = current_monotonic_time - consecutive_error_start_time + if elapsed_error_time > error_retry_window: + error_msg = f"Health check failed for {health_url}: Timeout persisted for over {error_retry_window} seconds." + logging.error(error_msg) + raise AirflowException(error_msg) + else: + logging.warning(f"Health check request timed out for {health_url}. Retrying within {error_retry_window}s window ({elapsed_error_time:.1f}s elapsed)...") + + except requests.exceptions.ConnectionError as e: + # Check if the error is specifically "Connection refused" - fail immediately + if "[Errno 111] Connection refused" in str(e): + logging.error(f"Health check failed for {health_url}: Connection refused. 
Failing task immediately.") + raise AirflowException(f"Health check failed for {health_url}: Connection refused") + else: + # Handle other connection errors with the retry window + current_monotonic_time = time.monotonic() + if consecutive_error_start_time is None: + consecutive_error_start_time = current_monotonic_time + logging.warning(f"Health check connection error for {health_url}: {e}. Starting {error_retry_window}s retry window...") + else: + elapsed_error_time = current_monotonic_time - consecutive_error_start_time + if elapsed_error_time > error_retry_window: + error_msg = f"Health check failed for {health_url}: Connection error persisted for over {error_retry_window} seconds. Last error: {e}" + logging.error(error_msg) + raise AirflowException(error_msg) + else: + logging.warning(f"Health check connection error for {health_url}: {e}. Retrying within {error_retry_window}s window ({elapsed_error_time:.1f}s elapsed)...") + + except requests.exceptions.HTTPError as e: + # This catches 4xx/5xx errors - fail immediately + logging.error(f"Health check failed for {health_url}: Status {e.response.status_code}. Failing task.") + # Fail the task immediately on HTTP error + raise AirflowException(f"Health check failed for {health_url}: Status {e.response.status_code}") + except requests.exceptions.RequestException as e: + logging.error(f"Health check failed for {health_url} with unexpected error: {e}. Failing task.") + # Fail the task immediately on other request errors + raise AirflowException(f"Health check failed for {health_url}: {e}") + except Exception as e: + # Catch any other unexpected errors during the check + logging.error(f"Unexpected error during health check for {health_url}: {e}", exc_info=True) + raise AirflowException(f"Unexpected error during health check: {e}") + + # Wait for the poke interval before the next check + time.sleep(poke_interval) + + +def _wait_forever(): + """Sleeps indefinitely (or until task timeout) to simulate a running service.""" + logging.info("Sentinel task started. Sleeping in a loop...") + # Sleep in a loop with a reasonable interval to avoid OverflowError + # The task will keep running until it times out based on execution_timeout + # or is manually stopped/failed. + while True: + try: + # Sleep for a long interval (e.g., 1 day) + # You can adjust this interval if needed. + time.sleep(86400) # Sleep for 24 hours + except KeyboardInterrupt: + logging.info("Sentinel task interrupted. 
Exiting.") + break + except Exception as e: + # Log other potential errors during sleep, though unlikely + logging.error(f"Error during sentinel sleep loop: {e}") + # Optionally break or continue based on error handling strategy + break # Exit loop on unexpected error + +def stop_service(**context): + """Stop the running Docker container with verification.""" + # Retrieve account_id from params or kwargs + account_id = context.get('params', {}).get('account_id') or context.get('account_id') + if not account_id: + raise ValueError("Account ID is missing.") + + # Initialize Docker client to connect to docker-socket-proxy + client = docker.DockerClient(base_url='tcp://docker-socket-proxy:2375') + + try: + # For testing, try to get container ID from environment if XCom is not available + container_id = None + if 'ti' in context: + # Use simplified XCom key + container_id = context['ti'].xcom_pull(task_ids='prepare_and_deploy', key='container_id') + + if not container_id: + # If not found in XCom, try to find container by account_id pattern (keep this fallback) + containers = client.containers.list(filters={"name": f"ytdlp_service_{account_id}"}) + if containers: + container = containers[0] + container_id = container.id + logging.info(f"Found container by name pattern: {container.name} (ID: {container_id})") + else: + logging.warning(f"No container found for account {account_id} - nothing to stop") + return + + if container_id: + # If found in XCom, stop by container ID + container = client.containers.get(container_id) + + # Verify container is running before stopping + if container.status != 'running': + logging.warning(f"Container {container_id} is not running (status: {container.status})") + return + + logging.info(f"Stopping container {container_id}...") + container.stop(timeout=10) # 10 second timeout + + # Verify container is stopped + container.reload() + if container.status == 'exited': + logging.info(f"Successfully stopped container {container_id}") + else: + logging.error(f"Container {container_id} failed to stop (status: {container.status})") + raise RuntimeError(f"Container {container_id} failed to stop") + + # Clear Redis entries only if redis_enabled is True + # Retrieve redis_enabled status from DAG run conf or params + redis_enabled = context['dag_run'].conf.get('redis_enabled', False) or context['params'].get('redis_enabled', False) + if redis_enabled: + redis_client = get_redis_connection() + try: + # Verify Redis connection + if not redis_client.ping(): + raise ConnectionError("Failed to connect to Redis") + + # Remove main metadata + redis_client.delete(f"ytdlp:{account_id}") + # Remove from accounts set + redis_client.srem("ytdlp:accounts", account_id) + logging.info(f"Successfully cleared Redis entries for account: {account_id}") + except Exception as e: + logging.error(f"Failed to clear Redis entries for account {account_id}: {e}") + # Do not raise here, allow container stop to be considered successful + # raise # Optional: re-raise if Redis cleanup failure should fail the task + + return + + logging.warning(f"No container found for account {account_id} - nothing to stop") + + except docker.errors.NotFound as e: + logging.warning(f"Container for account {account_id} not found: {e}") + except Exception as e: + logging.error(f"Failed to stop container: {e}") + raise + + +def cleanup_service(**context): + """Cleanup service resources including Redis entries and XCom data.""" + # Note: This function is now called within the manual_stop_cleanup TaskGroup + try: + # Retrieve 
account_id from params first, then from XCom + account_id = context['params'].get('account_id') + if not account_id: + # Try to get it from XCom + account_id = context['task_instance'].xcom_pull(task_ids='prepare_and_deploy', key='account_id') + if not account_id: + logging.warning("Account ID not found in params or XCom - skipping resource cleanup") + return + + # Redis cleanup (if redis_enabled=True) is handled in the 'stop_service' task. + logging.info(f"Redis cleanup for account {account_id} is handled by the 'stop_service' task if enabled.") + + # Cleanup XCom data (using simplified keys where applicable) + # Note: XCom cleanup is generally not strictly necessary but can be good practice. + # Airflow manages XCom expiry. This code doesn't actually *delete* XComs. + # To truly delete, you'd use the Airflow API or DB directly. + # We'll leave the pull calls here as they don't harm anything. + ti = context['task_instance'] + ti.xcom_pull(key='container_id', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='container_name', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='service_host_registration', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='service_host', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='service_port', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='service_health_port', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='work_id', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='context_dir', task_ids='prepare_and_deploy', include_prior_dates=True) + ti.xcom_pull(key='account_id', task_ids='prepare_and_deploy', include_prior_dates=True) # Keep account_id pull + logging.info(f"Pulled XCom data for potential cleanup logging for account: {account_id}") + + # Initialize Docker client + client = docker.DockerClient(base_url='tcp://docker-socket-proxy:2375') + container_found_and_removed = False + + # Attempt 1: Get container ID from XCom using simplified key + container_id_xcom = context['task_instance'].xcom_pull(task_ids='prepare_and_deploy', key='container_id') + if container_id_xcom: + logging.info(f"Attempting to remove container using XCom ID: {container_id_xcom}") + try: + container = client.containers.get(container_id_xcom) + logging.info(f"Found container {container.id} (Name: {container.name}). Removing...") + container.remove(force=True) + logging.info(f"Successfully removed container {container.id}") + container_found_and_removed = True + except docker.errors.NotFound: + logging.warning(f"Container with XCom ID {container_id_xcom} not found. Trying other methods.") + except Exception as e: + logging.error(f"Error removing container {container_id_xcom}: {e}") + + # Attempt 2: Find container by labels if not found/removed via XCom ID + if not container_found_and_removed: + logging.info(f"Attempting to find and remove container by labels: service=ytdlp, account_id={account_id}") + try: + containers = client.containers.list( + filters={'label': [f'service=ytdlp', f'account_id={account_id}']}, + all=True # Include stopped containers + ) + if containers: + for container in containers: + logging.info(f"Found container {container.id} (Name: {container.name}) by labels. 
Removing...") + try: + container.remove(force=True) + logging.info(f"Successfully removed container {container.id}") + container_found_and_removed = True # Mark as found even if only one is removed + except Exception as e: + logging.error(f"Error removing container {container.id} found by labels: {e}") + else: + logging.info("No containers found matching labels.") + except Exception as e: + logging.error(f"Error searching for containers by labels: {e}") + + # Attempt 3: Find container by name pattern if still not found/removed + if not container_found_and_removed: + container_name_pattern = f"ytdlp_service_{account_id}_*" + logging.info(f"Attempting to find and remove container by name pattern: {container_name_pattern}") + try: + containers = client.containers.list(filters={'name': container_name_pattern}, all=True) + if containers: + for container in containers: + logging.info(f"Found container {container.id} (Name: {container.name}) by name pattern. Removing...") + try: + container.remove(force=True) + logging.info(f"Successfully removed container {container.id}") + container_found_and_removed = True + except Exception as e: + logging.error(f"Error removing container {container.id} found by name: {e}") + else: + logging.info("No containers found matching name pattern.") + except Exception as e: + logging.error(f"Error searching for containers by name: {e}") + + if not container_found_and_removed: + logging.warning(f"Could not find or remove any container for account {account_id} using ID, labels, or name.") + + # Get context directory from XCom and remove it + context_dir = context['task_instance'].xcom_pull(task_ids='prepare_and_deploy', key='context_dir') + if context_dir and os.path.exists(context_dir): + shutil.rmtree(context_dir) + logging.info(f"Cleaned up working directory: {context_dir}") + except Exception as e: + logging.error(f"Error during cleanup: {e}") + raise + +# Define the DAG +with DAG( + 'ytdlp_service', + default_args=default_args, + description='Deploy YTDLP token service for ios, android, mweb', + schedule_interval=None, + start_date=days_ago(1), # Use dynamic start date for manually triggered DAG + catchup=False, + tags=['youtube', 'tokens', 'service', 'docker'], + # executor_config moved to default_args + is_paused_upon_creation=False, + params={ + 'account_id': Param( + 'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09', + type="string", + description="Required: The account ID for which the service is being deployed." + ), + 'proxy': Param( + 'socks5://sslocal-rust-1084:1084', + type=["null", "string"], + description="Optional: The SOCKS5 proxy URL to use for the service (e.g., socks5://host:port)." + ), + 'clients': Param( + 'ios,android,mweb', + type="string", + description="Comma-separated list of client types (e.g., ios,android,mweb)." + ), + 'redis_enabled': Param( + False, + type="boolean", + description="Use Redis for service discovery? If False, host/port must be provided or will be auto-assigned." + ), + 'host': Param( + None, + type=["null", "string"], + description="Optional: Host IP for the service. If redis_enabled=False and host is not provided, defaults to '0.0.0.0'. If redis_enabled=True and host is not provided, uses HOST_EXTERNAL_IP or defaults to '0.0.0.0'." + ), + 'port': Param( + None, + type=["null", "integer"], + description="Optional: Port for the service. If None, a free port will be assigned automatically. If redis_enabled=False and a port is provided, it will be used (after checking availability)." 
+ ), + # redis_host and redis_port parameters are removed. + # If redis_enabled=True, the DAG will use the 'redis_default' Airflow connection. + 'docker_network': Param( + 'airflow_prod_proxynet', + type="string", + description="Optional: The Docker network to attach the container to. Defaults to 'airflow_prod_proxynet'." + ), + 'exit_on_proxy_fail': Param( + True, + type="boolean", + description="Exit the service container immediately if the initial proxy test fails?" + ), + } +) as dag: + + # Task to prepare and deploy the service + prepare_and_deploy = PythonOperator( + task_id='prepare_and_deploy', + python_callable=prepare_and_deploy_service, + provide_context=True, + trigger_rule='all_success' # Keep default trigger rule for prepare_and_deploy + ) + + # Combined Health Check and Sentinel Task using PythonOperator + # This task runs for a long time, checking health periodically using the 'requests' library. + # If the health check fails repeatedly or times out, the task fails, triggering 'stop_service'. + monitor_service_health = PythonOperator( + task_id='monitor_service_health', + python_callable=check_service_health, + provide_context=True, + # Set execution timeout for the task itself (acts as the overall timeout) + execution_timeout=timedelta(days=365), # Long timeout (e.g., 1 year) + # op_kwargs can pass static config, but host/port come from XCom inside the function + # poke_interval and request timeout are handled within check_service_health + ) + monitor_service_health.doc_md = """ + ### Monitor Service Health Task (PythonOperator) + Uses a Python function to periodically check the service's `/health` endpoint using the `requests` library. + Acts as both a health check and a sentinel for the running service. + - **Pulls from XCom:** Reads `service_host_registration`, `service_host`, and `service_health_port` from the `prepare_and_deploy` task to construct the target URL. + - **Polling:** Checks the `/health` endpoint every 60 seconds. + - **Timeout:** Uses the task's `execution_timeout` (set to 1 year) as the overall maximum duration. Individual requests have a 15-second timeout. + - **Failure:** If a health check request returns a 4xx/5xx status code or encounters other request errors, the task fails immediately. If the overall `execution_timeout` is reached without a failure, the task would eventually time out and fail. + """ + + # Task to stop the service (runs if monitor_service_health fails) + stop = PythonOperator( + task_id='stop_service', + python_callable=stop_service, + provide_context=True, + trigger_rule=TriggerRule.ONE_FAILED # Run only if monitor_service_health fails + ) + stop.doc_md = """ + ### Stop Service Task + Stops the Docker container associated with the service. + - **Trigger Rule:** `one_failed` - This task only runs if the upstream `monitor_service_health` task fails. + - Pulls container ID/name from XCom or finds it using labels/name patterns. + - Clears Redis entries if `redis_enabled=True`. 
+ """ + + # Marker task to indicate that the deployment failed + prepare_failed_marker = EmptyOperator( + task_id='prepare_failed_marker', + trigger_rule=TriggerRule.ONE_FAILED # Run only if 'prepare_and_deploy' fails + ) + + # Task to cleanup resources (runs after stop sequence OR if prepare fails) + cleanup = PythonOperator( + task_id='cleanup_service', + python_callable=cleanup_service, + provide_context=True, + trigger_rule=TriggerRule.ALL_DONE # Run after upstream (stop or prepare_failed_marker) is done + ) + cleanup.doc_md = """ + ### Cleanup Service Task + Removes the Docker container and cleans up related resources. + - **Trigger Rule:** `all_done` - Runs after the `stop_service` task finishes, whether it succeeded or failed. + - Removes the container using ID from XCom, labels, or name patterns. + - Cleans up XCom variables. + - Removes the context directory. + """ + + # Define task dependencies + # Success Path: prepare -> monitor (runs indefinitely) + # Monitor Failure Path: monitor (fails) -> stop -> cleanup + # Prepare Failure Path: prepare (fails) -> prepare_failed_marker -> cleanup + + prepare_and_deploy >> monitor_service_health + prepare_and_deploy >> prepare_failed_marker # Trigger marker if prepare fails + + monitor_service_health >> stop # Trigger stop if monitor fails + + # Cleanup is triggered after stop finishes OR after prepare_failed_marker finishes + stop >> cleanup + prepare_failed_marker >> cleanup + diff --git a/docker-compose-ytdlp-ops.yaml b/docker-compose-ytdlp-ops.yaml index 12018fb..cc3f57e 100644 --- a/docker-compose-ytdlp-ops.yaml +++ b/docker-compose-ytdlp-ops.yaml @@ -9,7 +9,7 @@ services: volumes: - context-data:/app/context-data networks: - - airflow_worker_proxynet + - airflow_workers_prod_proxynet command: - "--script-dir" - "/app/scripts" @@ -21,6 +21,7 @@ services: - "ios,android,mweb" - "--proxy" - "socks5://sslocal-rust-1084:1084" + - "--probe" restart: unless-stopped pull_policy: always @@ -29,5 +30,5 @@ volumes: name: context-data networks: - airflow_worker_proxynet: + airflow_workers_prod_proxynet: external: true