Update on --probe and add experimental service dags

This commit is contained in:
aperez 2025-04-06 11:55:53 +03:00
parent 516eff4ac1
commit 1f092d6f80
4 changed files with 979 additions and 12 deletions

View File

@ -36,7 +36,7 @@ USER airflow
# Install Python dependencies and ensure ffprobe3 is installed correctly # Install Python dependencies and ensure ffprobe3 is installed correctly
RUN pip install --no-cache-dir \ RUN pip install --no-cache-dir \
"apache-airflow==${AIRFLOW_VERSION}" && \ "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-providers-docker apache-airflow-providers-http && \
pip install --no-cache-dir -r /app/requirements.txt && \ pip install --no-cache-dir -r /app/requirements.txt && \
pip install --no-cache-dir ffprobe3 python-ffmpeg pip install --no-cache-dir ffprobe3 python-ffmpeg

View File

@ -396,17 +396,17 @@ class YtdlpOpsOperator(BaseOperator):
# preferably use the explicitly pushed 'ytdlp_command' key for clarity. # preferably use the explicitly pushed 'ytdlp_command' key for clarity.
return ytdlp_cmd # Return the original command return ytdlp_cmd # Return the original command
# Keep specific exception handling from original block if needed, but AirflowException is often sufficient except AirflowException as e: # Catch AirflowExceptions raised explicitly in the code above
except AirflowException as e: # Catch AirflowExceptions raised earlier
logger.error(f"Operation failed due to AirflowException: {e}") logger.error(f"Operation failed due to AirflowException: {e}")
raise # Re-raise AirflowExceptions raise # Re-raise AirflowExceptions to ensure task failure
except (TTransportException, PBServiceException) as e: # Catch specific Thrift/Service errors not already wrapped except (TTransportException, PBServiceException) as e: # Catch specific Thrift/Service errors not already wrapped
logger.error(f"Unhandled Thrift/Service error: {e}") logger.error(f"Unhandled Thrift/Service error: {e}", exc_info=True) # Add traceback for context
raise AirflowException(f"Unhandled YTDLP service error: {e}") raise AirflowException(f"Unhandled YTDLP service error: {e}") # Wrap in AirflowException
except Exception as e: # General catch-all except Exception as e: # General catch-all for truly unexpected errors
# Log with traceback for unexpected errors # Log with traceback for unexpected errors
logger.error(f"Unexpected error in YtdlpOpsOperator: {e}", exc_info=True) logger.error(f"Caught unexpected error in YtdlpOpsOperator: {e}", exc_info=True)
raise AirflowException(f"Unexpected error in YtdlpOpsOperator: {e}") # Ensure any unexpected error explicitly fails the task with AirflowException
raise AirflowException(f"Unexpected error caused task failure: {e}")
finally: finally:
if transport and transport.isOpen(): # Check if transport exists and is open before closing if transport and transport.isOpen(): # Check if transport exists and is open before closing
logger.info("Closing Thrift transport.") logger.info("Closing Thrift transport.")
@ -820,7 +820,7 @@ with DAG(
'redis_enabled': Param(False, type="boolean", description="Use Redis for service discovery? If False, uses service_ip/port."), # Default to direct connection 'redis_enabled': Param(False, type="boolean", description="Use Redis for service discovery? If False, uses service_ip/port."), # Default to direct connection
'service_ip': Param('85.192.30.55', type="string", description="Service IP if redis_enabled=False."), # Default service IP 'service_ip': Param('85.192.30.55', type="string", description="Service IP if redis_enabled=False."), # Default service IP
'service_port': Param(9090, type="integer", description="Service port if redis_enabled=False."), # Default service port 'service_port': Param(9090, type="integer", description="Service port if redis_enabled=False."), # Default service port
'account_id': Param('accoutns_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09', type="string", description="Account ID for Redis lookup or direct call."), # Updated default account_id 'account_id': Param('account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09', type="string", description="Account ID for Redis lookup or direct call."), # Updated default account_id
'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."), 'timeout': Param(DEFAULT_TIMEOUT, type="integer", description="Timeout in seconds for the Thrift connection."),
# Use Airflow Variable for downloads directory, matching reference DAG structure # Use Airflow Variable for downloads directory, matching reference DAG structure
'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json. Uses Airflow Variable 'DOWNLOADS_TEMP' or default.") 'info_json_dir': Param("{{ var.value.get('DOWNLOADS_TEMP', '/opt/airflow/downloadfiles') }}", type="string", description="Directory to save info.json. Uses Airflow Variable 'DOWNLOADS_TEMP' or default.")

966
dags/ytdlp_service_dag.py Normal file
View File

@ -0,0 +1,966 @@
"""
DAG to deploy and manage YTDLP token service.
This DAG handles the deployment, monitoring, and cleanup of a YTDLP token service
for a given account. It supports both Redis-based service discovery and direct
connection via manually specified host and port.
Configuration Options:
- account_id: (Required) The account ID for which the service is being deployed.
- proxy: (Optional) The proxy to use for the service.
- redis_enabled: (Optional, default=True) Whether to use Redis for service discovery.
If False, you must provide `host` and `port` manually.
- host: (Optional) The host IP of the service. Required if `redis_enabled=False`.
- port: (Optional) The port of the service. Required if `redis_enabled=False`.
Usage:
1. Redis-based service discovery:
- Set `redis_enabled=True` (default).
- Ensure Redis is configured in Airflow connections.
- The DAG will automatically discover the service IP and port from Redis.
2. Manual host and port:
- Set `redis_enabled=False`.
- Provide `host` and `port` manually in the DAG configuration.
- Example: {"host": "192.168.1.100", "port": 9090}.
Example Trigger Configuration:
{
"account_id": "test_account",
"proxy": "socks5://proxy.example.com:1080",
"redis_enabled": False,
"host": "192.168.1.100",
"port": 9090
}
"""
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
# HttpSensor is no longer used
# from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.hooks.base import BaseHook
from airflow.exceptions import AirflowException
from typing import Sequence # Add Sequence for type hinting
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago # Add this import
import uuid
import os
import logging
import shutil
import docker
import uuid
import redis
import requests
import socket
import time
import sys # Import sys for maxsize
from airflow.configuration import conf # Import conf
# Import and apply Thrift exceptions patch
try:
# Always apply the patch, regardless of environment
from thrift_exceptions_patch import patch_thrift_exceptions
patch_thrift_exceptions()
logging.info("Applied Thrift exceptions patch for Airflow compatibility")
# Verify the patch was applied correctly
try:
from pangramia.yt.exceptions.ttypes import PBServiceException
test_exception = PBServiceException(message="Test")
# Try to modify attributes to verify patch works
test_exception.args = ("Test",)
test_exception.message = "Modified test"
logging.info("Verified Thrift exception patch is working correctly")
except Exception as verify_error:
logging.error(f"Thrift exception patch verification failed: {verify_error}")
logging.error("This may cause 'immutable instance' errors during error handling")
except ImportError as e:
logging.warning(f"Could not import thrift_exceptions_patch: {e}")
logging.warning("Airflow compatibility will be affected - expect 'immutable instance' errors")
except Exception as e:
logging.error(f"Error applying Thrift exceptions patch: {e}")
# Default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0, # Disable retries for all tasks in this DAG
'retry_delay': timedelta(minutes=5),
# Removed 'queue': 'auth_queue' to use the default queue
# Optional: Further filter workers by tags if using CeleryExecutor
'executor_config': {"CeleryExecutor": {"tags": ["auth_node"]}},
}
def get_redis_connection(redis_host=None, redis_port=None):
"""Get a Redis connection using Airflow's Redis connection or manually specified host/port."""
if redis_host and redis_port:
# Use manually specified host and port
return redis.Redis(
host=redis_host,
port=redis_port,
db=0,
decode_responses=True
)
else:
# Use Airflow's Redis connection
redis_conn = BaseHook.get_connection("redis_default")
# Use the password from the connection if available, otherwise use 'airflow' as default
password = redis_conn.password or 'airflow'
return redis.Redis(
host=redis_conn.host, # 'redis' (service name in docker-compose)
port=redis_conn.port, # 6379
password=password,
db=0,
decode_responses=True
)
def get_free_port():
"""Find and return a free port."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', 0))
return s.getsockname()[1]
def is_port_free(p):
"""Check if a port is free to use."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(('0.0.0.0', p))
return True
except OSError:
return False
def store_account_metadata(account_id, ip, port, proxy=None, health_port=None, container_id=None):
"""Store account metadata in Redis."""
redis_client = get_redis_connection()
try:
# Verify Redis connection
if not redis_client.ping():
raise ConnectionError("Failed to connect to Redis")
# Store main account metadata
mapping = {
"ip": ip,
"port": str(port),
"status": "running",
"start_time": str(time.time())
}
if proxy:
mapping["proxy"] = proxy
if health_port:
mapping["health_port"] = str(health_port)
if container_id:
mapping["container_id"] = container_id
# Use pipeline for atomic operations
with redis_client.pipeline() as pipe:
# Store main metadata
pipe.hset(f"ytdlp:{account_id}", mapping=mapping)
# Set expiration (1 week)
pipe.expire(f"ytdlp:{account_id}", 604800)
# Add to account list
pipe.sadd("ytdlp:accounts", account_id)
# Execute all commands
results = pipe.execute()
# Verify all commands succeeded
if not all(results):
raise RuntimeError(f"Failed to store metadata for {account_id}. Pipeline results: {results}")
# Verify the data was actually stored
stored_data = redis_client.hgetall(f"ytdlp:{account_id}")
if not stored_data:
raise RuntimeError(f"Failed to verify stored data for {account_id}")
logging.info(f"Successfully stored account metadata for {account_id} in Redis: {stored_data}")
return True
except Exception as e:
logging.error(f"Failed to store account metadata for {account_id}: {e}", exc_info=True)
# Attempt cleanup if storage failed
try:
redis_client = get_redis_connection() # Ensure client is available
redis_client.delete(f"ytdlp:{account_id}")
redis_client.srem("ytdlp:accounts", account_id)
except Exception as cleanup_error:
logging.error(f"Failed to cleanup failed storage for {account_id}: {cleanup_error}")
raise
# Removed get_account_metadata function as the service now handles Redis registration checks.
def prepare_and_deploy_service(**context):
"""Prepare deployment and deploy the Docker service."""
# Retrieve account_id, proxy, clients, and other parameters from DAG run configuration (conf)
# Set default values for account_id, proxy, and redis_enabled
account_id = context['dag_run'].conf.get('account_id') or context['params'].get('account_id', 'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09')
proxy = context['dag_run'].conf.get('proxy') or context['params'].get('proxy', 'socks5://sslocal-rust-1084:1084')
clients = context['dag_run'].conf.get('clients') or context['params'].get('clients', 'ios,android,mweb')
redis_enabled = context['dag_run'].conf.get('redis_enabled', False) # Default to False
host_param = context['dag_run'].conf.get('host') # Host parameter from config
port_param = context['dag_run'].conf.get('port') # Port parameter from config
docker_network = context['dag_run'].conf.get('docker_network') or context['params'].get('docker_network', 'airflow_prod_proxynet')
host_external_ip_env = os.getenv('HOST_EXTERNAL_IP') # Explicit external IP from environment
if not account_id:
raise ValueError("Account ID is missing.")
# --- Port Determination ---
# Assign a free port if not provided, or validate the provided one
if not port_param:
port = get_free_port()
if not is_port_free(port):
raise ValueError(f"Assigned port {port} is already in use")
logging.info(f"No port provided, assigned free port: {port}")
else:
port = int(port_param)
if not is_port_free(port):
raise ValueError(f"Provided port {port} is already in use")
logging.info(f"Using provided port: {port}")
# Determine health port
health_port = port + 1
if not is_port_free(health_port):
raise ValueError(f"Health port {health_port} (derived from port {port}) is already in use")
logging.info(f"Using health port: {health_port}")
# --- Host Determination ---
# host_for_registration: IP/Host for client discovery (Redis/Logs)
# host_for_sensor: Hostname/IP for Airflow HttpSensor health check
host_for_registration = host_param # Start with the parameter value
if redis_enabled:
# If Redis is enabled, registration host should ideally be externally reachable
if not host_for_registration:
host_for_registration = host_external_ip_env # Use external IP from env var if available
if not host_for_registration:
# If no env var, try fetching external IP using requests
try:
logging.info("HOST_EXTERNAL_IP not set. Attempting to fetch external IP from api.ipify.org...")
response = requests.get('https://api.ipify.org', timeout=10) # 10 second timeout
response.raise_for_status() # Raise exception for bad status codes
host_for_registration = response.text.strip()
if not host_for_registration: # Check if response was empty
raise ValueError("Received empty response from api.ipify.org")
logging.info(f"Successfully fetched external IP: {host_for_registration}")
except requests.exceptions.RequestException as e:
logging.warning(f"Failed to fetch external IP: {e}. Falling back to Docker bridge IP.")
# Fallback to default Docker bridge IP if fetching fails
host_for_registration = "172.17.0.1"
logging.warning(f"Defaulting registration host to Docker bridge IP: {host_for_registration}. Ensure clients can reach this IP.")
except Exception as e:
logging.error(f"Unexpected error fetching external IP: {e}. Falling back to Docker bridge IP.")
host_for_registration = "172.17.0.1"
logging.warning(f"Defaulting registration host to Docker bridge IP: {host_for_registration}. Ensure clients can reach this IP.")
else:
logging.info(f"Redis enabled. Using HOST_EXTERNAL_IP environment variable for registration: {host_for_registration}")
else:
logging.info(f"Redis enabled. Using provided host parameter for registration: {host_for_registration}")
else: # Redis disabled
# If Redis is disabled, registration host defaults to 0.0.0.0 if not provided
if not host_for_registration:
host_for_registration = "0.0.0.0"
logging.warning(f"Redis disabled and no host param provided. Defaulting registration host to {host_for_registration}.")
else:
logging.info(f"Redis disabled. Using provided host parameter for registration: {host_for_registration}")
# host_for_sensor determination will happen *after* container creation, using container name.
logging.info(f"Preparing deployment for account {account_id}. Registration Host: {host_for_registration}, Port: {port}, Health Port: {health_port}")
# Generate unique work ID and context directory
work_id = str(uuid.uuid4())
context['task_instance'].xcom_push(key='work_id', value=work_id)
context_dir = os.path.join(os.getenv('AIRFLOW_HOME', '/tmp'), 'service-data', work_id, 'context-data')
os.makedirs(context_dir, exist_ok=True, mode=0o777)
os.chmod(context_dir, 0o777)
# Push context directory and account details to XCom
context['task_instance'].xcom_push(key='context_dir', value=context_dir)
context['task_instance'].xcom_push(key='account_id', value=account_id)
# Deploy the Docker service
# The 'host_for_registration' variable here represents the externally accessible IP for registration/XCom.
# The service inside the container will listen on 0.0.0.0.
logging.info(f"Deploying service for account {account_id}. Registration Host: {host_for_registration}, Port: {port}")
# Get Redis connection details ONLY if redis_enabled (for the container to register itself)
redis_host_for_container = ''
redis_port_for_container = ''
if redis_enabled:
try:
# Get connection details to pass to the container environment
redis_conn_details = get_redis_connection().connection_pool.connection_kwargs
redis_host_for_container = os.getenv('REDIS_HOST', redis_conn_details.get('host', 'redis'))
redis_port_for_container = str(os.getenv('REDIS_PORT', redis_conn_details.get('port', 6379)))
logging.info(f"Redis enabled. Passing REDIS_HOST={redis_host_for_container}, REDIS_PORT={redis_port_for_container} to container.")
except Exception as e:
logging.error(f"Failed to get Redis connection details for container environment: {e}")
logging.warning("Proceeding without Redis details in container environment due to error.")
# Depending on container requirements, you might want to raise an error here instead
else:
logging.info("Redis disabled. Not passing REDIS_HOST/REDIS_PORT to container environment.")
# Get Docker connection details from Airflow
try:
secrets_backend = conf.get('secrets', 'backend', fallback='None')
logging.info(f"Attempting to get 'docker_hub' connection. Configured secrets backend: {secrets_backend}")
docker_conn = BaseHook.get_connection("docker_hub")
docker_username = docker_conn.login
docker_password = docker_conn.password
logging.info("Successfully retrieved 'docker_hub' connection.")
except Exception as e:
logging.error(f"Failed to retrieve 'docker_hub' connection: {e}")
# Log details about potential secrets backend issues
secrets_backend_kwargs = conf.get('secrets', 'backend_kwargs', fallback='{}')
logging.error(f"Secrets backend details: backend={secrets_backend}, kwargs={secrets_backend_kwargs}")
# Re-raise the exception to fail the task
raise
try:
# Initialize Docker client to connect to docker-socket-proxy
client = docker.DockerClient(base_url='tcp://docker-socket-proxy:2375')
# Authenticate with Docker Hub
client.login(
username=docker_username,
password=docker_password,
registry=docker_conn.host # Typically "https://index.docker.io/v1/"
)
# Generate a unique container name
container_name = f"ytdlp_service_{account_id}_{uuid.uuid4().hex[:8]}"
# Pull the Docker image (if not already present)
client.images.pull('pangramia/ytdlp-ops-server:latest')
# Use the configured network name (from params or default)
network_name = docker_network # Use the retrieved parameter
logging.info(f"Attempting to run container on network: {network_name}")
# Determine if --probe flag should be added based on DAG param
exit_on_proxy_fail = context['dag_run'].conf.get('exit_on_proxy_fail', True) # Default to True if not set
command_args = [
'--script-dir', '/app/scripts',
'--context-dir', '/app/context-data', # Use the bind mount target inside container
'--port', str(port),
'--health-port', str(health_port),
'--clients', clients,
'--timeout', '120',
'--proxy', proxy if proxy else ''
]
if exit_on_proxy_fail:
command_args.append('--probe')
logging.info("Adding --probe flag to container command as exit_on_proxy_fail=True")
else:
logging.info("Not adding --probe flag to container command as exit_on_proxy_fail=False")
# Run the Docker container with health port
container = client.containers.run(
image='pangramia/ytdlp-ops-server:latest',
command=command_args, # Use the constructed command list
environment={
'PYTHONUNBUFFERED': '1', # Ensure logs are not buffered
'SERVER_PORT': str(port), # Port the service listens on *inside* the container
'SERVER_HOST': '0.0.0.0', # Service should listen on all interfaces *inside* the container
'ACCOUNT_ID': account_id,
# Pass Redis details *if enabled* for the service to register itself
'REDIS_HOST': redis_host_for_container,
'REDIS_PORT': redis_port_for_container,
# Pass PROXY_URL for health check access
'PROXY_URL': proxy if proxy else '',
},
ports={
f"{port}/tcp": port,
f"{health_port}/tcp": health_port
},
volumes={
context_dir: {'bind': '/app/context-data', 'mode': 'rw'}
},
network_mode=network_name, # Use the specified network variable
auto_remove=False, # Do not auto-remove the container
name=container_name, # Use a unique name
detach=True,
tty=True,
shm_size='256m',
# Updated healthcheck to test external connectivity via proxy
healthcheck={
# Use CMD-SHELL to allow conditional logic based on PROXY_URL env var
'test': [
'CMD-SHELL',
# Script checks if PROXY_URL is set, uses it with curl if yes, otherwise curls directly.
# -f: Fail silently (exit non-zero on error)
# --connect-timeout 10: Timeout for connection phase
# > /dev/null: Discard output, we only care about exit code
'if [ -n "$PROXY_URL" ]; then '
'curl -f --connect-timeout 10 -x "$PROXY_URL" https://ifconfig.co > /dev/null; '
'else '
'curl -f --connect-timeout 10 https://ifconfig.co > /dev/null; '
'fi'
],
'interval': 30 * 1000000000, # Check every 30 seconds (30 * 1e9 nanoseconds)
'timeout': 15 * 1000000000, # Timeout after 15 seconds (15 * 1e9 nanoseconds)
'retries': 5, # Retry 5 times on failure
'start_period': 15 * 1000000000 # Grace period of 15 seconds after start
},
# Add labels for better identification
labels={
'service': 'ytdlp',
'account_id': account_id
}
)
# Wait for container to be running (skip health check verification)
start_time = time.time()
while True:
container.reload()
if container.status == 'running':
break
if time.time() - start_time > 10: # 10 second timeout
raise TimeoutError("Container failed to start within 10 seconds")
time.sleep(1)
logging.info(f"Container started: {container.id} (health check verification skipped)")
# Push container details immediately after creation using simplified keys
context['task_instance'].xcom_push(key='container_id', value=container.id)
context['task_instance'].xcom_push(key='container_name', value=container_name)
logging.info(f"Pushed container_id={container.id} and container_name={container_name} to XCom.")
# --- Determine Host for Sensor ---
# Get the container's IP address on the specified network for the HttpSensor
try:
container.reload() # Refresh container attributes
network_settings = container.attrs.get('NetworkSettings', {}).get('Networks', {})
if network_name in network_settings:
host_for_sensor = network_settings[network_name].get('IPAddress')
if not host_for_sensor:
raise ValueError(f"Container {container.id} has no IPAddress on network '{network_name}'")
logging.info(f"Using container IP '{host_for_sensor}' on network '{network_name}' for HttpSensor.")
else:
# Fallback or error if container not on expected network
logging.error(f"Container {container.id} is not attached to the expected network '{network_name}'. Network settings: {network_settings}")
# Option 1: Fallback to container name (might fail as observed)
# host_for_sensor = container_name
# logging.warning(f"Falling back to container name '{host_for_sensor}' for sensor.")
# Option 2: Raise error
raise ValueError(f"Container {container.id} not found on network '{network_name}'. Cannot determine IP for sensor.")
except Exception as e:
logging.error(f"Failed to get container IP address: {e}", exc_info=True)
raise AirflowException(f"Failed to determine IP address for HttpSensor: {e}")
# Ensure we don't use 0.0.0.0 or empty string for the sensor
if not host_for_sensor or host_for_sensor == "0.0.0.0":
raise ValueError(f"Determined host_for_sensor is invalid ('{host_for_sensor}'). Check container network attachment and IP assignment.")
# --- Add extra logging before pushing ---
logging.info(f"FINAL CHECK before XCom push:")
logging.info(f" Account ID: {account_id}")
logging.info(f" Host for Sensor (IP Address): {host_for_sensor}")
logging.info(f" Host for Registration: {host_for_registration}")
logging.info(f" Service Port: {port}")
logging.info(f" Health Port: {health_port}")
logging.info(f" Pushing to XCom key: service_host with value: {host_for_sensor}")
# --- End extra logging ---
# Push distinct service connection details using simplified keys
context['task_instance'].xcom_push(key='service_host_registration', value=host_for_registration) # For client discovery (e.g., Redis)
context['task_instance'].xcom_push(key='service_host', value=host_for_sensor) # IP Address for HttpSensor
context['task_instance'].xcom_push(key='service_port', value=port) # Port is the same
context['task_instance'].xcom_push(key='service_health_port', value=health_port) # Health port is the same
logging.info(f"Pushed host_for_sensor (IP Address)={host_for_sensor} to XCom key 'service_host'")
logging.info(f"Pushed host_for_registration={host_for_registration} to XCom key 'service_host_registration'")
# Store account metadata in Redis only if redis_enabled is True
# This uses the 'host_for_registration' for client discovery
if redis_enabled:
store_account_metadata(account_id, host_for_registration, port, proxy, health_port, container.id)
# If we reach here, deployment is considered successful for now
logging.info("Deployment preparation successful.")
# Return values are implicitly pushed to XCom (but we pushed explicitly above)
return context_dir, host_for_registration, port
except Exception as e:
logging.error(f"Error during service deployment: {e}", exc_info=True)
# Attempt to cleanup the container if it was created before the error
try:
if 'container' in locals() and container and container.id:
logging.warning(f"Attempting to stop and remove container {container.id} due to deployment error.")
container.stop(timeout=5)
container.remove(force=True)
logging.info(f"Successfully stopped and removed container {container.id} after error.")
elif 'container_name' in locals() and container_name:
# Try finding by name if ID wasn't captured
containers = client.containers.list(filters={'name': container_name})
if containers:
logging.warning(f"Attempting to stop and remove container {containers[0].name} by name due to deployment error.")
containers[0].stop(timeout=5)
containers[0].remove(force=True)
logging.info(f"Successfully stopped and removed container {containers[0].name} after error.")
except Exception as cleanup_err:
logging.error(f"Failed during post-error container cleanup: {cleanup_err}")
raise # Re-raise the original exception to fail the task
# Removed the old monitor_health PythonOperator
# stop_service and cleanup_service are now defined directly in the DAG below.
def check_service_health(ti=None, **context):
"""
Periodically checks the service's /health endpoint using requests.
Acts as a long-running sentinel task. Fails if the health check fails
repeatedly or times out.
"""
# Get parameters from XCom
host_reg = ti.xcom_pull(task_ids='prepare_and_deploy', key='service_host_registration')
host_svc = ti.xcom_pull(task_ids='prepare_and_deploy', key='service_host')
health_port = ti.xcom_pull(task_ids='prepare_and_deploy', key='service_health_port')
# Determine the host to use (prioritize registration host)
host = host_reg if host_reg and host_reg != '0.0.0.0' else host_svc
if not host or not health_port:
raise AirflowException("Could not retrieve host or health_port from XCom for health check.")
health_url = f"http://{host}:{health_port}/health"
logging.info(f"Starting health check for: {health_url}")
# Get configuration for polling
# Use task's execution_timeout if available, otherwise default to 1 year
task_timeout = ti.task.execution_timeout or timedelta(days=365)
poke_interval = 60 # Check every 60 seconds (adjust as needed)
start_time = time.monotonic()
timeout_seconds = task_timeout.total_seconds()
consecutive_error_start_time = None # Track start time of consecutive connection errors
error_retry_window = 10 # Seconds to retry connection errors before failing
while True:
current_time = time.monotonic()
if current_time - start_time > timeout_seconds:
raise AirflowException(f"Health check timed out after {timeout_seconds} seconds for {health_url}")
try:
# Use a reasonable timeout for the individual request
response = requests.get(health_url, timeout=15) # 15 second request timeout
response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
# Check response content if needed (optional)
# Example: Check for specific JSON content
# try:
# data = response.json()
# if data.get("status") == "healthy":
# logging.info(f"Health check successful: Status {response.status_code}")
# else:
# logging.warning(f"Health check OK (Status {response.status_code}), but content unexpected: {data}")
# except requests.exceptions.JSONDecodeError:
# logging.warning(f"Health check OK (Status {response.status_code}), but response is not valid JSON.")
# If we got a 2xx status, log success and reset error timer if needed
if consecutive_error_start_time is not None:
logging.info(f"Connection to {health_url} recovered.")
consecutive_error_start_time = None
logging.info(f"Health check successful: Status {response.status_code} for {health_url}")
except requests.exceptions.Timeout:
current_monotonic_time = time.monotonic()
if consecutive_error_start_time is None:
consecutive_error_start_time = current_monotonic_time
logging.warning(f"Health check request timed out for {health_url}. Starting {error_retry_window}s retry window...")
else:
elapsed_error_time = current_monotonic_time - consecutive_error_start_time
if elapsed_error_time > error_retry_window:
error_msg = f"Health check failed for {health_url}: Timeout persisted for over {error_retry_window} seconds."
logging.error(error_msg)
raise AirflowException(error_msg)
else:
logging.warning(f"Health check request timed out for {health_url}. Retrying within {error_retry_window}s window ({elapsed_error_time:.1f}s elapsed)...")
except requests.exceptions.ConnectionError as e:
# Check if the error is specifically "Connection refused" - fail immediately
if "[Errno 111] Connection refused" in str(e):
logging.error(f"Health check failed for {health_url}: Connection refused. Failing task immediately.")
raise AirflowException(f"Health check failed for {health_url}: Connection refused")
else:
# Handle other connection errors with the retry window
current_monotonic_time = time.monotonic()
if consecutive_error_start_time is None:
consecutive_error_start_time = current_monotonic_time
logging.warning(f"Health check connection error for {health_url}: {e}. Starting {error_retry_window}s retry window...")
else:
elapsed_error_time = current_monotonic_time - consecutive_error_start_time
if elapsed_error_time > error_retry_window:
error_msg = f"Health check failed for {health_url}: Connection error persisted for over {error_retry_window} seconds. Last error: {e}"
logging.error(error_msg)
raise AirflowException(error_msg)
else:
logging.warning(f"Health check connection error for {health_url}: {e}. Retrying within {error_retry_window}s window ({elapsed_error_time:.1f}s elapsed)...")
except requests.exceptions.HTTPError as e:
# This catches 4xx/5xx errors - fail immediately
logging.error(f"Health check failed for {health_url}: Status {e.response.status_code}. Failing task.")
# Fail the task immediately on HTTP error
raise AirflowException(f"Health check failed for {health_url}: Status {e.response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"Health check failed for {health_url} with unexpected error: {e}. Failing task.")
# Fail the task immediately on other request errors
raise AirflowException(f"Health check failed for {health_url}: {e}")
except Exception as e:
# Catch any other unexpected errors during the check
logging.error(f"Unexpected error during health check for {health_url}: {e}", exc_info=True)
raise AirflowException(f"Unexpected error during health check: {e}")
# Wait for the poke interval before the next check
time.sleep(poke_interval)
def _wait_forever():
"""Sleeps indefinitely (or until task timeout) to simulate a running service."""
logging.info("Sentinel task started. Sleeping in a loop...")
# Sleep in a loop with a reasonable interval to avoid OverflowError
# The task will keep running until it times out based on execution_timeout
# or is manually stopped/failed.
while True:
try:
# Sleep for a long interval (e.g., 1 day)
# You can adjust this interval if needed.
time.sleep(86400) # Sleep for 24 hours
except KeyboardInterrupt:
logging.info("Sentinel task interrupted. Exiting.")
break
except Exception as e:
# Log other potential errors during sleep, though unlikely
logging.error(f"Error during sentinel sleep loop: {e}")
# Optionally break or continue based on error handling strategy
break # Exit loop on unexpected error
def stop_service(**context):
"""Stop the running Docker container with verification."""
# Retrieve account_id from params or kwargs
account_id = context.get('params', {}).get('account_id') or context.get('account_id')
if not account_id:
raise ValueError("Account ID is missing.")
# Initialize Docker client to connect to docker-socket-proxy
client = docker.DockerClient(base_url='tcp://docker-socket-proxy:2375')
try:
# For testing, try to get container ID from environment if XCom is not available
container_id = None
if 'ti' in context:
# Use simplified XCom key
container_id = context['ti'].xcom_pull(task_ids='prepare_and_deploy', key='container_id')
if not container_id:
# If not found in XCom, try to find container by account_id pattern (keep this fallback)
containers = client.containers.list(filters={"name": f"ytdlp_service_{account_id}"})
if containers:
container = containers[0]
container_id = container.id
logging.info(f"Found container by name pattern: {container.name} (ID: {container_id})")
else:
logging.warning(f"No container found for account {account_id} - nothing to stop")
return
if container_id:
# If found in XCom, stop by container ID
container = client.containers.get(container_id)
# Verify container is running before stopping
if container.status != 'running':
logging.warning(f"Container {container_id} is not running (status: {container.status})")
return
logging.info(f"Stopping container {container_id}...")
container.stop(timeout=10) # 10 second timeout
# Verify container is stopped
container.reload()
if container.status == 'exited':
logging.info(f"Successfully stopped container {container_id}")
else:
logging.error(f"Container {container_id} failed to stop (status: {container.status})")
raise RuntimeError(f"Container {container_id} failed to stop")
# Clear Redis entries only if redis_enabled is True
# Retrieve redis_enabled status from DAG run conf or params
redis_enabled = context['dag_run'].conf.get('redis_enabled', False) or context['params'].get('redis_enabled', False)
if redis_enabled:
redis_client = get_redis_connection()
try:
# Verify Redis connection
if not redis_client.ping():
raise ConnectionError("Failed to connect to Redis")
# Remove main metadata
redis_client.delete(f"ytdlp:{account_id}")
# Remove from accounts set
redis_client.srem("ytdlp:accounts", account_id)
logging.info(f"Successfully cleared Redis entries for account: {account_id}")
except Exception as e:
logging.error(f"Failed to clear Redis entries for account {account_id}: {e}")
# Do not raise here, allow container stop to be considered successful
# raise # Optional: re-raise if Redis cleanup failure should fail the task
return
logging.warning(f"No container found for account {account_id} - nothing to stop")
except docker.errors.NotFound as e:
logging.warning(f"Container for account {account_id} not found: {e}")
except Exception as e:
logging.error(f"Failed to stop container: {e}")
raise
def cleanup_service(**context):
"""Cleanup service resources including Redis entries and XCom data."""
# Note: This function is now called within the manual_stop_cleanup TaskGroup
try:
# Retrieve account_id from params first, then from XCom
account_id = context['params'].get('account_id')
if not account_id:
# Try to get it from XCom
account_id = context['task_instance'].xcom_pull(task_ids='prepare_and_deploy', key='account_id')
if not account_id:
logging.warning("Account ID not found in params or XCom - skipping resource cleanup")
return
# Redis cleanup (if redis_enabled=True) is handled in the 'stop_service' task.
logging.info(f"Redis cleanup for account {account_id} is handled by the 'stop_service' task if enabled.")
# Cleanup XCom data (using simplified keys where applicable)
# Note: XCom cleanup is generally not strictly necessary but can be good practice.
# Airflow manages XCom expiry. This code doesn't actually *delete* XComs.
# To truly delete, you'd use the Airflow API or DB directly.
# We'll leave the pull calls here as they don't harm anything.
ti = context['task_instance']
ti.xcom_pull(key='container_id', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='container_name', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='service_host_registration', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='service_host', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='service_port', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='service_health_port', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='work_id', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='context_dir', task_ids='prepare_and_deploy', include_prior_dates=True)
ti.xcom_pull(key='account_id', task_ids='prepare_and_deploy', include_prior_dates=True) # Keep account_id pull
logging.info(f"Pulled XCom data for potential cleanup logging for account: {account_id}")
# Initialize Docker client
client = docker.DockerClient(base_url='tcp://docker-socket-proxy:2375')
container_found_and_removed = False
# Attempt 1: Get container ID from XCom using simplified key
container_id_xcom = context['task_instance'].xcom_pull(task_ids='prepare_and_deploy', key='container_id')
if container_id_xcom:
logging.info(f"Attempting to remove container using XCom ID: {container_id_xcom}")
try:
container = client.containers.get(container_id_xcom)
logging.info(f"Found container {container.id} (Name: {container.name}). Removing...")
container.remove(force=True)
logging.info(f"Successfully removed container {container.id}")
container_found_and_removed = True
except docker.errors.NotFound:
logging.warning(f"Container with XCom ID {container_id_xcom} not found. Trying other methods.")
except Exception as e:
logging.error(f"Error removing container {container_id_xcom}: {e}")
# Attempt 2: Find container by labels if not found/removed via XCom ID
if not container_found_and_removed:
logging.info(f"Attempting to find and remove container by labels: service=ytdlp, account_id={account_id}")
try:
containers = client.containers.list(
filters={'label': [f'service=ytdlp', f'account_id={account_id}']},
all=True # Include stopped containers
)
if containers:
for container in containers:
logging.info(f"Found container {container.id} (Name: {container.name}) by labels. Removing...")
try:
container.remove(force=True)
logging.info(f"Successfully removed container {container.id}")
container_found_and_removed = True # Mark as found even if only one is removed
except Exception as e:
logging.error(f"Error removing container {container.id} found by labels: {e}")
else:
logging.info("No containers found matching labels.")
except Exception as e:
logging.error(f"Error searching for containers by labels: {e}")
# Attempt 3: Find container by name pattern if still not found/removed
if not container_found_and_removed:
container_name_pattern = f"ytdlp_service_{account_id}_*"
logging.info(f"Attempting to find and remove container by name pattern: {container_name_pattern}")
try:
containers = client.containers.list(filters={'name': container_name_pattern}, all=True)
if containers:
for container in containers:
logging.info(f"Found container {container.id} (Name: {container.name}) by name pattern. Removing...")
try:
container.remove(force=True)
logging.info(f"Successfully removed container {container.id}")
container_found_and_removed = True
except Exception as e:
logging.error(f"Error removing container {container.id} found by name: {e}")
else:
logging.info("No containers found matching name pattern.")
except Exception as e:
logging.error(f"Error searching for containers by name: {e}")
if not container_found_and_removed:
logging.warning(f"Could not find or remove any container for account {account_id} using ID, labels, or name.")
# Get context directory from XCom and remove it
context_dir = context['task_instance'].xcom_pull(task_ids='prepare_and_deploy', key='context_dir')
if context_dir and os.path.exists(context_dir):
shutil.rmtree(context_dir)
logging.info(f"Cleaned up working directory: {context_dir}")
except Exception as e:
logging.error(f"Error during cleanup: {e}")
raise
# Define the DAG
with DAG(
'ytdlp_service',
default_args=default_args,
description='Deploy YTDLP token service for ios, android, mweb',
schedule_interval=None,
start_date=days_ago(1), # Use dynamic start date for manually triggered DAG
catchup=False,
tags=['youtube', 'tokens', 'service', 'docker'],
# executor_config moved to default_args
is_paused_upon_creation=False,
params={
'account_id': Param(
'account_fr_2025-04-03T1220_anonomyous_2ssdfsf2342afga09',
type="string",
description="Required: The account ID for which the service is being deployed."
),
'proxy': Param(
'socks5://sslocal-rust-1084:1084',
type=["null", "string"],
description="Optional: The SOCKS5 proxy URL to use for the service (e.g., socks5://host:port)."
),
'clients': Param(
'ios,android,mweb',
type="string",
description="Comma-separated list of client types (e.g., ios,android,mweb)."
),
'redis_enabled': Param(
False,
type="boolean",
description="Use Redis for service discovery? If False, host/port must be provided or will be auto-assigned."
),
'host': Param(
None,
type=["null", "string"],
description="Optional: Host IP for the service. If redis_enabled=False and host is not provided, defaults to '0.0.0.0'. If redis_enabled=True and host is not provided, uses HOST_EXTERNAL_IP or defaults to '0.0.0.0'."
),
'port': Param(
None,
type=["null", "integer"],
description="Optional: Port for the service. If None, a free port will be assigned automatically. If redis_enabled=False and a port is provided, it will be used (after checking availability)."
),
# redis_host and redis_port parameters are removed.
# If redis_enabled=True, the DAG will use the 'redis_default' Airflow connection.
'docker_network': Param(
'airflow_prod_proxynet',
type="string",
description="Optional: The Docker network to attach the container to. Defaults to 'airflow_prod_proxynet'."
),
'exit_on_proxy_fail': Param(
True,
type="boolean",
description="Exit the service container immediately if the initial proxy test fails?"
),
}
) as dag:
# Task to prepare and deploy the service
prepare_and_deploy = PythonOperator(
task_id='prepare_and_deploy',
python_callable=prepare_and_deploy_service,
provide_context=True,
trigger_rule='all_success' # Keep default trigger rule for prepare_and_deploy
)
# Combined Health Check and Sentinel Task using PythonOperator
# This task runs for a long time, checking health periodically using the 'requests' library.
# If the health check fails repeatedly or times out, the task fails, triggering 'stop_service'.
monitor_service_health = PythonOperator(
task_id='monitor_service_health',
python_callable=check_service_health,
provide_context=True,
# Set execution timeout for the task itself (acts as the overall timeout)
execution_timeout=timedelta(days=365), # Long timeout (e.g., 1 year)
# op_kwargs can pass static config, but host/port come from XCom inside the function
# poke_interval and request timeout are handled within check_service_health
)
monitor_service_health.doc_md = """
### Monitor Service Health Task (PythonOperator)
Uses a Python function to periodically check the service's `/health` endpoint using the `requests` library.
Acts as both a health check and a sentinel for the running service.
- **Pulls from XCom:** Reads `service_host_registration`, `service_host`, and `service_health_port` from the `prepare_and_deploy` task to construct the target URL.
- **Polling:** Checks the `/health` endpoint every 60 seconds.
- **Timeout:** Uses the task's `execution_timeout` (set to 1 year) as the overall maximum duration. Individual requests have a 15-second timeout.
- **Failure:** If a health check request returns a 4xx/5xx status code or encounters other request errors, the task fails immediately. If the overall `execution_timeout` is reached without a failure, the task would eventually time out and fail.
"""
# Task to stop the service (runs if monitor_service_health fails)
stop = PythonOperator(
task_id='stop_service',
python_callable=stop_service,
provide_context=True,
trigger_rule=TriggerRule.ONE_FAILED # Run only if monitor_service_health fails
)
stop.doc_md = """
### Stop Service Task
Stops the Docker container associated with the service.
- **Trigger Rule:** `one_failed` - This task only runs if the upstream `monitor_service_health` task fails.
- Pulls container ID/name from XCom or finds it using labels/name patterns.
- Clears Redis entries if `redis_enabled=True`.
"""
# Marker task to indicate that the deployment failed
prepare_failed_marker = EmptyOperator(
task_id='prepare_failed_marker',
trigger_rule=TriggerRule.ONE_FAILED # Run only if 'prepare_and_deploy' fails
)
# Task to cleanup resources (runs after stop sequence OR if prepare fails)
cleanup = PythonOperator(
task_id='cleanup_service',
python_callable=cleanup_service,
provide_context=True,
trigger_rule=TriggerRule.ALL_DONE # Run after upstream (stop or prepare_failed_marker) is done
)
cleanup.doc_md = """
### Cleanup Service Task
Removes the Docker container and cleans up related resources.
- **Trigger Rule:** `all_done` - Runs after the `stop_service` task finishes, whether it succeeded or failed.
- Removes the container using ID from XCom, labels, or name patterns.
- Cleans up XCom variables.
- Removes the context directory.
"""
# Define task dependencies
# Success Path: prepare -> monitor (runs indefinitely)
# Monitor Failure Path: monitor (fails) -> stop -> cleanup
# Prepare Failure Path: prepare (fails) -> prepare_failed_marker -> cleanup
prepare_and_deploy >> monitor_service_health
prepare_and_deploy >> prepare_failed_marker # Trigger marker if prepare fails
monitor_service_health >> stop # Trigger stop if monitor fails
# Cleanup is triggered after stop finishes OR after prepare_failed_marker finishes
stop >> cleanup
prepare_failed_marker >> cleanup

View File

@ -9,7 +9,7 @@ services:
volumes: volumes:
- context-data:/app/context-data - context-data:/app/context-data
networks: networks:
- airflow_worker_proxynet - airflow_workers_prod_proxynet
command: command:
- "--script-dir" - "--script-dir"
- "/app/scripts" - "/app/scripts"
@ -21,6 +21,7 @@ services:
- "ios,android,mweb" - "ios,android,mweb"
- "--proxy" - "--proxy"
- "socks5://sslocal-rust-1084:1084" - "socks5://sslocal-rust-1084:1084"
- "--probe"
restart: unless-stopped restart: unless-stopped
pull_policy: always pull_policy: always
@ -29,5 +30,5 @@ volumes:
name: context-data name: context-data
networks: networks:
airflow_worker_proxynet: airflow_workers_prod_proxynet:
external: true external: true