diff --git a/airflow/dags/ytdlp_mgmt_proxy_account.py b/airflow/dags/ytdlp_mgmt_proxy_account.py
index 63b5558..5175874 100644
--- a/airflow/dags/ytdlp_mgmt_proxy_account.py
+++ b/airflow/dags/ytdlp_mgmt_proxy_account.py
@@ -592,8 +592,20 @@ def manage_system_callable(**context):
         if not account_id: raise ValueError("An 'account_id' is required.")
         reason = f"Manual un-ban from Airflow mgmt DAG by {socket.gethostname()}"
         logger.info(f"Unbanning account '{account_id}'...")
+
+        # Fetch status to get current success count before unbanning
+        statuses = client.getAccountStatus(accountId=account_id, accountPrefix=None)
+        if not statuses:
+            raise AirflowException(f"Account '{account_id}' not found.")
+        current_success_count = statuses[0].successCount or 0
+
         client.unbanAccount(accountId=account_id, reason=reason)
         print(f"Successfully sent request to unban account '{account_id}'.")
+
+        # Set the success_count_at_activation to baseline the account
+        redis_client = _get_redis_client(params["redis_conn_id"])
+        redis_client.hset(f"account_status:{account_id}", "success_count_at_activation", current_success_count)
+        logger.info(f"Set 'success_count_at_activation' for '{account_id}' to {current_success_count}.")
     elif action == "unban_all":
         account_prefix = account_id  # Repurpose account_id param as an optional prefix
         logger.info(f"Unbanning all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...")
@@ -604,6 +616,9 @@ def manage_system_callable(**context):
             return
 
         accounts_to_unban = [s.accountId for s in all_statuses]
+        account_map = {s.accountId: s for s in all_statuses}
+        redis_client = _get_redis_client(params["redis_conn_id"])
+
         logger.info(f"Found {len(accounts_to_unban)} accounts to unban.")
         print(f"Found {len(accounts_to_unban)} accounts. Sending unban request for each...")
 
@@ -614,6 +629,12 @@ def manage_system_callable(**context):
                 reason = f"Manual unban_all from Airflow mgmt DAG by {socket.gethostname()}"
                 client.unbanAccount(accountId=acc_id, reason=reason)
                 logger.info(f" - Sent unban for '{acc_id}'.")
+
+                # Also set the success_count_at_activation to baseline the account
+                current_success_count = account_map[acc_id].successCount or 0
+                redis_client.hset(f"account_status:{acc_id}", "success_count_at_activation", current_success_count)
+                logger.info(f" - Set 'success_count_at_activation' for '{acc_id}' to {current_success_count}.")
+
                 unban_count += 1
             except Exception as e:
                 logger.error(f" - Failed to unban account '{acc_id}': {e}")
@@ -729,8 +750,20 @@ def manage_system_callable(**context):
         if not account_id: raise ValueError("An 'account_id' is required.")
         reason = f"Manual un-ban from Airflow mgmt DAG by {socket.gethostname()}"
         logger.info(f"Unbanning account '{account_id}'...")
-        client.unbanAccount(accountId=account_id, reason=reason)
-        print(f"Successfully sent request to unban account '{account_id}'.")
+
+        # Fetch status to get current success count before unbanning
+        statuses = client.getAccountStatus(accountId=account_id, accountPrefix=None)
+        if not statuses:
+            logger.warning(f"Account '{account_id}' not found. Skipping account unban.")
+        else:
+            current_success_count = statuses[0].successCount or 0
+            client.unbanAccount(accountId=account_id, reason=reason)
+            print(f"Successfully sent request to unban account '{account_id}'.")
+
+            # Set the success_count_at_activation to baseline the account
+            redis_client = _get_redis_client(params["redis_conn_id"])
+            redis_client.hset(f"account_status:{account_id}", "success_count_at_activation", current_success_count)
+            logger.info(f"Set 'success_count_at_activation' for '{account_id}' to {current_success_count}.")
     elif action == "unban_all":
         account_prefix = account_id  # Repurpose account_id param as an optional prefix
         logger.info(f"Unbanning all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...")
@@ -740,6 +773,9 @@ def manage_system_callable(**context):
             print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to unban.")
         else:
             accounts_to_unban = [s.accountId for s in all_statuses]
+            account_map = {s.accountId: s for s in all_statuses}
+            redis_client = _get_redis_client(params["redis_conn_id"])
+
             logger.info(f"Found {len(accounts_to_unban)} accounts to unban.")
             print(f"Found {len(accounts_to_unban)} accounts. Sending unban request for each...")
 
@@ -750,6 +786,12 @@ def manage_system_callable(**context):
                     reason = f"Manual unban_all from Airflow mgmt DAG by {socket.gethostname()}"
                     client.unbanAccount(accountId=acc_id, reason=reason)
                     logger.info(f" - Sent unban for '{acc_id}'.")
+
+                    # Also set the success_count_at_activation to baseline the account
+                    current_success_count = account_map[acc_id].successCount or 0
+                    redis_client.hset(f"account_status:{acc_id}", "success_count_at_activation", current_success_count)
+                    logger.info(f" - Set 'success_count_at_activation' for '{acc_id}' to {current_success_count}.")
+
                     unban_count += 1
                 except Exception as e:
                     logger.error(f" - Failed to unban account '{acc_id}': {e}")
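Note on the baseline pattern above: the un-ban handlers record the account's success count at the moment of activation so that later checks can measure progress made since the un-ban. A minimal sketch of how a consumer might read that baseline back, assuming a redis-py client and the key/field names from the diff; the helper name and its arguments are illustrative, not part of the DAG:

    import redis

    def successes_since_activation(r: redis.Redis, account_id: str, total_success_count: int) -> int:
        # Baseline written by the un-ban handlers above; a missing field means never baselined.
        raw = r.hget(f"account_status:{account_id}", "success_count_at_activation")
        baseline = int(raw) if raw is not None else 0
        return max(total_success_count - baseline, 0)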
Skipping account unban.") + else: + current_success_count = statuses[0].successCount or 0 + client.unbanAccount(accountId=account_id, reason=reason) + print(f"Successfully sent request to unban account '{account_id}'.") + + # Set the success_count_at_activation to baseline the account + redis_client = _get_redis_client(params["redis_conn_id"]) + redis_client.hset(f"account_status:{account_id}", "success_count_at_activation", current_success_count) + logger.info(f"Set 'success_count_at_activation' for '{account_id}' to {current_success_count}.") elif action == "unban_all": account_prefix = account_id # Repurpose account_id param as an optional prefix logger.info(f"Unbanning all account statuses to ACTIVE (prefix: '{account_prefix or 'ALL'}')...") @@ -740,6 +773,9 @@ def manage_system_callable(**context): print(f"No accounts found with prefix '{account_prefix or 'ALL'}' to unban.") else: accounts_to_unban = [s.accountId for s in all_statuses] + account_map = {s.accountId: s for s in all_statuses} + redis_client = _get_redis_client(params["redis_conn_id"]) + logger.info(f"Found {len(accounts_to_unban)} accounts to unban.") print(f"Found {len(accounts_to_unban)} accounts. Sending unban request for each...") @@ -750,6 +786,12 @@ def manage_system_callable(**context): reason = f"Manual unban_all from Airflow mgmt DAG by {socket.gethostname()}" client.unbanAccount(accountId=acc_id, reason=reason) logger.info(f" - Sent unban for '{acc_id}'.") + + # Also set the success_count_at_activation to baseline the account + current_success_count = account_map[acc_id].successCount or 0 + redis_client.hset(f"account_status:{acc_id}", "success_count_at_activation", current_success_count) + logger.info(f" - Set 'success_count_at_activation' for '{acc_id}' to {current_success_count}.") + unban_count += 1 except Exception as e: logger.error(f" - Failed to unban account '{acc_id}': {e}") diff --git a/airflow/dags/ytdlp_mgmt_queues.py b/airflow/dags/ytdlp_mgmt_queues.py index fa8a26e..73fe2db 100644 --- a/airflow/dags/ytdlp_mgmt_queues.py +++ b/airflow/dags/ytdlp_mgmt_queues.py @@ -19,6 +19,7 @@ from airflow.models.param import Param from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.empty import EmptyOperator from airflow.operators.bash import BashOperator +from airflow.providers.celery.executors.celery_executor import app as celery_app from airflow.providers.redis.hooks.redis import RedisHook from airflow.utils.dates import days_ago from airflow.models.variable import Variable @@ -535,6 +536,45 @@ def requeue_failed_callable(**context): raise AirflowException(f"Failed to requeue failed URLs: {e}") +def purge_celery_queue_callable(**context): + """ + Purges messages from the specified Celery queues using the Airflow Celery app. + This is more reliable than shelling out to `celery purge` as it uses the same + app context and broker connection as the workers. + """ + params = context['params'] + if not params.get('confirm_purge'): + raise AirflowException("'Confirm Purge' is not checked. 
Aborting to prevent accidental data loss.") + + queues_to_purge_str = params.get('celery_queue_to_purge') + if not queues_to_purge_str: + raise AirflowException("No Celery queues specified to purge.") + + queues = [q.strip() for q in queues_to_purge_str.split(',') if q.strip()] + + logger.info(f"Attempting to purge {len(queues)} Celery queue(s): {queues}") + logger.info(f"Using broker: {celery_app.conf.broker_url}") + + purged_counts = {} + with celery_app.connection_for_read() as conn: + with conn.channel() as channel: + for queue in queues: + try: + message_count = channel.queue_purge(queue) + purged_counts[queue] = message_count + logger.info(f"Purged {message_count} messages from queue '{queue}'.") + except Exception as e: + # This can happen if the queue doesn't exist on the broker. + # kombu might raise an operational error. + logger.error(f"Failed to purge queue '{queue}': {e}", exc_info=True) + purged_counts[queue] = f"ERROR: {e}" + + logger.info("--- Celery Purge Summary ---") + for queue, result in purged_counts.items(): + logger.info(f" - {queue}: {result}") + logger.info("--- Purge complete. ---") + + def add_videos_to_queue_callable(**context): """ Parses video inputs from manual text, a predefined file, or a file path/URL, @@ -821,32 +861,9 @@ with DAG( """, ) - action_purge_celery_queue = BashOperator( + action_purge_celery_queue = PythonOperator( task_id="action_purge_celery_queue", - bash_command=""" - if [ "{{ params.confirm_purge }}" != "True" ]; then - echo "ERROR: 'Confirm Purge' is not checked. Aborting to prevent accidental data loss." - exit 1 - fi - - BROKER_URL=$(airflow config get-value celery broker_url) - QUEUES_TO_PURGE="{{ params.celery_queue_to_purge }}" - - if [ -z "$QUEUES_TO_PURGE" ]; then - echo "ERROR: No Celery queues specified to purge." - exit 1 - fi - - echo "--- Purging Celery Queues (Broker: $BROKER_URL) ---" - - # Use tr to convert comma-separated string to space-separated for the loop - for Q in $(echo $QUEUES_TO_PURGE | tr ',' ' '); do - echo "Purging queue: $Q" - celery -A airflow.providers.celery.executors.celery_executor.app -b "$BROKER_URL" purge -f -Q "$Q" - done - - echo "--- Purge command sent for all specified queues. ---" - """, + python_callable=purge_celery_queue_callable, ) # --- Wire up tasks --- diff --git a/airflow/dags/ytdlp_ops_account_maintenance.py b/airflow/dags/ytdlp_ops_account_maintenance.py index 3d8b14a..0ae7b52 100644 --- a/airflow/dags/ytdlp_ops_account_maintenance.py +++ b/airflow/dags/ytdlp_ops_account_maintenance.py @@ -147,10 +147,17 @@ def manage_account_states(**context): logger.info("--- Step 3: Processing un-bans ---") if accounts_to_unban: logger.info(f"Un-banning {len(accounts_to_unban)} accounts: {accounts_to_unban}") + account_map = {acc.accountId: acc for acc in all_accounts} for acc_id in accounts_to_unban: try: client.unbanAccount(acc_id, "Automatic un-ban by Airflow maintenance DAG.") logger.info(f"Successfully un-banned account '{acc_id}'.") + + # Set the activation count to baseline the account immediately after un-banning. 
+ key = f"account_status:{acc_id}" + current_success_count = account_map[acc_id].successCount or 0 + redis_client.hset(key, "success_count_at_activation", current_success_count) + logger.info(f"Set 'success_count_at_activation' for un-banned account '{acc_id}' to {current_success_count}.") except Exception as e: logger.error(f"Failed to un-ban account '{acc_id}': {e}") else: diff --git a/ansible/playbook-master.yml b/ansible/playbook-master.yml index 5056bd1..c13ba9d 100644 --- a/ansible/playbook-master.yml +++ b/ansible/playbook-master.yml @@ -25,7 +25,6 @@ - thrift - aria2p - PyYAML - - apache-airflow-providers-amazon state: present become: yes @@ -279,20 +278,23 @@ var: config_generator_result.stdout_lines when: config_generator_result.changed - - name: Start ytdlp-ops services on master - community.docker.docker_compose_v2: - project_src: "{{ airflow_master_dir }}" - files: - - configs/docker-compose-ytdlp-ops.yaml - state: present - remove_orphans: true - pull: "{{ 'never' if fast_deploy | default(false) else 'missing' }}" - roles: - ytdlp-master - airflow-master post_tasks: + - name: Wait for airflow-scheduler to be running before proceeding + ansible.builtin.command: docker compose ps --filter "status=running" --services + args: + chdir: "{{ airflow_master_dir }}" + register: running_services + until: "'airflow-scheduler' in running_services.stdout_lines" + retries: 30 + delay: 10 + changed_when: false + become: yes + become_user: "{{ ansible_user }}" + - name: Delete existing Airflow redis_default connection to ensure an idempotent update ansible.builtin.command: > docker compose exec -T airflow-scheduler diff --git a/ansible/playbook-sync-local.yml b/ansible/playbook-sync-local.yml index 3492053..472a4d2 100644 --- a/ansible/playbook-sync-local.yml +++ b/ansible/playbook-sync-local.yml @@ -102,13 +102,7 @@ dest: "{{ airflow_worker_dir }}/{{ item }}" perms: yes loop: - - "README.client.md" - "cli.config" - - "format_download.py" - - "get_info_json_client.py" - - "list_formats.py" - - "stress_test_formats.py" - - "stress_enhanced.py" - "package_client.py" - "bin/ytops-client" become: yes diff --git a/ansible/playbook-update-s3-vars.yml b/ansible/playbook-update-s3-vars.yml index c7bddd6..da20807 100644 --- a/ansible/playbook-update-s3-vars.yml +++ b/ansible/playbook-update-s3-vars.yml @@ -1,9 +1,10 @@ --- -- name: Update S3 Delivery Airflow Connection +- name: Update S3 Connection Variable hosts: airflow_master + gather_facts: no vars_files: - - "{{ inventory_dir }}/group_vars/all/vault.yml" - "{{ inventory_dir }}/group_vars/all/generated_vars.yml" + - "{{ inventory_dir }}/group_vars/all/vault.yml" tasks: - name: Delete existing s3_delivery_connection to ensure an idempotent update ansible.builtin.command: > diff --git a/ansible/playbooks/restart_worker.yml b/ansible/playbooks/restart_worker.yml index 5df5d65..4d1e03b 100644 --- a/ansible/playbooks/restart_worker.yml +++ b/ansible/playbooks/restart_worker.yml @@ -17,12 +17,6 @@ mode: '0755' become: yes - - name: "Copy get_info_json_client.py to worker" - ansible.builtin.copy: - src: ../../get_info_json_client.py - dest: "{{ project_dir }}/get_info_json_client.py" - mode: '0755' - become: yes - name: "Pull the latest image for the ytdlp-ops service" community.docker.docker_image: diff --git a/ansible/roles/airflow-master/tasks/main.yml b/ansible/roles/airflow-master/tasks/main.yml index 98afd73..f991224 100644 --- a/ansible/roles/airflow-master/tasks/main.yml +++ b/ansible/roles/airflow-master/tasks/main.yml @@ -81,7 +81,6 @@ - 
"thrift_model" - "VERSION" - "airflow/update-yt-dlp.sh" - - "get_info_json_client.py" - "proxy_manager_client.py" - "utils" @@ -328,6 +327,7 @@ project_src: "{{ airflow_master_dir }}" files: - "configs/docker-compose-master.yaml" + - "configs/docker-compose-ytdlp-ops.yaml" state: present remove_orphans: true pull: "{{ 'never' if fast_deploy | default(false) else 'missing' }}" diff --git a/ansible/roles/airflow-worker/tasks/main.yml b/ansible/roles/airflow-worker/tasks/main.yml index 020596a..9e7904c 100644 --- a/ansible/roles/airflow-worker/tasks/main.yml +++ b/ansible/roles/airflow-worker/tasks/main.yml @@ -71,7 +71,6 @@ - "thrift_model" - "VERSION" - "airflow/update-yt-dlp.sh" - - "get_info_json_client.py" - "proxy_manager_client.py" - "utils"