164 lines
19 KiB
Markdown
164 lines
19 KiB
Markdown
# Архитектура и описание YTDLP Airflow DAGs
|
||
|
||
Этот документ описывает архитектуру и назначение DAG'ов, используемых для скачивания видео с YouTube. Система построена на модели непрерывного, самоподдерживающегося цикла для параллельной и отказоустойчивой обработки.
|
||
|
||
## Основной цикл обработки
|
||
|
||
Обработка выполняется двумя основными DAG'ами, которые работают в паре: оркестратор и воркер.
|
||
|
||
### `ytdlp_ops_orchestrator` (Система "зажигания")
|
||
|
||
- **Назначение:** Этот DAG действует как "система зажигания" для запуска обработки. Он запускается вручную для старта указанного количества параллельных циклов-воркеров.
|
||
- **Принцип работы:**
|
||
- Он **не** обрабатывает URL-адреса самостоятельно.
|
||
- Его единственная задача — запустить сконфигурированное количество DAG'ов `ytdlp_ops_worker_per_url`.
|
||
- Он передает всю необходимую конфигурацию (пул аккаунтов, подключение к Redis и т.д.) воркерам.
|
||
|
||
### `ytdlp_ops_worker_per_url` (Самоподдерживающийся воркер)
|
||
|
||
- **Назначение:** Этот DAG обрабатывает один URL и спроектирован для работы в непрерывном цикле.
|
||
- **Принцип работы:**
|
||
1. **Запуск:** Начальный запуск инициируется `ytdlp_ops_orchestrator`.
|
||
2. **Получение задачи:** Воркер извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, выполнение воркера завершается, и его "линия" обработки останавливается.
|
||
3. **Обработка:** Он взаимодействует с сервисом `ytdlp-ops-server` для получения `info.json` и прокси, после чего скачивает видео.
|
||
4. **Продолжение или остановка:**
|
||
- **В случае успеха:** Он запускает новый экземпляр самого себя, создавая непрерывный цикл для обработки следующего URL.
|
||
- **В случае сбоя:** Цикл прерывается (если `stop_on_failure` установлено в `True`), останавливая эту "линию" обработки. Это предотвращает остановку всей системы из-за одного проблемного URL или аккаунта.
|
||
|
||
## Управляющие DAG'и
|
||
|
||
### `ytdlp_mgmt_proxy_account`
|
||
|
||
- **Назначение:** Это основной инструмент для мониторинга и управления состоянием ресурсов, используемых `ytdlp-ops-server`.
|
||
- **Функциональность:**
|
||
- **Просмотр статусов:** Позволяет увидеть текущий статус всех прокси и аккаунтов (например, `ACTIVE`, `BANNED`, `RESTING`).
|
||
- **Управление прокси:** Позволяет вручную банить, разбанивать или сбрасывать статус прокси.
|
||
- **Управление аккаунтами:** Позволяет вручную банить или разбанивать аккаунты.
|
||
|
||
### `ytdlp_mgmt_queues`
|
||
|
||
- **Назначение:** Предоставляет набор инструментов для управления очередями Redis, используемыми в конвейере обработки.
|
||
- **Функциональность (через параметр `action`):**
|
||
- `add_videos`: Добавление одного или нескольких URL-адресов YouTube в очередь.
|
||
- `clear_queue`: Очистка (удаление) указанного ключа Redis.
|
||
- `list_contents`: Просмотр содержимого ключа Redis (списка или хэша).
|
||
- `check_status`: Проверка общего состояния очередей (тип, размер).
|
||
- `requeue_failed`: Перемещение всех URL-адресов из очереди сбоев `_fail` обратно в очередь `_inbox` для повторной обработки.
|
||
|
||
## Стратегия управления ресурсами (Прокси и Аккаунты)
|
||
|
||
Система использует интеллектуальную стратегию для управления жизненным циклом и состоянием аккаунтов и прокси, чтобы максимизировать процент успеха и минимизировать блокировки.
|
||
|
||
- **Жизненный цикл аккаунта ("Cooldown"):**
|
||
- Чтобы предотвратить "выгорание", аккаунты автоматически переходят в состояние "отдыха" (`RESTING`) после периода интенсивного использования.
|
||
- По истечении периода отдыха они автоматически возвращаются в `ACTIVE` и снова становятся доступными для воркеров.
|
||
|
||
- **Умная стратегия банов:**
|
||
- **Сначала бан аккаунта:** При возникновении серьезной ошибки (например, `BOT_DETECTED`) система наказывает **только аккаунт**, который вызвал сбой. Прокси при этом продолжает работать.
|
||
- **Бан прокси по "скользящему окну":** Прокси банится автоматически, только если он демонстрирует **систематические сбои с РАЗНЫМИ аккаунтами** за короткий промежуток времени. Это является надежным индикатором того, что проблема именно в прокси.
|
||
|
||
- **Мониторинг:**
|
||
- DAG `ytdlp_mgmt_proxy_account` является основным инструментом для мониторинга. Он показывает текущий статус всех ресурсов, включая время, оставшееся до активации забаненных или отдыхающих аккаунтов.
|
||
- Граф выполнения DAG `ytdlp_ops_worker_per_url` теперь явно показывает шаги, такие как `assign_account`, `get_token`, `ban_account`, `retry_get_token`, что делает процесс отладки более наглядным.
|
||
|
||
## Внешние сервисы
|
||
|
||
### `ytdlp-ops-server` (Thrift Service)
|
||
|
||
- **Назначение:** Внешний сервис, который предоставляет аутентификационные данные (токены, cookies, proxy) для скачивания видео.
|
||
- **Взаимодействие:** Worker DAG (`ytdlp_ops_worker_per_url`) обращается к этому сервису перед началом загрузки для получения необходимых данных для `yt-dlp`.
|
||
|
||
## Логика работы Worker DAG (`ytdlp_ops_worker_per_url`)
|
||
|
||
Этот DAG является "рабочей лошадкой" системы. Он спроектирован как самоподдерживающийся цикл для обработки одного URL за запуск.
|
||
|
||
### Задачи и их назначение:
|
||
|
||
- **`pull_url_from_redis`**: Извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, DAG завершается со статусом `skipped`, останавливая эту "линию" обработки.
|
||
- **`assign_account`**: Выбирает аккаунт для выполнения задачи. Он будет повторно использовать тот же аккаунт, который был успешно использован в предыдущем запуске в своей "линии" (привязка аккаунта). Если это первый запуск, он выбирает случайный аккаунт.
|
||
- **`get_token`**: Основная задача. Она обращается к `ytdlp-ops-server` для получения `info.json`.
|
||
- **`handle_bannable_error_branch`**: Если `get_token` завершается с ошибкой, требующей бана, эта задача-развилка решает, что делать дальше, в зависимости от политики `on_bannable_failure`.
|
||
- **`ban_account_and_prepare_for_retry`**: Если политика разрешает повтор, эта задача банит сбойный аккаунт и выбирает новый для повторной попытки.
|
||
- **`retry_get_token`**: Выполняет вторую попытку получить токен с новым аккаунтом.
|
||
- **`ban_second_account_and_proxy`**: Если и вторая попытка неудачна, эта задача банит второй аккаунт и использованный прокси.
|
||
- **`download_and_probe`**: Если `get_token` (или `retry_get_token`) завершилась успешно, эта задача использует `yt-dlp` для скачивания медиа и `ffmpeg` для проверки целостности скачанного файла.
|
||
- **`mark_url_as_success`**: Если `download_and_probe` завершилась успешно, эта задача записывает результат в хэш `_result` в Redis.
|
||
- **`handle_generic_failure`**: Если любая из основных задач завершается с неисправимой ошибкой, эта задача записывает подробную информацию об ошибке в хэш `_fail` в Redis.
|
||
- **`decide_what_to_do_next`**: Задача-развилка, которая запускается после успеха или неудачи. Она решает, продолжать ли цикл.
|
||
- **`trigger_self_run`**: Задача, которая фактически запускает следующий экземпляр DAG, создавая непрерывный цикл.
|
||
|
||
## Управление Воркерами (Пауза/Возобновление)
|
||
|
||
Система предоставляет механизм для "охлаждения" или временной приостановки работы воркера. Это полезно для проведения технического обслуживания, безопасного выключения машины или уменьшения нагрузки на кластер без генерации ошибок.
|
||
|
||
### Принцип работы
|
||
|
||
Механизм основан на файле-блокировке (`lock file`), который создается на узле воркера с помощью Ansible.
|
||
|
||
1. **Пауза:** Администратор запускает Ansible-плейбук, который создает пустой файл `AIRFLOW.PREVENT_URL_PULL.lock` в рабочей директории воркера (`/srv/airflow_dl_worker`).
|
||
2. **Проверка:** DAG `ytdlp_ops_dispatcher`, который отвечает за распределение URL-адресов, перед тем как взять новую задачу из Redis, проверяет наличие этого файла.
|
||
3. **Пропуск задачи:** Если файл существует, `dispatcher` логирует, что воркер на паузе, и завершает свою задачу со статусом `skipped`. Это предотвращает получение новых URL-адресов этим воркером, но не влияет на уже запущенные задачи.
|
||
4. **Возобновление:** Администратор запускает другой Ansible-плейбук, который переименовывает файл блокировки (добавляя временную метку), тем самым "разблокируя" воркер. При следующем запуске `dispatcher` не найдет файл и продолжит работу в обычном режиме.
|
||
|
||
### Команды для управления
|
||
|
||
Для управления состоянием воркера используются специальные Ansible-плейбуки. Команды следует выполнять из корневой директории проекта.
|
||
|
||
**Поставить воркер на паузу:**
|
||
(Замените `"hostname"` на имя хоста из вашего inventory-файла)
|
||
```bash
|
||
ansible-playbook ansible/playbooks/pause_worker.yml --limit "hostname"
|
||
```
|
||
|
||
**Возобновить работу воркера:**
|
||
```bash
|
||
ansible-playbook ansible/playbooks/resume_worker.yml --limit "hostname"
|
||
```
|
||
|
||
## Механизм привязки воркеров к конкретным машинам (Worker Pinning / Affinity)
|
||
|
||
Для обеспечения того, чтобы все задачи, связанные с обработкой одного конкретного URL, выполнялись на одной и той же машине (воркере), система использует комбинацию из трех компонентов: Оркестратора, Диспетчера и специального хука Airflow.
|
||
|
||
### 1. `ytdlp_ops_orchestrator` (Оркестратор)
|
||
|
||
- **Роль:** Инициирует процесс обработки.
|
||
- **Действие:** При запуске он создает несколько DAG-запусков `ytdlp_ops_dispatcher`. Каждый такой запуск предназначен для обработки одного URL.
|
||
- **Передача параметров:** Оркестратор передает свои параметры конфигурации (например, `account_pool`, `redis_conn_id`, `service_ip`) каждому запуску диспетчера.
|
||
|
||
### 2. `ytdlp_ops_dispatcher` (Диспетчер)
|
||
|
||
- **Роль:** Основной механизм обеспечения привязки.
|
||
- **Действие:**
|
||
1. **Получает URL:** Извлекает один URL из очереди Redis (`_inbox`).
|
||
2. **Определяет воркер:** Использует `socket.gethostname()` для определения имени текущей машины (воркера), на которой он выполняется.
|
||
3. **Формирует имя очереди:** Создает уникальное имя очереди для этого воркера, например, `queue-dl-dl-worker-1`.
|
||
4. **Запускает Worker DAG:** Инициирует запуск DAG `ytdlp_ops_worker_per_url`, передавая ему:
|
||
* Извлеченный `url_to_process`.
|
||
* Сформированное имя очереди `worker_queue` через параметр `conf`.
|
||
* Все остальные параметры, полученные от оркестратора.
|
||
- **Ключевой момент:** Именно на этом этапе устанавливается связь между конкретным URL и конкретным воркером, на котором началась обработка этого URL.
|
||
|
||
### 3. `task_instance_mutation_hook` (Хук изменения задач)
|
||
|
||
- **Расположение:** `airflow/config/custom_task_hooks.py`
|
||
- **Роль:** Является механизмом, который обеспечивает выполнение *всех* задач Worker DAG на нужной машине.
|
||
- **Как это работает:**
|
||
1. **Регистрация:** Хук регистрируется в конфигурации Airflow и вызывается перед запуском *каждой* задачи.
|
||
2. **Проверка DAG ID:** Хук проверяет, принадлежит ли задача (`TaskInstance`) DAG `ytdlp_ops_worker_per_url`.
|
||
3. **Извлечение `conf`:** Если да, он безопасно извлекает `conf` из `DagRun`, связанного с этой задачей.
|
||
4. **Изменение очереди:**
|
||
* Если в `conf` найден ключ `worker_queue` (что будет true для всех запусков, инициированных диспетчером), хук *переопределяет* стандартную очередь задачи на это значение.
|
||
* Это означает, что Airflow планировщик поставит эту задачу именно в ту очередь, которая прослушивается нужным воркером.
|
||
5. **Резервный вариант:** Если `worker_queue` не найден (например, DAG запущен вручную), задача возвращается в стандартную очередь `queue-dl`.
|
||
- **Ключевой момент:** Этот хук гарантирует, что *все последующие задачи* в рамках одного запуска `ytdlp_ops_worker_per_url` (например, `get_token`, `download_and_probe`, `mark_url_as_success`) будут выполнены на том же воркере, который изначально получил URL в диспетчере.
|
||
|
||
### Резюме
|
||
|
||
Комбинация `Оркестратор -> Диспетчер -> Хук` эффективно реализует привязку задач к воркерам:
|
||
|
||
1. **Оркестратор** запускает процесс.
|
||
2. **Диспетчер** связывает конкретный URL с конкретным воркером, определяя его имя хоста и передавая его как `worker_queue` в Worker DAG.
|
||
3. **Хук** гарантирует, что все задачи Worker DAG выполняются в очереди, соответствующей этому воркеру.
|
||
|
||
Это позволяет системе использовать локальные ресурсы воркера (например, кэш, временные файлы) эффективно и предсказуемо для обработки каждого отдельного URL.
|