# -*- coding: utf-8 -*- """ DAG to upload completed video directories to an S3-compatible service. This DAG creates one long-running task for each configured S3 worker. """ from __future__ import annotations import logging import os import shutil import subprocess import time from datetime import datetime, timedelta from airflow.decorators import task from airflow.exceptions import AirflowException from airflow.models.dag import DAG from airflow.models.param import Param from airflow.models.variable import Variable from airflow.operators.dummy import DummyOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.utils.dates import days_ago logger = logging.getLogger(__name__) DEFAULT_ARGS = { 'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=1), } BASE_DOWNLOAD_PATH = '/opt/airflow/downloadfiles' VIDEOS_PATH = os.path.join(BASE_DOWNLOAD_PATH, 'videos') READY_PATH = os.path.join(VIDEOS_PATH, 'ready') def run_s3_upload_batch(**context): """ This function runs in a continuous loop to check for completed video directories and upload them to S3. If no videos are found, it sleeps for a configurable interval before checking again. Dry run mode is non-destructive and will pause briefly after checking to prevent tight loops. """ params = context['params'] ti = context['task_instance'] # Log the configured execution timeout for debugging purposes. # This helps verify that the timeout setting from the DAG file is being applied. timeout_delta = ti.task.execution_timeout logger.info(f"Task is configured with execution_timeout: {timeout_delta}") concurrency = params['concurrency'] mode = params['mode'] dry_run = params['dry_run'] sleep_interval_min = params['sleep_if_no_videos_min'] sleep_interval_sec = sleep_interval_min * 60 s3_conn_id = params['s3_conn_id'] s3_bucket = params['s3_bucket_name'] s3_access_key_id = None s3_secret_access_key = None s3_endpoint = None s3_region = None config_source = "Unknown" profile_name = "rusonyx" # --- Attempt 1: Get S3 Configuration from Airflow Connection --- if s3_conn_id: try: logger.info(f"Attempting to load S3 configuration from Airflow connection '{s3_conn_id}'.") s3_hook = S3Hook(aws_conn_id=s3_conn_id) s3_conn = s3_hook.get_connection(s3_conn_id) s3_access_key_id = s3_conn.login s3_secret_access_key = s3_conn.password s3_endpoint = s3_conn.host extra_config = s3_conn.extra_dejson s3_region = extra_config.get('region_name') if not all([s3_access_key_id, s3_secret_access_key, s3_endpoint, s3_region]): logger.warning("S3 connection from Airflow is missing one or more required fields (excluding bucket). Will attempt to fall back to environment variables.") s3_access_key_id = s3_secret_access_key = s3_endpoint = s3_region = None # Reset all else: config_source = f"Airflow Connection '{s3_conn_id}'" profile_name = "rusonyx-airflow" except Exception as e: logger.warning(f"Failed to load S3 configuration from Airflow connection '{s3_conn_id}': {e}. Will attempt to fall back to environment variables.") # --- Attempt 2: Fallback to Environment Variables --- if not all([s3_access_key_id, s3_secret_access_key, s3_endpoint, s3_region]): try: logger.info("Attempting to load S3 configuration from environment variables as a fallback.") s3_access_key_id = os.environ['S3_DELIVERY_AWS_ACCESS_KEY_ID'] s3_secret_access_key = os.environ['S3_DELIVERY_AWS_SECRET_ACCESS_KEY'] s3_endpoint = os.environ['S3_DELIVERY_ENDPOINT'] s3_region = os.environ['S3_DELIVERY_AWS_REGION'] if not all([s3_access_key_id, s3_secret_access_key, s3_endpoint, s3_region]): raise ValueError("One or more S3 configuration environment variables are empty (excluding bucket).") config_source = "Environment Variables" profile_name = "rusonyx" except (KeyError, ValueError) as e: logger.error(f"Having problems reading S3 configuration from environment variables: {e}", exc_info=True) raise AirflowException("S3 configuration is missing. Could not load from Airflow connection or environment variables.") if not s3_bucket: raise AirflowException("S3 bucket name is not specified in DAG parameters.") s3_destination = f"s3://{s3_bucket}/" logger.info(f"Starting S3 upload loop. Watching source '{READY_PATH}' for delivery to '{s3_destination}'.") logger.info(f"Mode: {mode}, Dry Run: {dry_run}, Idle Sleep: {sleep_interval_min} min") logger.info(f"S3 Config loaded from {config_source}: Endpoint='{s3_endpoint}', Bucket='{s3_bucket}', Region='{s3_region}', Profile='{profile_name}'") # --- Write credentials to file for s5cmd profile --- aws_credentials_path = os.path.expanduser("~/.aws/credentials") aws_config_path = os.path.expanduser("~/.aws/config") try: os.makedirs(os.path.dirname(aws_credentials_path), exist_ok=True) with open(aws_credentials_path, 'w') as f: f.write(f"[{profile_name}]\n") f.write(f"aws_access_key_id = {s3_access_key_id}\n") f.write(f"aws_secret_access_key = {s3_secret_access_key}\n") logger.info(f"Wrote credentials for profile '{profile_name}' to {aws_credentials_path}") with open(aws_config_path, 'w') as f: f.write(f"[profile {profile_name}]\n") f.write(f"region = {s3_region}\n") logger.info(f"Wrote config for profile '{profile_name}' to {aws_config_path}") except Exception as e: logger.error(f"Failed to write AWS credentials/config file: {e}", exc_info=True) raise AirflowException(f"Failed to write AWS credentials/config file: {e}") while True: logger.info("--- Starting new S3 upload cycle ---") # --- Dry Run Logic (Non-destructive) --- if dry_run: logger.info("[DRY RUN] Checking for completed video batches...") if not os.path.exists(READY_PATH): logger.info(f"[DRY RUN] Source directory '{READY_PATH}' does not exist. Nothing to upload.") else: now = datetime.now() wait_minutes = params['batch_completion_wait_min'] cutoff_time = now - timedelta(minutes=wait_minutes) rounded_minute = (cutoff_time.minute // 10) * 10 cutoff_batch_ts = cutoff_time.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}" logger.info(f"[DRY RUN] Current time is {now.strftime('%H:%M:%S')}. With a {wait_minutes} min wait, processing batches up to and including '{cutoff_batch_ts}'.") all_video_dirs_to_process = [] processed_batch_dirs = set() all_batch_dirs = sorted([d for d in os.listdir(READY_PATH) if os.path.isdir(os.path.join(READY_PATH, d))]) for ts_dir in all_batch_dirs: if ts_dir > cutoff_batch_ts: continue batch_dir_path = os.path.join(READY_PATH, ts_dir) video_dirs_in_batch = [os.path.join(batch_dir_path, d) for d in os.listdir(batch_dir_path) if os.path.isdir(os.path.join(batch_dir_path, d))] if video_dirs_in_batch: all_video_dirs_to_process.extend(video_dirs_in_batch) processed_batch_dirs.add(batch_dir_path) else: logger.info(f"[DRY RUN] Batch directory '{batch_dir_path}' is empty. Would remove it.") if all_video_dirs_to_process: logger.info(f"[DRY RUN] Found {len(all_video_dirs_to_process)} total video director(y/ies) in {len(processed_batch_dirs)} batch(es) to process.") # Construct and log the command that would be run cmd = [ 's5cmd', '--endpoint-url', s3_endpoint, '--log', 'debug', '--no-verify-ssl', '--use-list-objects-v1', '--profile', profile_name, '--stat', '--numworkers', str(concurrency), 'run' ] cmd_str = ' '.join(cmd) # Construct the commands to be piped commands_to_pipe = '\n'.join([f"cp \"{dir_path}\" \"{s3_destination}\"" for dir_path in all_video_dirs_to_process]) logger.info(f"[DRY RUN] The following command would be executed:\n{cmd_str}") logger.info(f"[DRY RUN] The following commands would be piped to stdin:\n{commands_to_pipe}") if mode == 'mv': logger.info(f"[DRY RUN] Mode is 'mv'. Would delete {len(processed_batch_dirs)} source batch directories after successful upload.") # Pause briefly in dry-run mode if videos are found to avoid a fast, noisy loop. dry_run_pause_s = 10 logger.info(f"[DRY RUN] Pausing for {dry_run_pause_s} seconds to prevent rapid re-listing of the same files (this is a short, fixed pause for dry-run only).") time.sleep(dry_run_pause_s) continue # Go to the start of the next cycle else: logger.info("[DRY RUN] No completed video batches found.") # If in dry-run and no videos are found, sleep for the main interval. logger.info(f"[DRY RUN] Sleeping for {sleep_interval_min} minute(s)...") time.sleep(sleep_interval_sec) continue # --- Normal Operation Logic (Destructive) --- work_done_in_cycle = False try: # --- 1. Find all videos to upload from all completed batches --- if not os.path.exists(READY_PATH): logger.info(f"Ready directory '{READY_PATH}' does not exist. Nothing to upload.") else: now = datetime.now() wait_minutes = params['batch_completion_wait_min'] cutoff_time = now - timedelta(minutes=wait_minutes) rounded_minute = (cutoff_time.minute // 10) * 10 cutoff_batch_ts = cutoff_time.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}" logger.info(f"Current time is {now.strftime('%H:%M:%S')}. With a {wait_minutes} min wait, processing batches up to and including '{cutoff_batch_ts}'.") all_video_dirs_to_process = [] processed_batch_dirs = set() all_batch_dirs = sorted([d for d in os.listdir(READY_PATH) if os.path.isdir(os.path.join(READY_PATH, d))]) for ts_dir in all_batch_dirs: if ts_dir > cutoff_batch_ts: continue # This batch is not old enough to be processed batch_dir_path = os.path.join(READY_PATH, ts_dir) video_dirs_in_batch = [os.path.join(batch_dir_path, d) for d in os.listdir(batch_dir_path) if os.path.isdir(os.path.join(batch_dir_path, d))] if not video_dirs_in_batch: logger.info(f"Batch directory '{batch_dir_path}' is empty. Removing it.") try: os.rmdir(batch_dir_path) except OSError as e: logger.error(f"Could not remove empty batch directory {batch_dir_path}: {e}") continue # Move to the next batch all_video_dirs_to_process.extend(video_dirs_in_batch) processed_batch_dirs.add(batch_dir_path) # --- 2. Upload All Found Videos in a Single Batch Command --- if all_video_dirs_to_process: work_done_in_cycle = True logger.info(f"Found {len(all_video_dirs_to_process)} total video director(y/ies) in {len(processed_batch_dirs)} batch(es) to upload.") cmd = [ 's5cmd', '--endpoint-url', s3_endpoint, '--log', 'debug', '--no-verify-ssl', '--use-list-objects-v1', '--profile', profile_name, '--stat', '--numworkers', str(concurrency), 'run' ] cmd_str = ' '.join(cmd) # Construct the commands to be piped to stdin commands_to_pipe = '\n'.join([f"cp \"{dir_path}\" \"{s3_destination}\"" for dir_path in all_video_dirs_to_process]) logger.info(f"Executing s5cmd batch command:\n{cmd_str}") logger.info(f"Piping {len(all_video_dirs_to_process)} 'cp' commands to stdin.") upload_start_time = time.time() process = subprocess.run(cmd, check=True, capture_output=True, text=True, input=commands_to_pipe) upload_duration = time.time() - upload_start_time logger.info(f"s5cmd STDOUT: {process.stdout}") if process.stderr: logger.info(f"s5cmd STDERR: {process.stderr}") logger.info(f"Upload command completed successfully in {upload_duration:.2f} seconds.") logger.info(f"Successfully copied {len(all_video_dirs_to_process)} director(y/ies) to S3.") # --- 3. Cleanup --- if mode == 'mv': logger.info(f"Mode is 'mv'. Cleaning up {len(processed_batch_dirs)} source batch director(y/ies).") cleanup_start_time = time.time() # Create a temporary empty directory to use as a source for rsync deletion empty_dir_for_rsync = os.path.join(READY_PATH, f"__empty_{int(time.time())}") os.makedirs(empty_dir_for_rsync, exist_ok=True) try: for batch_dir_path in processed_batch_dirs: try: # Use rsync with an empty source to efficiently delete the contents of the batch directory # The trailing slash on both source and destination is important. rsync_cmd = [ 'rsync', '-a', '--delete', f'{empty_dir_for_rsync}/', f'{batch_dir_path}/' ] subprocess.run(rsync_cmd, check=True, capture_output=True, text=True) # After the contents are deleted, remove the now-empty directory os.rmdir(batch_dir_path) logger.info(f"Successfully removed {batch_dir_path}") except Exception as cleanup_e: logger.error(f"Failed to remove directory {batch_dir_path}: {cleanup_e}", exc_info=True) if isinstance(cleanup_e, subprocess.CalledProcessError): logger.error(f"rsync STDERR: {cleanup_e.stderr}") finally: # Clean up the temporary empty directory shutil.rmtree(empty_dir_for_rsync) cleanup_duration = time.time() - cleanup_start_time logger.info(f"Cleanup complete in {cleanup_duration:.2f} seconds.") else: # mode == 'cp' logger.info(f"Mode is 'cp'. Source directories will be left for inspection.") if not work_done_in_cycle: logger.info(f"No completed video batches found in '{READY_PATH}'.") except Exception as e: logger.error(f"An error occurred during the S3 upload cycle: {e}", exc_info=True) if isinstance(e, subprocess.CalledProcessError): logger.error(f"s5cmd STDERR: {e.stderr}") # On error, we do NOT clean up, to allow for investigation and retries. # The failed directories will be picked up in the next cycle. # Treat errors as "no work done" to trigger sleep and prevent fast failure loops work_done_in_cycle = False # --- Loop Control --- if not work_done_in_cycle: logger.info(f"No work done in this cycle. Sleeping for {sleep_interval_min} minute(s)...") time.sleep(sleep_interval_sec) else: logger.info("Work was completed in this cycle. Checking for more immediately.") with DAG( dag_id='ytdlp_s3_uploader', default_args=DEFAULT_ARGS, schedule=None, start_date=days_ago(1), catchup=False, tags=['ytdlp', 's3', 'upload'], doc_md="""### S3 Uploader DAG 1. This DAG creates dynamic uploader tasks with clear names depicting their worker machine (e.g., `upload_batch_on_dl001`). 2. Ansible updates an Airflow Variable named `s3_worker_hostnames` with a JSON list of all active uploader workers (typically dlXXX machines). Each worker listens to its own queue (e.g., `queue-dl-dl001`). 3. This DAG reads the variable on manual trigger or after a pause/resume cycle to create the dynamic tasks. This allows for easy inspection of per-worker logs and status from the Airflow UI. 4. Each dynamic task watches a shared folder (`/opt/airflow/downloadfiles/videos/ready`). Download workers place completed videos into timestamped sub-folders (e.g., `20241122T1050`). The uploader processes these 10-minute batches, copying them to S3 with `s5cmd` and then deleting the source directories. This design avoids race conditions and improves performance. #### Why use 10-minute batch folders? While an `mv` command (atomic on the same filesystem) is sufficient to ensure a single video directory is complete when it appears in the `ready` folder, the batching system solves higher-level concurrency and efficiency problems in a high-throughput environment. - **Concurrency Management**: The uploader needs to process a discrete *set* of videos. By working on batches from a *previous* time window (e.g., uploading the `10:40` batch after `10:50`), it guarantees that no new files will be added to that batch while it's being processed. This creates a clean, reliable unit of work and prevents the uploader from missing videos that are moved in while it's compiling its list. - **Bulk Operation Efficiency**: It is far more efficient to upload hundreds of videos in a single bulk command than one by one. The batching system allows videos to accumulate, and the uploader sends them all to S3 in one highly optimized `s5cmd run` command. Similarly, after a successful upload, the uploader can delete the single parent batch directory, which is much faster than deleting hundreds of individual video folders. - **Continuous Operation**: The uploader task is a long-running loop. If processing a batch takes longer than 10 minutes (e.g., due to a large volume of videos or slow network), the uploader will continue working on that batch until it is complete. It only sleeps when it has processed all available completed batches and is waiting for new ones to become ready. #### Cleanup Method: `rsync` vs `shutil.rmtree` The cleanup process uses the `rsync` empty-folder trick to delete the contents of the batch directory before removing the directory itself. This is a deliberate performance optimization. The command is effectively: `rsync -a --delete /path/to/empty/ /path/to/delete/`. - Python's `shutil.rmtree` can be slow as it makes an individual `os.remove()` system call for every file. - The `rsync` method is a well-known and highly efficient alternative for this scenario, as `rsync` is a mature C program optimized for these operations. More details on this performance difference can be found here: https://stackoverflow.com/questions/5470939/why-is-shutil-rmtree-so-slow """, params={ 'mode': Param( 'mv', type="string", enum=['cp', 'mv'], title="Operation Mode", description="`mv` (move): After a successful upload, the temporary batch directory is deleted. This is the standard behavior. `cp` (copy): The temporary batch directory is left intact for debugging; it will be cleaned up on the next run." ), 'dry_run': Param( True, type="boolean", title="Dry Run", description="If True, the DAG will perform all steps except the actual upload and cleanup. `s5cmd` will be run with `--dry-run`, and the final directory removal will be skipped. Log messages will indicate what would have happened." ), 'concurrency': Param(10, type="integer", title="s5cmd Concurrency"), 'sleep_if_no_videos_min': Param(5, type="integer", title="Sleep if Idle (minutes)", description="How many minutes the task should sleep if no videos are found to upload. This should be less than any external timeout (e.g., Celery's worker_proc_timeout)."), 'batch_completion_wait_min': Param(0, type="integer", title="Batch Completion Wait (minutes)", description="How many minutes to wait after a 10-minute batch window closes before considering it for upload. Default is 0, which processes the current batch immediately. A value of 10 restores the old behavior of waiting for the next 10-minute window."), 's3_conn_id': Param('s3_delivery_connection', type="string", title="S3 Connection ID", description="The Airflow connection ID for the S3-compatible storage. If this connection is invalid or missing, the task will fall back to environment variables."), 's3_bucket_name': Param( 'videos', type="string", title="S3 Bucket Name", description="The name of the S3 bucket to upload to. Common values are 'videos' or 'videos-prod'." ), } ) as dag: # Dynamically create one task per S3 worker hostname # IMPORTANT: The tasks are created when this DAG file is parsed by the Airflow Scheduler. # If you add/change the 's3_worker_hostnames' Airflow Variable, you may need to # wait a few minutes for the scheduler to re-parse the file and update the tasks. # Forcing a re-parse can be done by pausing and un-pausing the DAG in the UI. s3_worker_hostnames = [] # Initialize to be safe try: # The variable should be a JSON list of strings, e.g., ["s3-001", "s3-002"] s3_worker_hostnames = Variable.get("s3_worker_hostnames", deserialize_json=True, default_var=[]) logger.info(f"DAG 'ytdlp_s3_uploader' successfully loaded s3_worker_hostnames variable. Value: {s3_worker_hostnames}") if not isinstance(s3_worker_hostnames, list): logger.error(f"Airflow Variable 's3_worker_hostnames' is not a valid JSON list. Value: {s3_worker_hostnames}") s3_worker_hostnames = [] # Reset to empty to prevent errors except Exception as e: logger.error( f"Could not read or parse Airflow Variable 's3_worker_hostnames'. " f"Please create it in the Airflow UI as a JSON list of your S3 worker hostnames (e.g., [\"s3-001\"]). " f"No S3 worker tasks will be created. Error: {e}", exc_info=True ) s3_worker_hostnames = [] @task(task_id='check_s3_worker_configuration') def check_s3_worker_configuration_callable(): """Logs the current value of the s3_worker_hostnames variable at runtime for debugging.""" logger.info("--- S3 Worker Configuration Check (at runtime) ---") try: hostnames = Variable.get("s3_worker_hostnames", deserialize_json=True, default_var=None) if hostnames is None: logger.error("Airflow Variable 's3_worker_hostnames' is not defined.") logger.info("Please create it in the Airflow UI (Admin -> Variables) as a JSON list of strings, e.g., [\"s3-worker-01\"]") elif not isinstance(hostnames, list): logger.error(f"Airflow Variable 's3_worker_hostnames' is not a valid JSON list. Current value: {hostnames}") elif not hostnames: logger.warning("Airflow Variable 's3_worker_hostnames' is defined but is an empty list []. No worker tasks will be run.") else: logger.info(f"Successfully read 's3_worker_hostnames'. It contains {len(hostnames)} worker(s): {hostnames}") logger.info("If you see this task but no worker tasks in the UI, it means the DAG did not find these workers when it was parsed by the scheduler.") logger.info("This can happen due to caching. Please wait a few minutes for the scheduler to re-parse the DAG file, or pause/un-pause the DAG.") except Exception as e: logger.error(f"An error occurred while trying to read the 's3_worker_hostnames' variable at runtime: {e}", exc_info=True) logger.info("--- End of Configuration Check ---") check_s3_worker_configuration_task = check_s3_worker_configuration_callable() check_s3_worker_configuration_task.doc_md = """ ### S3 Worker Configuration Check This task runs at the start of every DAG run to check the `s3_worker_hostnames` Airflow Variable. The dynamic worker tasks are created based on this variable *at the time the DAG is parsed by the scheduler*. **Check the logs for this task to see the current value of the variable as read at runtime.** This can help diagnose why worker tasks may not have been created. If the logs show the variable is correct but you don't see the worker tasks in the UI, you may need to wait for the scheduler to re-parse the DAG file. You can force this by pausing and un-pausing the DAG. """ if s3_worker_hostnames: worker_tasks = [] for hostname in s3_worker_hostnames: # Sanitize hostname for task_id task_id_hostname = hostname.replace('.', '_') # Create a task for each worker, pinned to its specific queue upload_task = task( task_id=f'upload_batch_on_{task_id_hostname}', queue=f'queue-s3-{hostname}', execution_timeout=timedelta(days=1), )(run_s3_upload_batch)() worker_tasks.append(upload_task) check_s3_worker_configuration_task >> worker_tasks