import asyncio import os import json from datetime import datetime # Конфигурация formats = "best" # Используем наилучший доступный формат cookies_file = "cookies.txt" output_template = "video/%(id)s.f%(format_id)s.%(ext)s" concurrent_fragments = 16 # Количество фрагментов для параллельной загрузки max_concurrent_downloads = 8 # Максимальное количество одновременных загрузок # Путь к файлу со списком URL url_file = "video_urls.txt" log_file = "download_log.txt" metadata_file = "metadata.json" # Функция для записи информации о загруженных видео в лог def log_download(url, start_time, end_time, duration): with open(log_file, "a") as log: log.write(f"{url} скачано.\nНачало: {start_time}\nКонец: {end_time}\nПродолжительность: {duration}\n\n") # Функция для записи ошибок в лог def log_error(error_msg): with open(log_file, "a") as log: log.write(f"Ошибка: {error_msg}\n\n") # Функция для обновления метаданных def update_metadata(url): video_id = url.split('=')[-1] metadata_path = f"video/{video_id}.info.json" if os.path.exists(metadata_path): with open(metadata_path, "r") as meta_file: metadata = json.load(meta_file) # Загрузка существующих метаданных if os.path.exists(metadata_file): with open(metadata_file, "r") as file: all_metadata = json.load(file) else: all_metadata = {} # Добавление новых метаданных all_metadata[url] = metadata # Сохранение метаданных with open(metadata_file, "w") as file: json.dump(all_metadata, file, ensure_ascii=False, indent=4) # Асинхронная функция для загрузки одного видео async def download_video(url): start_time = datetime.now() try: command = [ "yt-dlp", "-f", formats, "--cookies", cookies_file, "--output", output_template, "--write-info-json", "--concurrent-fragments", str(concurrent_fragments), url ] print(f"Запуск команды: {' '.join(command)}") # Асинхронный запуск процесса process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Ожидание завершения процесса stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = f"Ошибка при загрузке видео {url}: {stderr.decode().strip()}" print(error_msg) log_error(error_msg) else: end_time = datetime.now() duration = end_time - start_time log_download(url, start_time, end_time, duration) update_metadata(url) except Exception as e: error_msg = f"Ошибка при загрузке видео {url}: {e}" print(error_msg) log_error(error_msg) # Асинхронная функция для загрузки всех видео async def download_all_videos(urls): semaphore = asyncio.Semaphore(max_concurrent_downloads) # Ограничение на количество одновременных загрузок async def limited_download(url): async with semaphore: await download_video(url) tasks = [limited_download(url) for url in urls] await asyncio.gather(*tasks) # Основная функция async def main(): # Проверка наличия файла с URL if not os.path.exists(url_file): print(f"Файл {url_file} не найден.") return # Чтение URL из файла with open(url_file, "r") as file: video_urls = [line.strip() for line in file if line.strip()] # Проверка наличия папки для вывода os.makedirs("video", exist_ok=True) # Запуск асинхронной загрузки await download_all_videos(video_urls) # Запуск асинхронного скрипта if __name__ == "__main__": asyncio.run(main())