# Архитектура и описание YTDLP Airflow DAGs Этот документ описывает архитектуру и назначение DAG'ов, используемых для скачивания видео с YouTube. Система построена на модели непрерывного, самоподдерживающегося цикла для параллельной и отказоустойчивой обработки. ## Основной цикл обработки Обработка выполняется двумя основными DAG'ами, которые работают в паре: оркестратор и воркер. ### `ytdlp_ops_orchestrator` (Система "зажигания") - **Назначение:** Этот DAG действует как "система зажигания" для запуска обработки. Он запускается вручную для старта указанного количества параллельных циклов-воркеров. - **Принцип работы:** - Он **не** обрабатывает URL-адреса самостоятельно. - Его единственная задача — запустить сконфигурированное количество DAG'ов `ytdlp_ops_worker_per_url`. - Он передает всю необходимую конфигурацию (пул аккаунтов, подключение к Redis и т.д.) воркерам. ### `ytdlp_ops_worker_per_url` (Самоподдерживающийся воркер) - **Назначение:** Этот DAG обрабатывает один URL и спроектирован для работы в непрерывном цикле. - **Принцип работы:** 1. **Запуск:** Начальный запуск инициируется `ytdlp_ops_orchestrator`. 2. **Получение задачи:** Воркер извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, выполнение воркера завершается, и его "линия" обработки останавливается. 3. **Обработка:** Он взаимодействует с сервисом `ytdlp-ops-server` для получения `info.json` и прокси, после чего скачивает видео. 4. **Продолжение или остановка:** - **В случае успеха:** Он запускает новый экземпляр самого себя, создавая непрерывный цикл для обработки следующего URL. - **В случае сбоя:** Цикл прерывается (если `stop_on_failure` установлено в `True`), останавливая эту "линию" обработки. Это предотвращает остановку всей системы из-за одного проблемного URL или аккаунта. ## Управляющие DAG'и ### `ytdlp_mgmt_proxy_account` - **Назначение:** Это основной инструмент для мониторинга и управления состоянием ресурсов, используемых `ytdlp-ops-server`. - **Функциональность:** - **Просмотр статусов:** Позволяет увидеть текущий статус всех прокси и аккаунтов (например, `ACTIVE`, `BANNED`, `RESTING`). - **Управление прокси:** Позволяет вручную банить, разбанивать или сбрасывать статус прокси. - **Управление аккаунтами:** Позволяет вручную банить или разбанивать аккаунты. ### `ytdlp_mgmt_queues` - **Назначение:** Предоставляет набор инструментов для управления очередями Redis, используемыми в конвейере обработки. - **Функциональность (через параметр `action`):** - `add_videos`: Добавление одного или нескольких URL-адресов YouTube в очередь. - `clear_queue`: Очистка (удаление) указанного ключа Redis. - `list_contents`: Просмотр содержимого ключа Redis (списка или хэша). - `check_status`: Проверка общего состояния очередей (тип, размер). - `requeue_failed`: Перемещение всех URL-адресов из очереди сбоев `_fail` обратно в очередь `_inbox` для повторной обработки. ## Стратегия управления ресурсами (Прокси и Аккаунты) Система использует интеллектуальную стратегию для управления жизненным циклом и состоянием аккаунтов и прокси, чтобы максимизировать процент успеха и минимизировать блокировки. - **Жизненный цикл аккаунта ("Cooldown"):** - Чтобы предотвратить "выгорание", аккаунты автоматически переходят в состояние "отдыха" (`RESTING`) после периода интенсивного использования. - По истечении периода отдыха они автоматически возвращаются в `ACTIVE` и снова становятся доступными для воркеров. - **Умная стратегия банов:** - **Сначала бан аккаунта:** При возникновении серьезной ошибки (например, `BOT_DETECTED`) система наказывает **только аккаунт**, который вызвал сбой. Прокси при этом продолжает работать. - **Бан прокси по "скользящему окну":** Прокси банится автоматически, только если он демонстрирует **систематические сбои с РАЗНЫМИ аккаунтами** за короткий промежуток времени. Это является надежным индикатором того, что проблема именно в прокси. - **Мониторинг:** - DAG `ytdlp_mgmt_proxy_account` является основным инструментом для мониторинга. Он показывает текущий статус всех ресурсов, включая время, оставшееся до активации забаненных или отдыхающих аккаунтов. - Граф выполнения DAG `ytdlp_ops_worker_per_url` теперь явно показывает шаги, такие как `assign_account`, `get_token`, `ban_account`, `retry_get_token`, что делает процесс отладки более наглядным. ## Внешние сервисы ### `ytdlp-ops-server` (Thrift Service) - **Назначение:** Внешний сервис, который предоставляет аутентификационные данные (токены, cookies, proxy) для скачивания видео. - **Взаимодействие:** Worker DAG (`ytdlp_ops_worker_per_url`) обращается к этому сервису перед началом загрузки для получения необходимых данных для `yt-dlp`. ## Логика работы Worker DAG (`ytdlp_ops_worker_per_url`) Этот DAG является "рабочей лошадкой" системы. Он спроектирован как самоподдерживающийся цикл для обработки одного URL за запуск. ### Задачи и их назначение: - **`pull_url_from_redis`**: Извлекает один URL из очереди `_inbox` в Redis. Если очередь пуста, DAG завершается со статусом `skipped`, останавливая эту "линию" обработки. - **`assign_account`**: Выбирает аккаунт для выполнения задачи. Он будет повторно использовать тот же аккаунт, который был успешно использован в предыдущем запуске в своей "линии" (привязка аккаунта). Если это первый запуск, он выбирает случайный аккаунт. - **`get_token`**: Основная задача. Она обращается к `ytdlp-ops-server` для получения `info.json`. - **`handle_bannable_error_branch`**: Если `get_token` завершается с ошибкой, требующей бана, эта задача-развилка решает, что делать дальше, в зависимости от политики `on_bannable_failure`. - **`ban_account_and_prepare_for_retry`**: Если политика разрешает повтор, эта задача банит сбойный аккаунт и выбирает новый для повторной попытки. - **`retry_get_token`**: Выполняет вторую попытку получить токен с новым аккаунтом. - **`ban_second_account_and_proxy`**: Если и вторая попытка неудачна, эта задача банит второй аккаунт и использованный прокси. - **`download_and_probe`**: Если `get_token` (или `retry_get_token`) завершилась успешно, эта задача использует `yt-dlp` для скачивания медиа и `ffmpeg` для проверки целостности скачанного файла. - **`mark_url_as_success`**: Если `download_and_probe` завершилась успешно, эта задача записывает результат в хэш `_result` в Redis. - **`handle_generic_failure`**: Если любая из основных задач завершается с неисправимой ошибкой, эта задача записывает подробную информацию об ошибке в хэш `_fail` в Redis. - **`decide_what_to_do_next`**: Задача-развилка, которая запускается после успеха или неудачи. Она решает, продолжать ли цикл. - **`trigger_self_run`**: Задача, которая фактически запускает следующий экземпляр DAG, создавая непрерывный цикл. ## Механизм привязки воркеров к конкретным машинам (Worker Pinning / Affinity) Для обеспечения того, чтобы все задачи, связанные с обработкой одного конкретного URL, выполнялись на одной и той же машине (воркере), система использует комбинацию из трех компонентов: Оркестратора, Диспетчера и специального хука Airflow. ### 1. `ytdlp_ops_orchestrator` (Оркестратор) - **Роль:** Инициирует процесс обработки. - **Действие:** При запуске он создает несколько DAG-запусков `ytdlp_ops_dispatcher`. Каждый такой запуск предназначен для обработки одного URL. - **Передача параметров:** Оркестратор передает свои параметры конфигурации (например, `account_pool`, `redis_conn_id`, `service_ip`) каждому запуску диспетчера. ### 2. `ytdlp_ops_dispatcher` (Диспетчер) - **Роль:** Основной механизм обеспечения привязки. - **Действие:** 1. **Получает URL:** Извлекает один URL из очереди Redis (`_inbox`). 2. **Определяет воркер:** Использует `socket.gethostname()` для определения имени текущей машины (воркера), на которой он выполняется. 3. **Формирует имя очереди:** Создает уникальное имя очереди для этого воркера, например, `queue-dl-dl-worker-1`. 4. **Запускает Worker DAG:** Инициирует запуск DAG `ytdlp_ops_worker_per_url`, передавая ему: * Извлеченный `url_to_process`. * Сформированное имя очереди `worker_queue` через параметр `conf`. * Все остальные параметры, полученные от оркестратора. - **Ключевой момент:** Именно на этом этапе устанавливается связь между конкретным URL и конкретным воркером, на котором началась обработка этого URL. ### 3. `task_instance_mutation_hook` (Хук изменения задач) - **Расположение:** `airflow/config/custom_task_hooks.py` - **Роль:** Является механизмом, который обеспечивает выполнение *всех* задач Worker DAG на нужной машине. - **Как это работает:** 1. **Регистрация:** Хук регистрируется в конфигурации Airflow и вызывается перед запуском *каждой* задачи. 2. **Проверка DAG ID:** Хук проверяет, принадлежит ли задача (`TaskInstance`) DAG `ytdlp_ops_worker_per_url`. 3. **Извлечение `conf`:** Если да, он безопасно извлекает `conf` из `DagRun`, связанного с этой задачей. 4. **Изменение очереди:** * Если в `conf` найден ключ `worker_queue` (что будет true для всех запусков, инициированных диспетчером), хук *переопределяет* стандартную очередь задачи на это значение. * Это означает, что Airflow планировщик поставит эту задачу именно в ту очередь, которая прослушивается нужным воркером. 5. **Резервный вариант:** Если `worker_queue` не найден (например, DAG запущен вручную), задача возвращается в стандартную очередь `queue-dl`. - **Ключевой момент:** Этот хук гарантирует, что *все последующие задачи* в рамках одного запуска `ytdlp_ops_worker_per_url` (например, `get_token`, `download_and_probe`, `mark_url_as_success`) будут выполнены на том же воркере, который изначально получил URL в диспетчере. ### Резюме Комбинация `Оркестратор -> Диспетчер -> Хук` эффективно реализует привязку задач к воркерам: 1. **Оркестратор** запускает процесс. 2. **Диспетчер** связывает конкретный URL с конкретным воркером, определяя его имя хоста и передавая его как `worker_queue` в Worker DAG. 3. **Хук** гарантирует, что все задачи Worker DAG выполняются в очереди, соответствующей этому воркеру. Это позволяет системе использовать локальные ресурсы воркера (например, кэш, временные файлы) эффективно и предсказуемо для обработки каждого отдельного URL.