# Source-viewer metadata (not part of the program): 87 lines, 3.7 KiB, Python
import logging
|
|
import time
|
|
import requests
|
|
from datetime import datetime
|
|
|
|
from airflow.decorators import task
|
|
from airflow.models.dag import DAG
|
|
from airflow.models.param import Param
|
|
from airflow.models.variable import Variable
|
|
|
|
# Module-level logger named after this module; used by the health-check task.
logger = logging.getLogger(__name__)

# Default master host IP, read from the Airflow Variable "MASTER_HOST_IP"
# (which is set via the .env file). This lets the default health-check target
# follow cluster.yml instead of being hard-coded. Falls back to 127.0.0.1 when
# the Variable is not defined.
# NOTE(review): this lookup sits at module top level, so it is evaluated every
# time this file is imported/parsed, not only when the DAG runs.
DEFAULT_MASTER_IP = Variable.get("MASTER_HOST_IP", default_var="127.0.0.1")

# Markdown shown as the DAG's documentation. Kept at column 0 so the rendered
# Markdown is not affected by source indentation.
_PROXY_HEALTH_CHECK_DOC_MD = """
### Proxy Health Check DAG

This DAG runs a continuous loop to check a target URL through a SOCKS5 proxy.
It is designed for monitoring proxy connectivity and performance. Once triggered, it will run forever
until the DAG run is manually stopped.

**Parameters:**
- `target_url`: The URL to check. Defaults to the internal nginx service.
- `socks5_host`: The SOCKS5 proxy host. For Docker, `host.docker.internal` often works to target the host machine.
- `socks5_port`: The SOCKS5 proxy port.
- `check_interval_seconds`: How often to run the check.
- `latency_threshold_seconds`: A warning will be logged if the request takes longer than this.
- `timeout_seconds`: The timeout for the web request.
"""

# Manually triggered DAG (schedule=None, catchup=False) with a single
# long-running task that probes a URL through a SOCKS5 proxy.
with DAG(
    dag_id='proxy_health_check',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['monitoring', 'proxy'],
    doc_md=_PROXY_HEALTH_CHECK_DOC_MD,
    params={
        'target_url': Param(f'http://{DEFAULT_MASTER_IP}:8888', type='string', description="The URL to check. Defaults to the master node's nginx healthcheck service."),
        'socks5_host': Param('sslocal-rust-1087', type='string', description="SOCKS5 proxy host. Use 'host.docker.internal' for Docker host."),
        'socks5_port': Param(1087, type='integer', description="SOCKS5 proxy port."),
        'check_interval_seconds': Param(25, type='integer', description="Seconds to wait between checks."),
        'latency_threshold_seconds': Param(2, type='integer', description="Log a warning if latency exceeds this threshold."),
        'timeout_seconds': Param(10, type='integer', description="Request timeout in seconds."),
    },
) as dag:

    @task
    def run_proxy_check_loop(**context):
        """Check a URL through a SOCKS5 proxy in an endless loop.

        Each iteration issues one GET request to ``target_url`` via the
        proxy, logs a warning when a successful request took longer than
        the latency threshold, logs an error when the request failed, and
        then sleeps for the configured interval.

        The loop has no exit condition: the task keeps running until the
        DAG run is stopped manually or an exception other than
        ``requests.exceptions.RequestException`` escapes.

        Args:
            **context: Airflow task context. ``context['params']`` must
                provide ``target_url``, ``socks5_host``, ``socks5_port``,
                ``check_interval_seconds``, ``latency_threshold_seconds``
                and ``timeout_seconds``.
        """
        params = context['params']
        target_url = params['target_url']
        proxy_host = params['socks5_host']
        proxy_port = params['socks5_port']
        interval = params['check_interval_seconds']
        threshold = params['latency_threshold_seconds']
        timeout = params['timeout_seconds']

        # The socks5h scheme asks the proxy to resolve hostnames, so DNS
        # lookups go through the proxy as well.
        proxy_url = f"socks5h://{proxy_host}:{proxy_port}"
        proxies = {
            'http': proxy_url,
            'https': proxy_url,
        }

        logger.info(
            "Starting proxy health check loop. Target: %s, Proxy: %s, Interval: %ss, Threshold: %ss",
            target_url,
            proxy_url,
            interval,
            threshold,
        )

        while True:
            # Use a monotonic clock: wall-clock time (time.time) can jump
            # when the system clock is adjusted, which would corrupt the
            # measured latency and trigger bogus warnings.
            start_time = time.monotonic()
            try:
                response = requests.get(target_url, proxies=proxies, timeout=timeout)
                # Treat HTTP 4xx/5xx responses as failed checks too.
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                latency = time.monotonic() - start_time
                logger.error(
                    "Proxy check failed for %s via %s. Latency: %.2fs. Error: %s",
                    target_url,
                    proxy_url,
                    latency,
                    e,
                )
            else:
                latency = time.monotonic() - start_time
                if latency > threshold:
                    logger.warning(
                        "High latency detected! Latency: %.2fs, Threshold: %ss, Target: %s",
                        latency,
                        threshold,
                        target_url,
                    )

            # The pause is added after each check, so the effective period
            # is the request duration plus the interval.
            time.sleep(interval)

    run_proxy_check_loop()