diff --git a/README.md b/README.md index 22f80b4..e69f731 100644 --- a/README.md +++ b/README.md @@ -14,21 +14,22 @@ Generate the inventory and configuration files from your cluster definition: ```bash ./tools/generate-inventory.py cluster.test.yml -cd ansible ``` +**Note:** All Ansible commands should be run from the project root directory. + ## Full Deployment ### Deploy entire cluster with proxies (recommended for new setups): ```bash -ansible-playbook playbook-full-with-proxies.yml +ansible-playbook ansible/playbook-full-with-proxies.yml ``` ### Deploy cluster without proxies: ```bash -ansible-playbook playbook-full.yml +ansible-playbook ansible/playbook-full.yml ``` ## Targeted Deployments @@ -36,13 +37,13 @@ ansible-playbook playbook-full.yml ### Deploy only to master node: ```bash -ansible-playbook playbook-master.yml --limit="af-test" +ansible-playbook ansible/playbook-master.yml --limit="af-test" ``` ### Deploy only to worker nodes: ```bash -ansible-playbook playbook-worker.yml +ansible-playbook ansible/playbook-worker.yml ``` ## Deploy Specific Steps @@ -50,7 +51,7 @@ ansible-playbook playbook-worker.yml To start at a specific task (useful for debugging or partial deployments): ```bash -ansible-playbook playbook-master.yml --limit="af-test" --start-at-task="Prepare Caddy asset extraction directory" +ansible-playbook ansible/playbook-master.yml --limit="af-test" --start-at-task="Prepare Caddy asset extraction directory" ``` ## Debug Deployments @@ -58,7 +59,7 @@ ansible-playbook playbook-master.yml --limit="af-test" --start-at-task="Prepare Run with dry-run and verbose output for debugging: ```bash -ansible-playbook playbook-full.yml --check --diff -vv +ansible-playbook ansible/playbook-full.yml --check --diff -vv ``` ## DAGs Only Deployment @@ -66,7 +67,7 @@ ansible-playbook playbook-full.yml --check --diff -vv To update only DAG files and configurations: ```bash -ansible-playbook playbook-dags.yml +ansible-playbook ansible/playbook-dags.yml ``` ## Vault Management diff --git a/airflow/Dockerfile b/airflow/Dockerfile index 9a48fc6..5242560 100644 --- a/airflow/Dockerfile +++ b/airflow/Dockerfile @@ -14,7 +14,11 @@ RUN apt-get update && \ python3-dev \ wget \ tar \ - xz-utils && \ + xz-utils \ + iputils-ping \ + curl \ + traceroute \ + tcpdump && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* /usr/share/man /usr/share/doc /usr/share/doc-base diff --git a/airflow/config/custom_task_hooks.py b/airflow/config/custom_task_hooks.py index 07cc1b2..fdb8176 100644 --- a/airflow/config/custom_task_hooks.py +++ b/airflow/config/custom_task_hooks.py @@ -1,20 +1,24 @@ # Version: 2025-08-20-02 # This file contains custom hooks for the Airflow environment. from airflow import settings +from airflow.models.dagrun import DagRun +from airflow.utils.session import provide_session -def task_instance_mutation_hook(ti): + +@provide_session +def task_instance_mutation_hook(ti, session=None): if ti.dag_id == 'ytdlp_ops_worker_per_url': - # Safely access dag_run and conf. The ti.dag_run attribute may not be populated - # when the hook is called during TaskInstance creation. - dag_run = getattr(ti, 'dag_run', None) - conf = getattr(dag_run, 'conf', {}) if dag_run else {} + # Query the DagRun from the DB using run_id to reliably get the conf. + # The ti.dag_run attribute is not always populated when the hook is called. + dag_run = session.query(DagRun).filter(DagRun.run_id == ti.run_id).first() + conf = dag_run.conf if dag_run else {} worker_queue = conf.get('worker_queue') if worker_queue: - print(f"Mutating queue for task {ti.task_id} to {worker_queue} based on dag_run.conf") + print(f"MUTATION HOOK: For dag '{ti.dag_id}', pinning task '{ti.task_id}' (run_id: {ti.run_id}) to queue '{worker_queue}'.") ti.queue = worker_queue else: - print(f"No worker_queue in conf for {ti.dag_id}. Falling back to 'queue-dl'") + print(f"MUTATION HOOK: For dag '{ti.dag_id}', no 'worker_queue' in conf for run_id '{ti.run_id}'. Falling back to 'queue-dl'.") ti.queue = 'queue-dl' # Register the hook only in appropriate contexts diff --git a/airflow/config/minio_default_conn.json.j2 b/airflow/config/minio_default_conn.json.j2 index 65e72ef..24be986 100644 --- a/airflow/config/minio_default_conn.json.j2 +++ b/airflow/config/minio_default_conn.json.j2 @@ -4,9 +4,9 @@ "host": "{{ hostvars[groups['airflow_master'][0]].ansible_host }}", "login": "admin", "password": "0153093693-0009", - "port": 9000, + "port": 80, "extra": { - "endpoint_url": "http://{{ hostvars[groups['airflow_master'][0]].ansible_host }}:9000", + "endpoint_url": "http://{{ hostvars[groups['airflow_master'][0]].ansible_host }}:80", "region_name": "us-east-1", "aws_access_key_id": "admin", "aws_secret_access_key": "0153093693-0009", diff --git a/airflow/configs/docker-compose-dl.yaml.j2 b/airflow/configs/docker-compose-dl.yaml.j2 index 41c3633..75e714a 100644 --- a/airflow/configs/docker-compose-dl.yaml.j2 +++ b/airflow/configs/docker-compose-dl.yaml.j2 @@ -19,7 +19,7 @@ x-airflow-common: # This section is auto-generated by Ansible from the inventory. extra_hosts: {% for host in groups['all'] %} - - "{{ hostvars[host]['inventory_hostname'] }}:{{ hostvars[host]['ansible_host'] }}" + - "{{ hostvars[host]['inventory_hostname'] }}:{{ hostvars[host]['ansible_host'] | default(hostvars[host]['inventory_hostname']) }}" {% endfor %} env_file: # The .env file is located in the project root (e.g., /srv/airflow_dl_worker), diff --git a/airflow/configs/docker-compose-master.yaml.j2 b/airflow/configs/docker-compose-master.yaml.j2 index 4954bc6..22e7ec0 100644 --- a/airflow/configs/docker-compose-master.yaml.j2 +++ b/airflow/configs/docker-compose-master.yaml.j2 @@ -57,7 +57,7 @@ x-airflow-common: # This section is auto-generated by Ansible from the inventory. extra_hosts: {% for host in groups['all'] %} - - "{{ hostvars[host]['inventory_hostname'] }}:{{ hostvars[host]['ansible_host'] }}" + - "{{ hostvars[host]['inventory_hostname'] }}:{{ hostvars[host]['ansible_host'] | default(hostvars[host]['inventory_hostname']) }}" {% endfor %} env_file: # The .env file is located in the project root, one level above the 'configs' directory. @@ -237,14 +237,15 @@ services: networks: - proxynet ports: - - "9000:9000" + - "80:80" + - "81:81" volumes: - ./configs/nginx.conf:/etc/nginx/nginx.conf:ro depends_on: minio: condition: service_healthy healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + test: ["CMD", "curl", "-f", "http://localhost:80/minio/health/live"] interval: 30s timeout: 10s retries: 5 @@ -261,7 +262,7 @@ services: entrypoint: > /bin/sh -c " set -e; - /usr/bin/mc alias set minio http://nginx-minio-lb:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD; + /usr/bin/mc alias set minio http://nginx-minio-lb:80 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD; # Retry loop for bucket creation MAX_ATTEMPTS=10 SUCCESS=false diff --git a/airflow/configs/docker-compose-ytdlp-ops.yaml.j2 b/airflow/configs/docker-compose-ytdlp-ops.yaml.j2 index 574d550..936653e 100644 --- a/airflow/configs/docker-compose-ytdlp-ops.yaml.j2 +++ b/airflow/configs/docker-compose-ytdlp-ops.yaml.j2 @@ -44,6 +44,7 @@ services: - ./.env # Path is relative to the project directory volumes: - context-data:/app/context-data + - ./logs/communication_logs:/app/communication_logs {% if service_role != 'management' %} # Mount the generated endpoints file to make it available to the server - ./configs/camoufox_endpoints.json:/app/config/camoufox_endpoints.json:ro diff --git a/airflow/configs/nginx.conf b/airflow/configs/nginx.conf index 64f4d7d..106e774 100644 --- a/airflow/configs/nginx.conf +++ b/airflow/configs/nginx.conf @@ -2,7 +2,7 @@ events { worker_connections 1024; } -stream { +http { upstream minio_servers { server minio:9000; } @@ -12,12 +12,24 @@ stream { } server { - listen 9000; - proxy_pass minio_servers; + listen 80; + location / { + proxy_pass http://minio_servers; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } } server { - listen 9001; - proxy_pass minio_console_servers; + listen 81; + location / { + proxy_pass http://minio_console_servers; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } } } diff --git a/airflow/dags/ORCHESTRATOR.ru.md b/airflow/dags/ORCHESTRATOR.ru.md deleted file mode 100644 index 72a203e..0000000 --- a/airflow/dags/ORCHESTRATOR.ru.md +++ /dev/null @@ -1,135 +0,0 @@ -# Архитектура и описание YTDLP Airflow DAGs - -Этот документ описывает архитектуру и назначение DAG'ов, используемых для скачивания видео с YouTube. Система построена на модели непрерывного, самоподдерживающегося цикла для параллельной и отказоустойчивой обработки. - -## Основной цикл обработки - -Обработка выполняется двумя основными DAG'ами, которые работают в паре: оркестратор и воркер. - -### `ytdlp_ops_orchestrator` (Система "зажигания") - -- **Назначение:** Этот DAG действует как "система зажигания" для запуска обработки. Он запускается вручную для старта указанного количества параллельных циклов-воркеров. -- **Принцип работы:** - - Он **не** обрабатывает URL-адреса самостоятельно. - - Его единственная задача — запустить сконфигурированное количество DAG'ов `ytdlp_ops_worker_per_url`. - - Он передает всю необходимую конфигурацию (пул аккаунтов, подключение к Redis и т.д.) воркерам. - -### `ytdlp_ops_worker_per_url` (Самоподдерживающийся воркер) - -- **Назначение:** Этот DAG обрабатывает один URL и спроектирован для работы в непрерывном цикле. -- **Принцип работы:** - 1. **Запуск:** Начальный запуск инициируется `ytdlp_ops_orchestrator`. - 2. **Получение задачи:** Воркер извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, выполнение воркера завершается, и его "линия" обработки останавливается. - 3. **Обработка:** Он взаимодействует с сервисом `ytdlp-ops-server` для получения `info.json` и прокси, после чего скачивает видео. - 4. **Продолжение или остановка:** - - **В случае успеха:** Он запускает новый экземпляр самого себя, создавая непрерывный цикл для обработки следующего URL. - - **В случае сбоя:** Цикл прерывается (если `stop_on_failure` установлено в `True`), останавливая эту "линию" обработки. Это предотвращает остановку всей системы из-за одного проблемного URL или аккаунта. - -## Управляющие DAG'и - -### `ytdlp_mgmt_proxy_account` - -- **Назначение:** Это основной инструмент для мониторинга и управления состоянием ресурсов, используемых `ytdlp-ops-server`. -- **Функциональность:** - - **Просмотр статусов:** Позволяет увидеть текущий статус всех прокси и аккаунтов (например, `ACTIVE`, `BANNED`, `RESTING`). - - **Управление прокси:** Позволяет вручную банить, разбанивать или сбрасывать статус прокси. - - **Управление аккаунтами:** Позволяет вручную банить или разбанивать аккаунты. - -### `ytdlp_mgmt_queues` - -- **Назначение:** Предоставляет набор инструментов для управления очередями Redis, используемыми в конвейере обработки. -- **Функциональность (через параметр `action`):** - - `add_videos`: Добавление одного или нескольких URL-адресов YouTube в очередь. - - `clear_queue`: Очистка (удаление) указанного ключа Redis. - - `list_contents`: Просмотр содержимого ключа Redis (списка или хэша). - - `check_status`: Проверка общего состояния очередей (тип, размер). - - `requeue_failed`: Перемещение всех URL-адресов из очереди сбоев `_fail` обратно в очередь `_inbox` для повторной обработки. - -## Стратегия управления ресурсами (Прокси и Аккаунты) - -Система использует интеллектуальную стратегию для управления жизненным циклом и состоянием аккаунтов и прокси, чтобы максимизировать процент успеха и минимизировать блокировки. - -- **Жизненный цикл аккаунта ("Cooldown"):** - - Чтобы предотвратить "выгорание", аккаунты автоматически переходят в состояние "отдыха" (`RESTING`) после периода интенсивного использования. - - По истечении периода отдыха они автоматически возвращаются в `ACTIVE` и снова становятся доступными для воркеров. - -- **Умная стратегия банов:** - - **Сначала бан аккаунта:** При возникновении серьезной ошибки (например, `BOT_DETECTED`) система наказывает **только аккаунт**, который вызвал сбой. Прокси при этом продолжает работать. - - **Бан прокси по "скользящему окну":** Прокси банится автоматически, только если он демонстрирует **систематические сбои с РАЗНЫМИ аккаунтами** за короткий промежуток времени. Это является надежным индикатором того, что проблема именно в прокси. - -- **Мониторинг:** - - DAG `ytdlp_mgmt_proxy_account` является основным инструментом для мониторинга. Он показывает текущий статус всех ресурсов, включая время, оставшееся до активации забаненных или отдыхающих аккаунтов. - - Граф выполнения DAG `ytdlp_ops_worker_per_url` теперь явно показывает шаги, такие как `assign_account`, `get_token`, `ban_account`, `retry_get_token`, что делает процесс отладки более наглядным. - -## Внешние сервисы - -### `ytdlp-ops-server` (Thrift Service) - -- **Назначение:** Внешний сервис, который предоставляет аутентификационные данные (токены, cookies, proxy) для скачивания видео. -- **Взаимодействие:** Worker DAG (`ytdlp_ops_worker_per_url`) обращается к этому сервису перед началом загрузки для получения необходимых данных для `yt-dlp`. - -## Логика работы Worker DAG (`ytdlp_ops_worker_per_url`) - -Этот DAG является "рабочей лошадкой" системы. Он спроектирован как самоподдерживающийся цикл для обработки одного URL за запуск. - -### Задачи и их назначение: - -- **`pull_url_from_redis`**: Извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, DAG завершается со статусом `skipped`, останавливая эту "линию" обработки. -- **`assign_account`**: Выбирает аккаунт для выполнения задачи. Он будет повторно использовать тот же аккаунт, который был успешно использован в предыдущем запуске в своей "линии" (привязка аккаунта). Если это первый запуск, он выбирает случайный аккаунт. -- **`get_token`**: Основная задача. Она обращается к `ytdlp-ops-server` для получения `info.json`. -- **`handle_bannable_error_branch`**: Если `get_token` завершается с ошибкой, требующей бана, эта задача-развилка решает, что делать дальше, в зависимости от политики `on_bannable_failure`. -- **`ban_account_and_prepare_for_retry`**: Если политика разрешает повтор, эта задача банит сбойный аккаунт и выбирает новый для повторной попытки. -- **`retry_get_token`**: Выполняет вторую попытку получить токен с новым аккаунтом. -- **`ban_second_account_and_proxy`**: Если и вторая попытка неудачна, эта задача банит второй аккаунт и использованный прокси. -- **`download_and_probe`**: Если `get_token` (или `retry_get_token`) завершилась успешно, эта задача использует `yt-dlp` для скачивания медиа и `ffmpeg` для проверки целостности скачанного файла. -- **`mark_url_as_success`**: Если `download_and_probe` завершилась успешно, эта задача записывает результат в хэш `_result` в Redis. -- **`handle_generic_failure`**: Если любая из основных задач завершается с неисправимой ошибкой, эта задача записывает подробную информацию об ошибке в хэш `_fail` в Redis. -- **`decide_what_to_do_next`**: Задача-развилка, которая запускается после успеха или неудачи. Она решает, продолжать ли цикл. -- **`trigger_self_run`**: Задача, которая фактически запускает следующий экземпляр DAG, создавая непрерывный цикл. - -## Механизм привязки воркеров к конкретным машинам (Worker Pinning / Affinity) - -Для обеспечения того, чтобы все задачи, связанные с обработкой одного конкретного URL, выполнялись на одной и той же машине (воркере), система использует комбинацию из трех компонентов: Оркестратора, Диспетчера и специального хука Airflow. - -### 1. `ytdlp_ops_orchestrator` (Оркестратор) - -- **Роль:** Инициирует процесс обработки. -- **Действие:** При запуске он создает несколько DAG-запусков `ytdlp_ops_dispatcher`. Каждый такой запуск предназначен для обработки одного URL. -- **Передача параметров:** Оркестратор передает свои параметры конфигурации (например, `account_pool`, `redis_conn_id`, `service_ip`) каждому запуску диспетчера. - -### 2. `ytdlp_ops_dispatcher` (Диспетчер) - -- **Роль:** Основной механизм обеспечения привязки. -- **Действие:** - 1. **Получает URL:** Извлекает один URL из очереди Redis (`_inbox`). - 2. **Определяет воркер:** Использует `socket.gethostname()` для определения имени текущей машины (воркера), на которой он выполняется. - 3. **Формирует имя очереди:** Создает уникальное имя очереди для этого воркера, например, `queue-dl-dl-worker-1`. - 4. **Запускает Worker DAG:** Инициирует запуск DAG `ytdlp_ops_worker_per_url`, передавая ему: - * Извлеченный `url_to_process`. - * Сформированное имя очереди `worker_queue` через параметр `conf`. - * Все остальные параметры, полученные от оркестратора. -- **Ключевой момент:** Именно на этом этапе устанавливается связь между конкретным URL и конкретным воркером, на котором началась обработка этого URL. - -### 3. `task_instance_mutation_hook` (Хук изменения задач) - -- **Расположение:** `airflow/config/custom_task_hooks.py` -- **Роль:** Является механизмом, который обеспечивает выполнение *всех* задач Worker DAG на нужной машине. -- **Как это работает:** - 1. **Регистрация:** Хук регистрируется в конфигурации Airflow и вызывается перед запуском *каждой* задачи. - 2. **Проверка DAG ID:** Хук проверяет, принадлежит ли задача (`TaskInstance`) DAG `ytdlp_ops_worker_per_url`. - 3. **Извлечение `conf`:** Если да, он безопасно извлекает `conf` из `DagRun`, связанного с этой задачей. - 4. **Изменение очереди:** - * Если в `conf` найден ключ `worker_queue` (что будет true для всех запусков, инициированных диспетчером), хук *переопределяет* стандартную очередь задачи на это значение. - * Это означает, что Airflow планировщик поставит эту задачу именно в ту очередь, которая прослушивается нужным воркером. - 5. **Резервный вариант:** Если `worker_queue` не найден (например, DAG запущен вручную), задача возвращается в стандартную очередь `queue-dl`. -- **Ключевой момент:** Этот хук гарантирует, что *все последующие задачи* в рамках одного запуска `ytdlp_ops_worker_per_url` (например, `get_token`, `download_and_probe`, `mark_url_as_success`) будут выполнены на том же воркере, который изначально получил URL в диспетчере. - -### Резюме - -Комбинация `Оркестратор -> Диспетчер -> Хук` эффективно реализует привязку задач к воркерам: - -1. **Оркестратор** запускает процесс. -2. **Диспетчер** связывает конкретный URL с конкретным воркером, определяя его имя хоста и передавая его как `worker_queue` в Worker DAG. -3. **Хук** гарантирует, что все задачи Worker DAG выполняются в очереди, соответствующей этому воркеру. - -Это позволяет системе использовать локальные ресурсы воркера (например, кэш, временные файлы) эффективно и предсказуемо для обработки каждого отдельного URL. diff --git a/airflow/dags/README.ru.md b/airflow/dags/README.ru.md index d8d8882..5978c1b 100644 --- a/airflow/dags/README.ru.md +++ b/airflow/dags/README.ru.md @@ -107,12 +107,12 @@ **Поставить воркер на паузу:** (Замените `"hostname"` на имя хоста из вашего inventory-файла) ```bash -ansible-playbook ansible/playbooks/pause_worker.yml --limit "hostname" +ansible-playbook -i ansible/inventory.ini ansible/playbooks/pause_worker.yml --limit "hostname" ``` **Возобновить работу воркера:** ```bash -ansible-playbook ansible/playbooks/resume_worker.yml --limit "hostname" +ansible-playbook -i ansible/inventory.ini ansible/playbooks/resume_worker.yml --limit "hostname" ``` ## Механизм привязки воркеров к конкретным машинам (Worker Pinning / Affinity) diff --git a/airflow/dags/ytdlp_ops_dispatcher.py b/airflow/dags/ytdlp_ops_dispatcher.py index 62b9b80..82daaa1 100644 --- a/airflow/dags/ytdlp_ops_dispatcher.py +++ b/airflow/dags/ytdlp_ops_dispatcher.py @@ -32,7 +32,7 @@ def dispatch_url_to_worker(**context): """ # --- Check for worker pause lock file --- # This path must be consistent with the Ansible playbook. - lock_file_path = '/srv/airflow_dl_worker/AIRFLOW.PREVENT_URL_PULL.lock' + lock_file_path = '/opt/airflow/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile' hostname = socket.gethostname() if os.path.exists(lock_file_path): logger.info(f"Worker '{hostname}' is paused. Lock file found at '{lock_file_path}'. Skipping URL pull.") diff --git a/airflow/dags/ytdlp_ops_orchestrator.py b/airflow/dags/ytdlp_ops_orchestrator.py index 105b54a..526130c 100644 --- a/airflow/dags/ytdlp_ops_orchestrator.py +++ b/airflow/dags/ytdlp_ops_orchestrator.py @@ -264,7 +264,7 @@ with DAG( ), 'queue_name': Param(DEFAULT_QUEUE_NAME, type="string", description="[Worker Param] Base name for Redis queues."), 'redis_conn_id': Param(DEFAULT_REDIS_CONN_ID, type="string", description="[Worker Param] Airflow Redis connection ID."), - 'clients': Param('web', type="string", description="[Worker Param] Comma-separated list of clients for token generation. Full list: web, mweb, ios, android, web_safari, web_embedded, web_music, web_creator"), + 'clients': Param('web', type="string", description="[Worker Param] Comma-separated list of clients for token generation. Full list: web, web_safari, web_embedded, web_music, web_creator, mweb, web_camoufox, web_safari_camoufox, web_embedded_camoufox, web_music_camoufox, web_creator_camoufox, mweb_camoufox, android, android_music, android_creator, android_vr, ios, ios_music, ios_creator, tv, tv_simply, tv_sample, tv_embedded"), 'account_pool': Param('ytdlp_account', type="string", description="[Worker Param] Account pool prefix or comma-separated list."), 'account_pool_size': Param(10, type=["integer", "null"], description="[Worker Param] If using a prefix for 'account_pool', this specifies the number of accounts to generate (e.g., 10 for 'prefix_01' through 'prefix_10'). Required when using a prefix."), 'service_ip': Param(DEFAULT_YT_AUTH_SERVICE_IP, type="string", description="[Worker Param] IP of the ytdlp-ops-server. Default is from Airflow variable YT_AUTH_SERVICE_IP or hardcoded."), diff --git a/ansible.cfg b/ansible.cfg new file mode 100644 index 0000000..26c553f --- /dev/null +++ b/ansible.cfg @@ -0,0 +1,17 @@ +[defaults] +timeout = 30 +inventory = ansible/inventory.ini +roles_path = ansible/roles +retry_files_enabled = False +host_key_checking = False +vault_password_file = .vault_pass + +[inventory] +enable_plugins = ini + +[ssh_connection] +pipelining = True +control_path_dir = ~/.ansible/cp +# SSH connection timeout increased for jump host connections +# Enable connection sharing (ControlMaster) to speed up multiple tasks. +ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o ConnectTimeout=60 -o ConnectionAttempts=3 diff --git a/ansible/README.md b/ansible/README.md index f3bdf97..ef5aea7 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -2,18 +2,21 @@ This directory contains the Ansible playbooks, roles, and configurations for deploying and managing the YT-DLP Airflow cluster. +**Note:** All commands should be run from the project root, not from within this directory. +Example: `ansible-playbook ansible/playbook-full.yml` + ## Full Deployment ### Deploy entire cluster with proxies (recommended for new setups): ```bash -ansible-playbook playbook-full-with-proxies.yml +ansible-playbook ansible/playbook-full-with-proxies.yml ``` ### Deploy cluster without proxies: ```bash -ansible-playbook playbook-full.yml +ansible-playbook ansible/playbook-full.yml ``` ## Targeted Deployments @@ -21,13 +24,13 @@ ansible-playbook playbook-full.yml ### Deploy only to master node: ```bash -ansible-playbook playbook-master.yml --limit="af-test" +ansible-playbook ansible/playbook-master.yml --limit="af-test" ``` ### Deploy only to worker nodes: ```bash -ansible-playbook playbook-worker.yml +ansible-playbook ansible/playbook-worker.yml ``` ## DAGs Only Deployment @@ -35,7 +38,7 @@ ansible-playbook playbook-worker.yml To update only DAG files and configurations: ```bash -ansible-playbook playbook-dags.yml +ansible-playbook ansible/playbook-dags.yml ``` ## Managing Worker State (Pause/Resume) @@ -48,7 +51,7 @@ This command creates the lock file, causing the `ytdlp_ops_dispatcher` DAG to sk ```bash # Replace "worker-hostname" with the target host from your inventory -ansible-playbook playbooks/pause_worker.yml --limit "worker-hostname" +ansible-playbook ansible/playbooks/pause_worker.yml --limit "worker-hostname" ``` ### To Resume a Worker @@ -57,5 +60,5 @@ This command removes the lock file, allowing the worker to resume picking up tas ```bash # Replace "worker-hostname" with the target host from your inventory -ansible-playbook playbooks/resume_worker.yml --limit "worker-hostname" +ansible-playbook ansible/playbooks/resume_worker.yml --limit "worker-hostname" ``` diff --git a/ansible/ansible.cfg b/ansible/ansible.cfg deleted file mode 100644 index 246e9fa..0000000 --- a/ansible/ansible.cfg +++ /dev/null @@ -1,9 +0,0 @@ -[defaults] -inventory = ansible/inventory.ini -roles_path = ansible/roles -retry_files_enabled = False -host_key_checking = False -vault_password_file = .vault_pass - -[inventory] -enable_plugins = ini diff --git a/ansible/group_vars/all/generated_vars.yml b/ansible/group_vars/all/generated_vars.yml index 3052d31..14b5a76 100644 --- a/ansible/group_vars/all/generated_vars.yml +++ b/ansible/group_vars/all/generated_vars.yml @@ -6,6 +6,7 @@ airflow_master_dir: /srv/airflow_master airflow_uid: 1003 airflow_worker_dir: /srv/airflow_dl_worker ansible_user: alex_p +camoufox_base_port: 9070 camoufox_base_vnc_port: 5901 deploy_group: ytdl dir_permissions: '0755' @@ -39,4 +40,4 @@ shadowsocks_mode: tcp_and_udp shadowsocks_timeout: 20 ssh_user: alex_p ytdlp_base_port: 9090 -ytdlp_ops_image: pangramia/ytdlp-ops-server:latest +ytdlp_ops_image: pangramia/ytdlp-ops-server:3.10.1-exp diff --git a/ansible/host_vars/af-test.yml b/ansible/host_vars/af-test.yml deleted file mode 100644 index 7f00d45..0000000 --- a/ansible/host_vars/af-test.yml +++ /dev/null @@ -1,23 +0,0 @@ ---- -# Variables for af-test -master_host_ip: 89.253.223.97 -redis_port: 52909 -shadowsocks_proxies: - sslocal-rust-1087: - server: "91.103.252.51" - server_port: 8388 - local_port: 1087 - vault_password_key: "vault_ss_password_1" - sslocal-rust-1086: - server: "62.60.178.45" - server_port: 8388 - local_port: 1086 - vault_password_key: "vault_ss_password_2" - sslocal-rust-1081: - server: "79.137.207.43" - server_port: 8388 - local_port: 1081 - vault_password_key: "vault_ss_password_2" -worker_proxies: - - "socks5://sslocal-rust-1086:1086" - - "socks5://sslocal-rust-1081:1081" diff --git a/ansible/host_vars/dl002.yml b/ansible/host_vars/dl002.yml deleted file mode 100644 index 9ee82b3..0000000 --- a/ansible/host_vars/dl002.yml +++ /dev/null @@ -1,23 +0,0 @@ ---- -# Variables for dl002 -master_host_ip: 89.253.223.97 -redis_port: 52909 -shadowsocks_proxies: - sslocal-rust-1087: - server: "91.103.252.51" - server_port: 8388 - local_port: 1087 - vault_password_key: "vault_ss_password_1" - sslocal-rust-1086: - server: "62.60.178.45" - server_port: 8388 - local_port: 1086 - vault_password_key: "vault_ss_password_2" - sslocal-rust-1081: - server: "79.137.207.43" - server_port: 8388 - local_port: 1081 - vault_password_key: "vault_ss_password_2" -worker_proxies: - - "socks5://sslocal-rust-1081:1081" - - "socks5://sslocal-rust-1086:1086" diff --git a/ansible/playbook-dags.yml b/ansible/playbook-dags.yml index c9beb54..92b7b1a 100644 --- a/ansible/playbook-dags.yml +++ b/ansible/playbook-dags.yml @@ -45,3 +45,16 @@ rsync_opts: - "--exclude=__pycache__/" - "--exclude=*.pyc" + + - name: Sync Config to WORKER server + ansible.posix.synchronize: + src: "../airflow/config/{{ item }}" + dest: /srv/airflow_dl_worker/config/ + archive: yes + rsync_path: "sudo rsync" + rsync_opts: + - "--exclude=__pycache__/" + - "--exclude=*.pyc" + loop: + - "airflow.cfg" + - "custom_task_hooks.py" diff --git a/ansible/playbook-full.yml b/ansible/playbook-full.yml index 60b119c..6123cb5 100644 --- a/ansible/playbook-full.yml +++ b/ansible/playbook-full.yml @@ -54,6 +54,10 @@ - vim - python3-pip - iputils-ping + - traceroute + - fail2ban + - conntrack + - tcpdump state: present update_cache: yes diff --git a/ansible/playbook-worker.yml b/ansible/playbook-worker.yml index 0dc3092..23dcd80 100644 --- a/ansible/playbook-worker.yml +++ b/ansible/playbook-worker.yml @@ -149,9 +149,3 @@ roles: - ytdlp-worker - - airflow-worker - - post_tasks: - - name: Include camoufox verification tasks - include_tasks: tasks/verify_camoufox.yml - when: not fast_deploy | default(false) diff --git a/ansible/playbooks/pause_worker.yml b/ansible/playbooks/pause_worker.yml index 4d11579..5235379 100644 --- a/ansible/playbooks/pause_worker.yml +++ b/ansible/playbooks/pause_worker.yml @@ -6,7 +6,7 @@ tasks: - name: "Create lock file to pause worker" file: - path: "{{ airflow_worker_dir }}/AIRFLOW.PREVENT_URL_PULL.lock" + path: "{{ airflow_worker_dir }}/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile" state: touch owner: "{{ ssh_user }}" group: "{{ deploy_group }}" diff --git a/ansible/playbooks/resume_worker.yml b/ansible/playbooks/resume_worker.yml index e7bafe6..8b0b7ce 100644 --- a/ansible/playbooks/resume_worker.yml +++ b/ansible/playbooks/resume_worker.yml @@ -6,8 +6,8 @@ tasks: - name: "Archive lock file to resume worker" command: > - mv {{ airflow_worker_dir }}/AIRFLOW.PREVENT_URL_PULL.lock - {{ airflow_worker_dir }}/AIRFLOW.PREVENT_URL_PULL.lock.removed-{{ ansible_date_time.year }}{{ '%02d' | format(ansible_date_time.month) }}{{ '%02d' | format(ansible_date_time.day) }}-{{ '%02d' | format(ansible_date_time.hour) }}{{ '%02d' | format(ansible_date_time.minute) }} + mv {{ airflow_worker_dir }}/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile + {{ airflow_worker_dir }}/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile.removed-{{ ansible_date_time.year }}{{ '%02d' | format(ansible_date_time.month | int) }}{{ '%02d' | format(ansible_date_time.day | int) }}-{{ '%02d' | format(ansible_date_time.hour | int) }}{{ '%02d' | format(ansible_date_time.minute | int) }} args: - removes: "{{ airflow_worker_dir }}/AIRFLOW.PREVENT_URL_PULL.lock" + removes: "{{ airflow_worker_dir }}/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile" become: yes diff --git a/ansible/roles/airflow-master/tasks/main.yml b/ansible/roles/airflow-master/tasks/main.yml index 324456c..98afd73 100644 --- a/ansible/roles/airflow-master/tasks/main.yml +++ b/ansible/roles/airflow-master/tasks/main.yml @@ -218,6 +218,7 @@ owner: "999" # UID for the 'postgres' user in the official postgres image group: "999" # GID for the 'postgres' group in the official postgres image mode: '0700' + recurse: yes become: yes - name: Set proper ownership and permissions on master logs directory contents diff --git a/ansible/roles/ytdlp-worker/tasks/main.yml b/ansible/roles/ytdlp-worker/tasks/main.yml index c829647..d628112 100644 --- a/ansible/roles/ytdlp-worker/tasks/main.yml +++ b/ansible/roles/ytdlp-worker/tasks/main.yml @@ -1,14 +1,14 @@ --- - name: Ensure worker is not paused on deploy (remove .lock file) file: - path: "{{ airflow_worker_dir }}/AIRFLOW.PREVENT_URL_PULL.lock" + path: "{{ airflow_worker_dir }}/inputfiles/AIRFLOW.PREVENT_URL_PULL.lockfile" state: absent become: yes - name: Clean up old renamed lock files (older than 7 days) ansible.builtin.find: - paths: "{{ airflow_worker_dir }}" - patterns: "AIRFLOW.PREVENT_URL_PULL.lock.removed-*" + paths: "{{ airflow_worker_dir }}/inputfiles" + patterns: "AIRFLOW.PREVENT_URL_PULL.lockfile.removed-*" age: "7d" use_regex: false register: old_lock_files @@ -22,6 +22,24 @@ become: yes when: old_lock_files.files | length > 0 +- name: Ensure YT-DLP worker inputfiles directory exists + file: + path: "{{ airflow_worker_dir }}/inputfiles" + state: directory + owner: "{{ ssh_user }}" + group: "{{ deploy_group }}" + mode: '0755' + become: yes + +- name: Ensure YT-DLP worker logs directory exists + file: + path: "{{ airflow_worker_dir }}/logs" + state: directory + owner: "{{ airflow_uid }}" + group: "{{ deploy_group }}" + mode: '0775' + become: yes + - name: Check if YT-DLP worker deployment directory exists stat: path: "{{ airflow_worker_dir }}" @@ -141,6 +159,26 @@ become: yes become_user: "{{ ssh_user }}" +- name: Clean up old root docker-compose files to prevent conflicts + ansible.builtin.file: + path: "{{ airflow_worker_dir }}/{{ item }}" + state: absent + loop: + - "docker-compose.yml" + - "docker-compose.yaml" + - "docker-compose.override.yml" + - "docker-compose.airflow.yml" + become: yes + +- name: Template docker-compose file for Airflow worker + template: + src: "{{ playbook_dir }}/../airflow/configs/docker-compose-dl.yaml.j2" + dest: "{{ airflow_worker_dir }}/configs/docker-compose.airflow.yml" + mode: "{{ file_permissions }}" + owner: "{{ ssh_user }}" + group: "{{ deploy_group }}" + become: yes + - name: "Log: Building Camoufox (remote browser) image" debug: msg: "Building the Camoufox image locally. This image provides remote-controlled Firefox browsers for token generation." @@ -168,15 +206,21 @@ path: "/srv/shadowsocks-rust/docker-compose.proxies.yaml" register: proxy_compose_file -- name: "Log: Starting YT-DLP worker services" +- name: "Log: Starting all worker services" debug: - msg: "Starting the core YT-DLP worker services: ytdlp-ops-service (Thrift API), envoy (load balancer), and camoufox (remote browsers)." + msg: "Starting all worker services: ytdlp-ops, camoufox, and airflow-worker." -- name: Start YT-DLP worker service +- name: Start all worker services community.docker.docker_compose_v2: project_src: "{{ airflow_worker_dir }}" files: - "configs/docker-compose-ytdlp-ops.yaml" + - "configs/docker-compose.camoufox.yaml" + - "configs/docker-compose.airflow.yml" state: present remove_orphans: true pull: "{{ 'never' if fast_deploy | default(false) else 'missing' }}" + +- name: Include camoufox verification tasks + include_tasks: ../../../tasks/verify_camoufox.yml + when: not fast_deploy | default(false) diff --git a/ansible/templates/.env.j2 b/ansible/templates/.env.j2 index f99cdeb..cd2cc13 100644 --- a/ansible/templates/.env.j2 +++ b/ansible/templates/.env.j2 @@ -50,7 +50,7 @@ YTDLP_TIMEOUT=600 CAMOUFOX_PROXIES="{{ (worker_proxies | default([])) | join(',') }}" VNC_PASSWORD="{{ vault_vnc_password }}" CAMOUFOX_BASE_VNC_PORT={{ camoufox_base_vnc_port }} -CAMOUFOX_PORT={{ camoufox_port }} +CAMOUFOX_PORT={{ camoufox_base_port }} # --- Account Manager Configuration --- ACCOUNT_ACTIVE_DURATION_MIN={{ account_active_duration_min | default(7) }} diff --git a/cluster.green.yml b/cluster.green.yml index 4eb54c7..8d4f516 100644 --- a/cluster.green.yml +++ b/cluster.green.yml @@ -1,6 +1,6 @@ global_vars: # Docker image versions - ytdlp_ops_image: "pangramia/ytdlp-ops-server:latest" + ytdlp_ops_image: "pangramia/ytdlp-ops-server:3.10.1-exp" airflow_image_name: "pangramia/ytdlp-ops-airflow:latest" # Default ports @@ -9,6 +9,7 @@ global_vars: envoy_port: 9080 envoy_admin_port: 9901 management_service_port: 9091 + camoufox_base_port: 9070 camoufox_base_vnc_port: 5901 # Default UID diff --git a/get_info_json_client.py b/get_info_json_client.py index ba4393c..d364cee 100644 --- a/get_info_json_client.py +++ b/get_info_json_client.py @@ -41,7 +41,7 @@ def parse_args(): parser.add_argument('--host', default='127.0.0.1', help="Thrift server host. Using 127.0.0.1 avoids harmless connection errors when the local Envoy proxy only listens on IPv4.") parser.add_argument('--port', type=int, default=9080, help='Thrift server port') parser.add_argument('--profile', default='default_profile', help='The profile name (accountId) to use for the request.') - parser.add_argument('--client', help='Specific client to use (e.g., web, ios, android). Overrides server default.') + parser.add_argument('--client', help='Specific client to use (e.g., web, ios). Overrides server default. Append "_camoufox" to any client name (e.g., "web_camoufox") to force the browser-based generation strategy.') parser.add_argument('--output', help='Output file path for the info.json. If not provided, prints to stdout.') parser.add_argument('--machine-id', help='Identifier for the client machine. Defaults to hostname.') parser.add_argument('--verbose', action='store_true', help='Enable verbose output')