# -*- coding: utf-8 -*-
"""
DAG to upload completed video directories to an S3-compatible service.

This DAG creates one long-running task for each configured S3 worker.
"""

from __future__ import annotations

import logging
import os
import shutil
import subprocess
import time
from datetime import datetime, timedelta

from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.dates import days_ago

logger = logging.getLogger(__name__)

DEFAULT_ARGS = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

BASE_DOWNLOAD_PATH = '/opt/airflow/downloadfiles'
VIDEOS_PATH = os.path.join(BASE_DOWNLOAD_PATH, 'videos')
READY_PATH = os.path.join(VIDEOS_PATH, 'ready')
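
# Expected layout of READY_PATH (directory and file names are illustrative):
#   ready/
#   └── 20241122T1050/            # one 10-minute batch window
#       ├── <video_id_1>/         # one directory per completed video
#       │   └── ...media files...
#       └── <video_id_2>/
#           └── ...media files...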


def run_s3_upload_batch(**context):
    """
    Run a continuous loop that checks for completed video directories and uploads them to S3.

    If no videos are found, the loop sleeps for a configurable interval before checking again.
    Dry-run mode is non-destructive and pauses briefly after each check to prevent tight loops.
    """
    params = context['params']
    concurrency = params['concurrency']
    mode = params['mode']
    dry_run = params['dry_run']
    sleep_interval_min = params['sleep_if_no_videos_min']
    sleep_interval_sec = sleep_interval_min * 60
    s3_conn_id = params['s3_conn_id']

    s3_access_key_id = None
    s3_secret_access_key = None
    s3_endpoint = None
    s3_bucket = None
    s3_region = None
    config_source = "Unknown"
    profile_name = "rusonyx"

    # --- Attempt 1: Get S3 Configuration from Airflow Connection ---
    if s3_conn_id:
        try:
            logger.info(f"Attempting to load S3 configuration from Airflow connection '{s3_conn_id}'.")
            s3_hook = S3Hook(aws_conn_id=s3_conn_id)
            s3_conn = s3_hook.get_connection(s3_conn_id)

            s3_access_key_id = s3_conn.login
            s3_secret_access_key = s3_conn.password
            s3_endpoint = s3_conn.host

            extra_config = s3_conn.extra_dejson
            s3_bucket = extra_config.get('bucket')
            s3_region = extra_config.get('region_name')

            if not all([s3_access_key_id, s3_secret_access_key, s3_endpoint, s3_bucket, s3_region]):
                logger.warning("S3 connection from Airflow is missing one or more required fields. Will attempt to fall back to environment variables.")
                s3_access_key_id = s3_secret_access_key = s3_endpoint = s3_bucket = s3_region = None  # Reset all
            else:
                config_source = f"Airflow Connection '{s3_conn_id}'"
                profile_name = "rusonyx-airflow"

        except Exception as e:
            logger.warning(f"Failed to load S3 configuration from Airflow connection '{s3_conn_id}': {e}. Will attempt to fall back to environment variables.")

    # --- Attempt 2: Fallback to Environment Variables ---
    if not all([s3_access_key_id, s3_secret_access_key, s3_endpoint, s3_bucket, s3_region]):
        try:
            logger.info("Attempting to load S3 configuration from environment variables as a fallback.")
            s3_access_key_id = os.environ['S3_DELIVERY_AWS_ACCESS_KEY_ID']
            s3_secret_access_key = os.environ['S3_DELIVERY_AWS_SECRET_ACCESS_KEY']
            s3_endpoint = os.environ['S3_DELIVERY_ENDPOINT']
            s3_bucket = os.environ['S3_DELIVERY_BUCKET']
            s3_region = os.environ['S3_DELIVERY_AWS_REGION']

            if not all([s3_access_key_id, s3_secret_access_key, s3_endpoint, s3_bucket, s3_region]):
                raise ValueError("One or more S3 configuration environment variables are empty.")
            config_source = "Environment Variables"
            profile_name = "rusonyx"

        except (KeyError, ValueError) as e:
            logger.error(f"Failed to read S3 configuration from environment variables: {e}", exc_info=True)
            raise AirflowException("S3 configuration is missing. Could not load from Airflow connection or environment variables.")

    s3_destination = f"s3://{s3_bucket}/"

    logger.info(f"Starting S3 upload loop. Watching source '{READY_PATH}' for delivery to '{s3_destination}'.")
    logger.info(f"Mode: {mode}, Dry Run: {dry_run}, Idle Sleep: {sleep_interval_min} min")
    logger.info(f"S3 Config loaded from {config_source}: Endpoint='{s3_endpoint}', Bucket='{s3_bucket}', Region='{s3_region}', Profile='{profile_name}'")

    # --- Write credentials to file for s5cmd profile ---
    aws_credentials_path = os.path.expanduser("~/.aws/credentials")
    aws_config_path = os.path.expanduser("~/.aws/config")

    try:
        os.makedirs(os.path.dirname(aws_credentials_path), exist_ok=True)
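
        # NOTE: mode 'w' truncates the file, so any profiles written by a previous
        # cycle are overwritten. s5cmd's '--profile' flag selects a profile from
        # these standard shared AWS credentials/config files.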
        with open(aws_credentials_path, 'w') as f:
            f.write(f"[{profile_name}]\n")
            f.write(f"aws_access_key_id = {s3_access_key_id}\n")
            f.write(f"aws_secret_access_key = {s3_secret_access_key}\n")
        logger.info(f"Wrote credentials for profile '{profile_name}' to {aws_credentials_path}")

        with open(aws_config_path, 'w') as f:
            f.write(f"[profile {profile_name}]\n")
            f.write(f"region = {s3_region}\n")
        logger.info(f"Wrote config for profile '{profile_name}' to {aws_config_path}")

    except Exception as e:
        logger.error(f"Failed to write AWS credentials/config file: {e}", exc_info=True)
        raise AirflowException(f"Failed to write AWS credentials/config file: {e}")

    while True:
        logger.info("--- Starting new S3 upload cycle ---")

        # --- Dry Run Logic (Non-destructive) ---
        if dry_run:
            logger.info("[DRY RUN] Checking for completed video batches...")
            if not os.path.exists(READY_PATH):
                logger.info(f"[DRY RUN] Source directory '{READY_PATH}' does not exist. Nothing to upload.")
            else:
                now = datetime.now()
                wait_minutes = params['batch_completion_wait_min']
                cutoff_time = now - timedelta(minutes=wait_minutes)
                rounded_minute = (cutoff_time.minute // 10) * 10
                cutoff_batch_ts = cutoff_time.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}"
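                # Worked example: at 2024-11-22 10:57 with wait_minutes=0, the cutoff
                # minute rounds down to 50, so cutoff_batch_ts is '20241122T1050' and
                # the still-open 10-minute batch is processed immediately.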
                logger.info(f"[DRY RUN] Current time is {now.strftime('%H:%M:%S')}. With a {wait_minutes} min wait, processing batches up to and including '{cutoff_batch_ts}'.")

                all_video_dirs_to_process = []
                processed_batch_dirs = set()
                all_batch_dirs = sorted([d for d in os.listdir(READY_PATH) if os.path.isdir(os.path.join(READY_PATH, d))])

                for ts_dir in all_batch_dirs:
                    if ts_dir > cutoff_batch_ts:
                        continue

                    batch_dir_path = os.path.join(READY_PATH, ts_dir)
                    video_dirs_in_batch = [os.path.join(batch_dir_path, d) for d in os.listdir(batch_dir_path) if os.path.isdir(os.path.join(batch_dir_path, d))]

                    if video_dirs_in_batch:
                        all_video_dirs_to_process.extend(video_dirs_in_batch)
                        processed_batch_dirs.add(batch_dir_path)
                    else:
                        logger.info(f"[DRY RUN] Batch directory '{batch_dir_path}' is empty. Would remove it.")

                if all_video_dirs_to_process:
                    logger.info(f"[DRY RUN] Found {len(all_video_dirs_to_process)} total video directory(ies) in {len(processed_batch_dirs)} batch(es) to process.")

                    # Construct and log the command that would be run
                    cmd = [
                        's5cmd', '--endpoint-url', s3_endpoint, '--log', 'debug', '--no-verify-ssl',
                        '--use-list-objects-v1', '--profile', profile_name, '--stat',
                        '--numworkers', str(concurrency), 'run'
                    ]
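                    # 's5cmd run' reads one command per line from stdin, so a single
                    # s5cmd process can handle the whole batch with '--numworkers'
                    # parallel transfers.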
                    cmd_str = ' '.join(cmd)

                    # Construct the commands to be piped
                    commands_to_pipe = '\n'.join([f"cp \"{dir_path}\" \"{s3_destination}\"" for dir_path in all_video_dirs_to_process])

                    logger.info(f"[DRY RUN] The following command would be executed:\n{cmd_str}")
                    logger.info(f"[DRY RUN] The following commands would be piped to stdin:\n{commands_to_pipe}")

                    if mode == 'mv':
                        logger.info(f"[DRY RUN] Mode is 'mv'. Would delete {len(processed_batch_dirs)} source batch directories after successful upload.")

                    # Pause briefly in dry-run mode if videos are found to avoid a fast, noisy loop.
                    dry_run_pause_s = 10
                    logger.info(f"[DRY RUN] Pausing for {dry_run_pause_s} seconds to prevent rapid re-listing of the same files (this is a short, fixed pause for dry-run only).")
                    time.sleep(dry_run_pause_s)
                    continue  # Go to the start of the next cycle
                else:
                    logger.info("[DRY RUN] No completed video batches found.")

            # If in dry-run and no videos are found, sleep for the main interval.
            logger.info(f"[DRY RUN] Sleeping for {sleep_interval_min} minute(s)...")
            time.sleep(sleep_interval_sec)
            continue

        # --- Normal Operation Logic (Destructive) ---
        work_done_in_cycle = False
        try:
            # --- 1. Find all videos to upload from all completed batches ---
            if not os.path.exists(READY_PATH):
                logger.info(f"Ready directory '{READY_PATH}' does not exist. Nothing to upload.")
            else:
                now = datetime.now()
                wait_minutes = params['batch_completion_wait_min']
                cutoff_time = now - timedelta(minutes=wait_minutes)
                rounded_minute = (cutoff_time.minute // 10) * 10
                cutoff_batch_ts = cutoff_time.strftime('%Y%m%dT%H') + f"{rounded_minute:02d}"
                logger.info(f"Current time is {now.strftime('%H:%M:%S')}. With a {wait_minutes} min wait, processing batches up to and including '{cutoff_batch_ts}'.")

                all_video_dirs_to_process = []
                processed_batch_dirs = set()
                all_batch_dirs = sorted([d for d in os.listdir(READY_PATH) if os.path.isdir(os.path.join(READY_PATH, d))])
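
                # Batch names use the fixed-width 'YYYYMMDDTHHMM' format, so plain
                # string comparison is also chronological comparison; that is what
                # makes the 'ts_dir > cutoff_batch_ts' check below safe.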
                for ts_dir in all_batch_dirs:
                    if ts_dir > cutoff_batch_ts:
                        continue  # This batch is not old enough to be processed

                    batch_dir_path = os.path.join(READY_PATH, ts_dir)
                    video_dirs_in_batch = [os.path.join(batch_dir_path, d) for d in os.listdir(batch_dir_path) if os.path.isdir(os.path.join(batch_dir_path, d))]

                    if not video_dirs_in_batch:
                        logger.info(f"Batch directory '{batch_dir_path}' is empty. Removing it.")
                        try:
                            os.rmdir(batch_dir_path)
                        except OSError as e:
                            logger.error(f"Could not remove empty batch directory {batch_dir_path}: {e}")
                        continue  # Move to the next batch

                    all_video_dirs_to_process.extend(video_dirs_in_batch)
                    processed_batch_dirs.add(batch_dir_path)

            # --- 2. Upload All Found Videos in a Single Batch Command ---
            if all_video_dirs_to_process:
                work_done_in_cycle = True
                logger.info(f"Found {len(all_video_dirs_to_process)} total video directory(ies) in {len(processed_batch_dirs)} batch(es) to upload.")

                cmd = [
                    's5cmd', '--endpoint-url', s3_endpoint, '--log', 'debug', '--no-verify-ssl',
                    '--use-list-objects-v1', '--profile', profile_name, '--stat',
                    '--numworkers', str(concurrency), 'run'
                ]
                cmd_str = ' '.join(cmd)

                # Construct the commands to be piped to stdin
                commands_to_pipe = '\n'.join([f"cp \"{dir_path}\" \"{s3_destination}\"" for dir_path in all_video_dirs_to_process])
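                # Each piped line looks like this (paths here are illustrative):
                #   cp "/opt/airflow/downloadfiles/videos/ready/20241122T1050/<video_id>" "s3://<bucket>/"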

                logger.info(f"Executing s5cmd batch command:\n{cmd_str}")
                logger.info(f"Piping {len(all_video_dirs_to_process)} 'cp' commands to stdin.")

                upload_start_time = time.time()
                process = subprocess.run(cmd, check=True, capture_output=True, text=True, input=commands_to_pipe)
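                # check=True raises CalledProcessError on a non-zero exit; the outer
                # except block below logs it together with s5cmd's captured stderr.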
                upload_duration = time.time() - upload_start_time

                logger.info(f"s5cmd STDOUT: {process.stdout}")
                if process.stderr:
                    logger.info(f"s5cmd STDERR: {process.stderr}")
                logger.info(f"Upload command completed successfully in {upload_duration:.2f} seconds.")
                logger.info(f"Successfully copied {len(all_video_dirs_to_process)} directory(ies) to S3.")

                # --- 3. Cleanup ---
                if mode == 'mv':
                    logger.info(f"Mode is 'mv'. Cleaning up {len(processed_batch_dirs)} source batch directory(ies).")
                    cleanup_start_time = time.time()

                    # Create a temporary empty directory to use as a source for rsync deletion
                    empty_dir_for_rsync = os.path.join(READY_PATH, f"__empty_{int(time.time())}")
                    os.makedirs(empty_dir_for_rsync, exist_ok=True)
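
                    # Syncing an empty source with '--delete' clears the batch
                    # directory in one rsync pass, which tends to be faster than
                    # shutil.rmtree for directories holding very many files.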
                    try:
                        for batch_dir_path in processed_batch_dirs:
                            try:
                                # Use rsync with an empty source to efficiently delete the contents of the batch directory.
                                # The trailing slash on both source and destination is important.
                                rsync_cmd = [
                                    'rsync',
                                    '-a', '--delete',
                                    f'{empty_dir_for_rsync}/',
                                    f'{batch_dir_path}/'
                                ]
                                subprocess.run(rsync_cmd, check=True, capture_output=True, text=True)

                                # After the contents are deleted, remove the now-empty directory
                                os.rmdir(batch_dir_path)
                                logger.info(f"Successfully removed {batch_dir_path}")
                            except Exception as cleanup_e:
                                logger.error(f"Failed to remove directory {batch_dir_path}: {cleanup_e}", exc_info=True)
                                if isinstance(cleanup_e, subprocess.CalledProcessError):
                                    logger.error(f"rsync STDERR: {cleanup_e.stderr}")
                    finally:
                        # Clean up the temporary empty directory
                        shutil.rmtree(empty_dir_for_rsync)

                    cleanup_duration = time.time() - cleanup_start_time
                    logger.info(f"Cleanup complete in {cleanup_duration:.2f} seconds.")
                else:  # mode == 'cp'
                    logger.info("Mode is 'cp'. Source directories will be left for inspection.")

            if not work_done_in_cycle:
                logger.info(f"No completed video batches found in '{READY_PATH}'.")

        except Exception as e:
            logger.error(f"An error occurred during the S3 upload cycle: {e}", exc_info=True)
            if isinstance(e, subprocess.CalledProcessError):
                logger.error(f"s5cmd STDERR: {e.stderr}")
            # On error, we do NOT clean up, to allow for investigation and retries.
            # The failed directories will be picked up in the next cycle.
            # Treat errors as "no work done" to trigger sleep and prevent fast failure loops.
            work_done_in_cycle = False

        # --- Loop Control ---
        if not work_done_in_cycle:
            logger.info(f"No work done in this cycle. Sleeping for {sleep_interval_min} minute(s)...")
            time.sleep(sleep_interval_sec)
        else:
            logger.info("Work was completed in this cycle. Checking for more immediately.")


with DAG(
    dag_id='ytdlp_s3_uploader',
    default_args=DEFAULT_ARGS,
    schedule=None,
    start_date=days_ago(1),
    catchup=False,
    tags=['ytdlp', 's3', 'upload'],
    doc_md="""### S3 Uploader DAG

1. This DAG creates dynamic uploader tasks with clear names identifying their worker machine (e.g., `upload_batch_on_dl001`).
2. Ansible updates an Airflow Variable named `s3_worker_hostnames` with a JSON list of all active uploader workers (typically dlXXX machines). Each worker listens to its own queue (e.g., `queue-s3-dl001`).
3. This DAG reads the variable on manual trigger or after a pause/resume cycle to create the dynamic tasks. This allows for easy inspection of per-worker logs and status from the Airflow UI.
4. Each dynamic task watches a shared folder (`/opt/airflow/downloadfiles/videos/ready`). Download workers place completed videos into timestamped sub-folders (e.g., `20241122T1050`). The uploader processes these 10-minute batches, copying them to S3 with `s5cmd` and then deleting the source directories. This design avoids race conditions and improves performance.
""",
    params={
        'mode': Param(
            'mv', type="string", enum=['cp', 'mv'], title="Operation Mode",
            description="`mv` (move): After a successful upload, the temporary batch directory is deleted. This is the standard behavior. `cp` (copy): The temporary batch directory is left intact for debugging; it will be cleaned up on the next run."
        ),
        'dry_run': Param(
            True, type="boolean", title="Dry Run",
            description="If True, the task performs all steps except the actual upload and cleanup: the `s5cmd` command and the `cp` lines that would be piped to it are only logged, and directory removal is skipped. Log messages indicate what would have happened."
        ),
        'concurrency': Param(10, type="integer", title="s5cmd Concurrency"),
        'sleep_if_no_videos_min': Param(10, type="integer", title="Sleep if Idle (minutes)", description="How many minutes the task should sleep if no videos are found to upload."),
        'batch_completion_wait_min': Param(0, type="integer", title="Batch Completion Wait (minutes)", description="How many minutes to wait after a 10-minute batch window closes before considering it for upload. Default is 0, which processes the current batch immediately. A value of 10 restores the old behavior of waiting for the next 10-minute window."),
        's3_conn_id': Param('s3_delivery_connection', type="string", title="S3 Connection ID", description="The Airflow connection ID for the S3-compatible storage. If this connection is invalid or missing, the task will fall back to environment variables."),
    }
) as dag:

    # Dynamically create one task per S3 worker hostname.
    # IMPORTANT: The tasks are created when this DAG file is parsed by the Airflow Scheduler.
    # If you add/change the 's3_worker_hostnames' Airflow Variable, you may need to
    # wait a few minutes for the scheduler to re-parse the file and update the tasks.
    # Forcing a re-parse can be done by pausing and un-pausing the DAG in the UI.
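    # NOTE: because this lookup happens at parse time, it also costs one
    # metadata-DB (or secrets-backend) read on every scheduler parse of this file.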
    s3_worker_hostnames = []  # Initialize to be safe
    try:
        # The variable should be a JSON list of strings, e.g., ["s3-001", "s3-002"]
        s3_worker_hostnames = Variable.get("s3_worker_hostnames", deserialize_json=True, default_var=[])
        logger.info(f"DAG 'ytdlp_s3_uploader' successfully loaded s3_worker_hostnames variable. Value: {s3_worker_hostnames}")
        if not isinstance(s3_worker_hostnames, list):
            logger.error(f"Airflow Variable 's3_worker_hostnames' is not a valid JSON list. Value: {s3_worker_hostnames}")
            s3_worker_hostnames = []  # Reset to empty to prevent errors
    except Exception as e:
        logger.error(
            f"Could not read or parse Airflow Variable 's3_worker_hostnames'. "
            f"Please create it in the Airflow UI as a JSON list of your S3 worker hostnames (e.g., [\"s3-001\"]). "
            f"No S3 worker tasks will be created. Error: {e}",
            exc_info=True
        )
        s3_worker_hostnames = []

    @task(task_id='check_s3_worker_configuration')
    def check_s3_worker_configuration_callable():
        """Logs the current value of the s3_worker_hostnames variable at runtime for debugging."""
        logger.info("--- S3 Worker Configuration Check (at runtime) ---")
        try:
            hostnames = Variable.get("s3_worker_hostnames", deserialize_json=True, default_var=None)
            if hostnames is None:
                logger.error("Airflow Variable 's3_worker_hostnames' is not defined.")
                logger.info("Please create it in the Airflow UI (Admin -> Variables) as a JSON list of strings, e.g., [\"s3-worker-01\"]")
            elif not isinstance(hostnames, list):
                logger.error(f"Airflow Variable 's3_worker_hostnames' is not a valid JSON list. Current value: {hostnames}")
            elif not hostnames:
                logger.warning("Airflow Variable 's3_worker_hostnames' is defined but is an empty list []. No worker tasks will be run.")
            else:
                logger.info(f"Successfully read 's3_worker_hostnames'. It contains {len(hostnames)} worker(s): {hostnames}")
                logger.info("If you see this task but no worker tasks in the UI, it means the DAG did not find these workers when it was parsed by the scheduler.")
                logger.info("This can happen due to caching. Please wait a few minutes for the scheduler to re-parse the DAG file, or pause/un-pause the DAG.")
        except Exception as e:
            logger.error(f"An error occurred while trying to read the 's3_worker_hostnames' variable at runtime: {e}", exc_info=True)
        logger.info("--- End of Configuration Check ---")

    check_s3_worker_configuration_task = check_s3_worker_configuration_callable()
    check_s3_worker_configuration_task.doc_md = """
### S3 Worker Configuration Check

This task runs at the start of every DAG run to check the `s3_worker_hostnames` Airflow Variable.

The dynamic worker tasks are created based on this variable *at the time the DAG is parsed by the scheduler*.

**Check the logs for this task to see the current value of the variable as read at runtime.** This can help diagnose why worker tasks may not have been created.

If the logs show the variable is correct but you don't see the worker tasks in the UI, you may need to wait for the scheduler to re-parse the DAG file. You can force this by pausing and un-pausing the DAG.
"""

    if s3_worker_hostnames:
        worker_tasks = []
        for hostname in s3_worker_hostnames:
            # Sanitize hostname for task_id
            task_id_hostname = hostname.replace('.', '_')
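            # e.g., an illustrative hostname 'dl001.example.net' yields
            # task_id 'upload_batch_on_dl001_example_net'.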

            # Create a task for each worker, pinned to its specific queue
            upload_task = task(
                task_id=f'upload_batch_on_{task_id_hostname}',
                queue=f'queue-s3-{hostname}'
            )(run_s3_upload_batch)()
            worker_tasks.append(upload_task)

        check_s3_worker_configuration_task >> worker_tasks