import logging
import os
import shlex
import signal
import subprocess
import sys
import threading
import time

try:
    import docker
except ImportError:
    docker = None

logger = logging.getLogger(__name__)

# Worker ID tracking: maps threading.get_ident() values to small sequential
# integers. All three names are guarded by worker_id_lock.
worker_id_map = {}
worker_id_counter = 0
worker_id_lock = threading.Lock()


def get_worker_id():
    """Return a stable, sequential ID for the calling thread.

    The first thread to call this gets 0, the next new thread 1, and so on.
    Repeated calls from the same thread return the same value. IDs are keyed
    on ``threading.get_ident()``.
    """
    global worker_id_counter
    thread_id = threading.get_ident()
    with worker_id_lock:
        if thread_id not in worker_id_map:
            worker_id_map[thread_id] = worker_id_counter
            worker_id_counter += 1
        return worker_id_map[thread_id]


def _kill_process_group(process):
    """Best-effort hard kill of *process* and, where supported, its process group."""
    try:
        if hasattr(os, "killpg"):
            # The child was started in its own session, so its process group
            # contains only the child and its descendants.
            os.killpg(os.getpgid(process.pid), signal.SIGKILL)
        else:
            # No process groups on this platform; kill the direct child only.
            process.kill()
    except (ProcessLookupError, PermissionError):
        pass  # Process already finished or we lack permissions


def run_command(cmd, running_processes, process_lock, input_data=None, binary_stdout=False,
                stream_output=False, stream_prefix="", env=None, timeout_seconds=15 * 60):
    """
    Runs a command, captures its output, and returns status.

    Args:
        cmd: Argument list to execute (list of strings).
        running_processes: Set-like registry; the Popen object is added while
            the command runs and discarded when this function returns.
        process_lock: Lock guarding ``running_processes``.
        input_data: Optional data written to the command's stdin. ``str`` is
            encoded as UTF-8; ``bytes``/``bytearray`` are written as-is.
            Falsy values mean stdin is not piped.
        binary_stdout: If True, stdout is returned as bytes. Otherwise it is
            decoded as UTF-8 (with replacement). stderr is always decoded.
        stream_output: If True, the command's stdout/stderr lines are also
            printed to sys.stdout/sys.stderr as they arrive.
        stream_prefix: Text prepended to each streamed line.
        env: Optional mapping of extra environment variables, layered on top
            of ``os.environ``; values are converted with ``str()``.
        timeout_seconds: Maximum time to wait for the command before killing
            it. ``None`` waits indefinitely. Defaults to 15 minutes.

    Returns:
        A ``(retcode, stdout, stderr)`` tuple. ``retcode`` is -1 when the
        command timed out, could not be found, or failed to run; in those
        cases ``stderr`` carries a description of the failure.
    """
    logger.debug(f"Running command: {' '.join(shlex.quote(s) for s in cmd)}")
    if env:
        logger.debug(f"With custom environment: {env}")

    process = None
    try:
        # Combine with os.environ to ensure PATH etc. are inherited.
        process_env = os.environ.copy()
        if env:
            # Ensure all values in the custom env are strings
            process_env.update({k: str(v) for k, v in env.items()})

        # Always open in binary mode to handle both cases. We will decode later.
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE if input_data else None,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # Start in a new session/process group to isolate from terminal
            # signals. start_new_session is used instead of
            # preexec_fn=os.setsid because preexec_fn is documented as unsafe
            # in multi-threaded programs.
            start_new_session=True,
            env=process_env,
        )

        with process_lock:
            running_processes.add(process)

        stdout_capture = []
        stderr_capture = []

        def read_pipe(pipe, capture_list, display_pipe=None, prefix=""):
            """Reads a pipe line by line (as bytes), appending to a list and optionally displaying."""
            for line in iter(pipe.readline, b''):
                capture_list.append(line)
                if display_pipe:
                    # Decode for display
                    display_line = line.decode('utf-8', errors='replace')
                    # A single print call per line keeps prefix and text together.
                    print(f"{prefix}{display_line.strip()}", file=display_pipe)

        stdout_display_pipe = sys.stdout if stream_output else None
        stderr_display_pipe = sys.stderr if stream_output else None

        # We must read stdout and stderr in parallel to prevent deadlocks.
        stdout_thread = threading.Thread(
            target=read_pipe,
            args=(process.stdout, stdout_capture, stdout_display_pipe, stream_prefix),
        )
        stderr_thread = threading.Thread(
            target=read_pipe,
            args=(process.stderr, stderr_capture, stderr_display_pipe, stream_prefix),
        )
        stdout_thread.start()
        stderr_thread.start()

        # Handle stdin after starting to read outputs to avoid deadlocks.
        if input_data:
            if isinstance(input_data, (bytes, bytearray)):
                payload = bytes(input_data)
            else:
                payload = input_data.encode('utf-8')
            try:
                try:
                    process.stdin.write(payload)
                finally:
                    # Always close stdin so the child sees EOF and the
                    # descriptor is released even when the write failed.
                    process.stdin.close()
            except OSError:
                # This can happen if the process exits quickly or doesn't read stdin.
                logger.debug(f"Could not write to stdin for command: {' '.join(cmd)}. Process may have already exited.")

        # Wait for the process to finish and for all output to be read.
        # The timeout prevents indefinite hangs.
        timed_out = False
        try:
            retcode = process.wait(timeout=timeout_seconds)
        except subprocess.TimeoutExpired:
            timed_out = True
            logger.error(f"Command timed out after {timeout_seconds} seconds: {' '.join(cmd)}")
            # Kill the entire process group to ensure child processes (like yt-dlp or ffmpeg) are also terminated.
            _kill_process_group(process)
            retcode = -1  # Indicate failure
            # Wait a moment for pipes to close after killing.
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("Process did not terminate gracefully after SIGKILL.")

        for reader, pipe in ((stdout_thread, process.stdout), (stderr_thread, process.stderr)):
            reader.join(timeout=5)
            # Only close a pipe whose reader has finished; closing while
            # another thread is blocked reading it could block here as well.
            if not reader.is_alive():
                pipe.close()

        stdout_bytes = b"".join(stdout_capture)
        stderr_bytes = b"".join(stderr_capture)

        # If we timed out, create a synthetic stderr message to ensure the failure is reported upstream.
        # An explicit flag is used because a return code of -1 can also mean
        # "killed by signal 1", which is not a timeout.
        if timed_out and not stderr_bytes.strip():
            stderr_bytes = f"Command timed out after {timeout_seconds} seconds".encode('utf-8')

        stdout = stdout_bytes if binary_stdout else stdout_bytes.decode('utf-8', errors='replace')
        stderr = stderr_bytes.decode('utf-8', errors='replace')

        return retcode, stdout, stderr

    except FileNotFoundError:
        logger.error(f"Command not found: {cmd[0]}. Make sure it's in your PATH.")
        return -1, "", f"Command not found: {cmd[0]}"
    except Exception as e:
        logger.error(f"An error occurred while running command: {' '.join(cmd)}. Error: {e}")
        return -1, "", str(e)
    finally:
        if process:
            with process_lock:
                running_processes.discard(process)


def run_docker_container(image_name, command, volumes, stream_prefix="", network_name=None,
                         log_callback=None, profile_manager=None, profile_name=None,
                         environment=None, log_command_override=None, timeout_seconds=15 * 60):
    """
    Runs a command in a new, ephemeral Docker container using docker-py.

    Logs are streamed to the module logger while the container runs, each
    chunk is optionally handed to ``log_callback``, and the container is
    force-removed before returning.

    Args:
        image_name: Image to run.
        command: Command for the container (list of arguments or a string).
        volumes: Mapping of host path -> ``{'bind': ..., 'mode': ...}``,
            passed through to docker-py.
        stream_prefix: Text prepended to each logged output chunk.
        network_name: Optional Docker network to attach to.
        log_callback: Optional callable receiving each decoded, stripped log
            chunk; returning a truthy value stops the container.
        profile_manager: Optional object exposing ``get_profile(name)``. When
            given together with ``profile_name``, the profile is polled every
            2 seconds and the container is stopped if its ``state`` becomes
            ``BANNED`` or ``RESTING``.
        profile_name: Name of the profile to monitor.
        environment: Optional mapping of environment variables.
        log_command_override: Optional argument list logged as the
            "equivalent host command" for debugging.
        timeout_seconds: Timeout passed to ``container.wait``. Defaults to 15
            minutes.

    Returns:
        A tuple of ``(exit_code, stdout_str, stderr_str, stop_reason)``.
        ``stop_reason`` is None unless the container was stopped by the
        profile monitor or the log callback. On any Docker or unexpected
        error the tuple is ``(-1, "", <error text>, None)``.
    """
    if not docker:
        # This should be caught earlier, but as a safeguard:
        return -1, "", "Docker SDK for Python is not installed. Please run: pip install docker", None

    logger.debug(f"Running docker container. Image: {image_name}, Command: {command}, Volumes: {volumes}, Network: {network_name}")

    # Run container as current host user to avoid permission issues with volume mounts.
    user_id = f"{os.getuid()}:{os.getgid()}" if os.name != 'nt' else None

    # --- Construct and log the equivalent CLI command for debugging ---
    try:
        # Normalise the command for display only: a plain string must not be
        # extended character by character, and None means "image default".
        if isinstance(command, str):
            command_parts = shlex.split(command)
        else:
            command_parts = [str(part) for part in (command or [])]

        cli_cmd = ['docker', 'run', '--rm']
        if user_id:
            cli_cmd.extend(['-u', user_id])
        if network_name:
            cli_cmd.extend(['--network', network_name])
        if environment:
            for k, v in sorted(environment.items()):
                cli_cmd.extend(['-e', f"{k}={v}"])
        if volumes:
            for host_path, container_config in sorted(volumes.items()):
                bind = container_config.get('bind')
                mode = container_config.get('mode', 'rw')
                cli_cmd.extend(['-v', f"{os.path.abspath(host_path)}:{bind}:{mode}"])
        cli_cmd.append(image_name)
        cli_cmd.extend(command_parts)
        logger.info(f"Full docker command: {' '.join(shlex.quote(s) for s in cli_cmd)}")

        if log_command_override:
            # Build a more comprehensive, runnable command for logging
            env_prefix_parts = []
            if environment:
                for k, v in sorted(environment.items()):
                    env_prefix_parts.append(f"{k}={shlex.quote(str(v))}")
            env_prefix = ' '.join(env_prefix_parts)
            equivalent_ytdlp_cmd = ' '.join(shlex.quote(s) for s in log_command_override)
            full_equivalent_cmd = f"{env_prefix} {equivalent_ytdlp_cmd}".strip()
            logger.info(f"Equivalent host command: {full_equivalent_cmd}")
    except Exception as e:
        # Logging the command is purely diagnostic; never let it abort the run.
        logger.warning(f"Could not construct equivalent docker command for logging: {e}")
    # --- End of logging ---

    container = None
    monitor_thread = None
    stop_monitor_event = threading.Event()
    # Use a mutable object (dict) to share the stop reason between threads
    stop_reason_obj = {'reason': None}

    try:
        client = docker.from_env()
        container = client.containers.run(
            image_name,
            command=command,
            volumes=volumes,
            detach=True,
            network=network_name,
            user=user_id,
            environment=environment,
            # The container is removed in `finally` rather than via
            # `auto_remove`, so its logs can still be read after it exits.
        )

        # Thread to monitor profile status and stop container if BANNED or RESTING
        def monitor_profile():
            while not stop_monitor_event.is_set():
                try:
                    profile_info = profile_manager.get_profile(profile_name)
                    if profile_info:
                        state = profile_info.get('state')
                        if state in ['BANNED', 'RESTING']:
                            logger.warning(f"Profile '{profile_name}' is {state}. Stopping container {container.short_id}.")
                            stop_reason_obj['reason'] = f"Profile became {state}"
                            try:
                                container.stop(timeout=5)
                            except docker.errors.APIError as e:
                                logger.warning(f"Could not stop container {container.short_id}: {e}")
                            break  # Stop monitoring
                except Exception as e:
                    logger.error(f"Error in profile monitor thread: {e}")

                # Wait for 2 seconds or until stop event is set
                stop_monitor_event.wait(2)

        if profile_manager and profile_name:
            monitor_thread = threading.Thread(target=monitor_profile, daemon=True)
            monitor_thread.start()

        # Stream logs in the calling thread; this loop ends when the
        # container's log stream closes or the callback asks to stop.
        # NOTE(review): each item is logged as one message; presumably one
        # item is one output line, but the stream may deliver other chunking.
        log_stream = container.logs(stream=True, follow=True, stdout=True, stderr=True)
        for line_bytes in log_stream:
            line_str = line_bytes.decode('utf-8', errors='replace').strip()
            # Use logger.info to ensure output is captured by all handlers
            logger.info(f"{stream_prefix}{line_str}")
            if log_callback:
                # The callback can return True to signal an immediate stop.
                if log_callback(line_str):
                    logger.warning(f"Log callback requested to stop container {container.short_id}.")
                    stop_reason_obj['reason'] = "Stopped by log callback (fatal error)"
                    try:
                        container.stop(timeout=5)
                    except docker.errors.APIError as e:
                        logger.warning(f"Could not stop container {container.short_id}: {e}")
                    break  # Stop reading logs

        result = container.wait(timeout=timeout_seconds)
        exit_code = result.get('StatusCode', -1)

        # Get final logs to separate stdout and stderr.
        final_stdout = container.logs(stdout=True, stderr=False)
        final_stderr = container.logs(stdout=False, stderr=True)

        stdout_str = final_stdout.decode('utf-8', errors='replace')
        stderr_str = final_stderr.decode('utf-8', errors='replace')

        return exit_code, stdout_str, stderr_str, stop_reason_obj['reason']

    except docker.errors.ImageNotFound:
        logger.error(f"Docker image not found: '{image_name}'. Please pull it first.")
        return -1, "", f"Docker image not found: {image_name}", None
    except docker.errors.APIError as e:
        logger.error(f"Docker API error: {e}")
        return -1, "", str(e), None
    except Exception as e:
        logger.error(f"An unexpected error occurred while running docker container: {e}", exc_info=True)
        return -1, "", str(e), None
    finally:
        if monitor_thread:
            stop_monitor_event.set()
            monitor_thread.join(timeout=1)
        if container:
            try:
                container.remove(force=True)
                logger.debug(f"Removed container {container.short_id}")
            except docker.errors.APIError as e:
                logger.warning(f"Could not remove container {container.short_id}: {e}")