# -*- coding: utf-8 -*-
#
# Copyright © 2024 rl
#
# Distributed under terms of the MIT license.
"""
Maintenance DAG for managing the lifecycle of ytdlp-ops accounts.

This DAG is responsible for:
- Un-banning accounts whose ban duration has expired.
- Transitioning accounts from RESTING to ACTIVE after their cooldown period.
- Transitioning accounts from ACTIVE to RESTING once they reach their successful-request limit.

This logic was previously handled inside the ytdlp-ops-server and has been moved
here to give the orchestrator full control over account state.
"""
from __future__ import annotations

import logging
import time
from datetime import datetime

from airflow.decorators import task
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.utils.dates import days_ago

# Import utility functions and Thrift modules
from utils.redis_utils import _get_redis_client
from pangramia.yt.tokens_ops import YTTokenOpService
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

# Configure logging
logger = logging.getLogger(__name__)

# Default settings from Airflow Variables or hardcoded fallbacks
DEFAULT_REDIS_CONN_ID = 'redis_default'
DEFAULT_YT_AUTH_SERVICE_IP = Variable.get("YT_AUTH_SERVICE_IP", default_var="172.17.0.1")
DEFAULT_YT_AUTH_SERVICE_PORT = Variable.get("YT_AUTH_SERVICE_PORT", default_var=9980)

DEFAULT_ARGS = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': 30,  # interpreted as seconds; Airflow coerces numeric values to a timedelta
    'queue': 'default',
}


# --- Helper Functions ---

def _get_thrift_client(host, port, timeout=60):
    """Helper to create and connect a Thrift client."""
    transport = TSocket.TSocket(host, port)
    transport.setTimeout(timeout * 1000)
    transport = TTransport.TFramedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = YTTokenOpService.Client(protocol)
    transport.open()
    logger.info(f"Connected to Thrift server at {host}:{port}")
    return client, transport


@task
def manage_account_states(**context):
    """
    Fetches all account statuses and performs necessary state transitions
    based on time durations configured in the DAG parameters.
    """
    params = context['params']
    requests_limit = params['account_requests_limit']
    cooldown_duration_s = params['account_cooldown_duration_min'] * 60
    ban_duration_s = params['account_ban_duration_hours'] * 3600

    host = DEFAULT_YT_AUTH_SERVICE_IP
    port = int(DEFAULT_YT_AUTH_SERVICE_PORT)
    redis_conn_id = DEFAULT_REDIS_CONN_ID

    logger.info(f"Starting account maintenance. Service: {host}:{port}, Redis: {redis_conn_id}")
    logger.info(f"Using limits: Requests={requests_limit}, Cooldown={params['account_cooldown_duration_min']}m, Ban={params['account_ban_duration_hours']}h")

    client, transport = None, None
    try:
        client, transport = _get_thrift_client(host, port)
        redis_client = _get_redis_client(redis_conn_id)

        logger.info(f"--- Step 1: Fetching all account statuses from the ytdlp-ops-server at {host}:{port}... ---")
        all_accounts = client.getAccountStatus(accountId=None, accountPrefix=None)
        logger.info(f"Found {len(all_accounts)} total accounts to process.")

        accounts_to_unban = []
        accounts_to_activate = []
        accounts_to_rest = []
        now_ts = int(time.time())

        for acc in all_accounts:
            # Thrift can return 0 for unset integer fields.
            # The AccountStatus thrift object is missing status_changed_timestamp and active_since_timestamp.
            # We use available timestamps as proxies.
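            # NOTE (assumption): the `account_status:<accountId>` Redis hashes are owned by
            # ytdlp-ops-server. This DAG only reads `success_count_at_activation` and writes
            # `status`, `status_changed_timestamp`, `active_since_timestamp`, and
            # `success_count_at_activation`; the field names are assumed to match the
            # server's schema.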
            last_failure_ts = int(acc.lastFailureTimestamp or 0)
            last_success_ts = int(acc.lastSuccessTimestamp or 0)
            last_usage_ts = max(last_failure_ts, last_success_ts)

            if acc.status == "BANNED" and last_failure_ts > 0 and (now_ts - last_failure_ts) >= ban_duration_s:
                accounts_to_unban.append(acc.accountId)
            elif acc.status == "RESTING" and last_usage_ts > 0 and (now_ts - last_usage_ts) >= cooldown_duration_s:
                accounts_to_activate.append(acc.accountId)
            elif acc.status == "ACTIVE":
                # For ACTIVE -> RESTING, check how many requests have been made since activation.
                count_at_activation_raw = redis_client.hget(f"account_status:{acc.accountId}", "success_count_at_activation")
                if count_at_activation_raw is not None:
                    count_at_activation = int(count_at_activation_raw)
                    current_success_count = acc.successCount or 0
                    requests_made = current_success_count - count_at_activation
                    if requests_made >= requests_limit:
                        logger.info(f"Account {acc.accountId} reached request limit ({requests_made}/{requests_limit}). Moving to RESTING.")
                        accounts_to_rest.append(acc.accountId)
                else:
                    # This is a fallback for accounts that were activated before this logic was deployed.
                    # We can activate them "fresh" by setting their baseline count now.
                    logger.info(f"Account {acc.accountId} is ACTIVE but has no 'success_count_at_activation'. Setting it now.")
                    redis_client.hset(f"account_status:{acc.accountId}", "success_count_at_activation", acc.successCount or 0)

        logger.info("--- Step 2: Analyzing accounts for state transitions ---")
        logger.info(f"Found {len(accounts_to_unban)} accounts with expired bans to un-ban.")
        logger.info(f"Found {len(accounts_to_activate)} accounts with expired rest periods to activate.")
        logger.info(f"Found {len(accounts_to_rest)} accounts that reached their request limit to put to rest.")

        # --- Perform State Transitions ---

        # 1. Un-ban accounts via Thrift call
        logger.info("--- Step 3: Processing un-bans ---")
        if accounts_to_unban:
            logger.info(f"Un-banning {len(accounts_to_unban)} accounts: {accounts_to_unban}")
            for acc_id in accounts_to_unban:
                try:
                    client.unbanAccount(acc_id, "Automatic un-ban by Airflow maintenance DAG.")
                    logger.info(f"Successfully un-banned account '{acc_id}'.")
                except Exception as e:
                    logger.error(f"Failed to un-ban account '{acc_id}': {e}")
        else:
            logger.info("No accounts to un-ban.")

        # 2. Activate resting accounts via direct Redis write
        logger.info("--- Step 4: Processing activations ---")
        if accounts_to_activate:
            logger.info(f"Activating {len(accounts_to_activate)} accounts: {accounts_to_activate}")
            now_ts = int(time.time())
            account_map = {acc.accountId: acc for acc in all_accounts}
            with redis_client.pipeline() as pipe:
                for acc_id in accounts_to_activate:
                    key = f"account_status:{acc_id}"
                    current_success_count = account_map[acc_id].successCount or 0
                    pipe.hset(key, "status", "ACTIVE")
                    pipe.hset(key, "active_since_timestamp", now_ts)
                    pipe.hset(key, "status_changed_timestamp", now_ts)
                    pipe.hset(key, "success_count_at_activation", current_success_count)
                pipe.execute()
            logger.info("Finished activating accounts.")
        else:
            logger.info("No accounts to activate.")
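        # Note on atomicity (assumption): `_get_redis_client` is expected to return a standard
        # redis-py client, whose pipeline() defaults to transaction=True (MULTI/EXEC), so each
        # batch of HSET/HDEL calls above and below is applied atomically. Verify this holds for
        # the client actually returned by utils.redis_utils.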
        # 3. Rest active accounts via direct Redis write
        logger.info("--- Step 5: Processing rests ---")
        if accounts_to_rest:
            logger.info(f"Putting {len(accounts_to_rest)} accounts to rest: {accounts_to_rest}")
            now_ts = int(time.time())
            with redis_client.pipeline() as pipe:
                for acc_id in accounts_to_rest:
                    key = f"account_status:{acc_id}"
                    pipe.hset(key, "status", "RESTING")
                    pipe.hset(key, "status_changed_timestamp", now_ts)
                    pipe.hdel(key, "success_count_at_activation")
                pipe.execute()
            logger.info("Finished putting accounts to rest.")
        else:
            logger.info("No accounts to put to rest.")

        logger.info("--- Account maintenance run complete. ---")

    finally:
        if transport and transport.isOpen():
            transport.close()


with DAG(
    dag_id='ytdlp_ops_account_maintenance',
    default_args=DEFAULT_ARGS,
    schedule='*/5 * * * *',  # Run every 5 minutes
    start_date=days_ago(1),
    catchup=False,
    tags=['ytdlp', 'maintenance'],
    doc_md="""
### YT-DLP Account Maintenance: Time-Based State Transitions

This DAG is the central authority for automated, **time-based** state management for
ytdlp-ops accounts. It runs periodically to fetch the status of all accounts and applies
its own logic to determine whether an account's state should change, based on configurable
thresholds.

The thresholds are defined as DAG parameters and can be configured via the Airflow UI:
- **Requests Limit**: How many successful requests an account can perform before it needs to rest.
- **Cooldown Duration**: How long an account must rest before it can be used again.
- **Ban Duration**: How long a ban lasts before the account is automatically un-banned.

---

#### Separation of Concerns: Time vs. Errors

It is critical to understand that this DAG primarily handles time-based state changes.
Error-based banning may be handled by worker DAGs during URL processing. This separation
ensures that maintenance is predictable and driven by timers, while acute, error-driven
actions are handled immediately by the workers that encounter them.

---

#### State Transitions Performed by This DAG

On each run, this DAG fetches the raw status and timestamps for all accounts and performs
the following checks:

1. **Un-banning (`BANNED` -> `ACTIVE`)**:
   - **Condition**: An account has been in the `BANNED` state for longer than the configured `account_ban_duration_hours`.
   - **Action**: The DAG calls the `unbanAccount` service endpoint to lift the ban.

2. **Activation (`RESTING` -> `ACTIVE`)**:
   - **Condition**: An account has been in the `RESTING` state for longer than the configured `account_cooldown_duration_min`.
   - **Action**: The DAG updates the account's status to `ACTIVE` directly in Redis.

3. **Resting (`ACTIVE` -> `RESTING`)**:
   - **Condition**: An account has performed at least `account_requests_limit` successful requests since it was last activated.
   - **Action**: The DAG updates the account's status to `RESTING` directly in Redis.

This process gives full control over time-based account lifecycle management to the Airflow orchestrator.
""",
    params={
        'account_requests_limit': Param(250, type="integer", description="Number of successful requests an account can make before it is rested."),
        'account_cooldown_duration_min': Param(60, type="integer", description="Duration in minutes an account must rest before being activated again. Default is 1 hour."),
        'account_ban_duration_hours': Param(24, type="integer", description="Duration in hours an account stays banned before it can be un-banned."),
    },
) as dag:
    manage_account_states()
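
# Optional local-debug entry point (a sketch, not part of the scheduled DAG): DAG.test()
# is available in Airflow >= 2.5 and runs the DAG in-process. It is only useful here if
# the Redis connection and the Thrift service at YT_AUTH_SERVICE_IP:PORT are reachable
# from the local environment.
if __name__ == "__main__":
    dag.test()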