#!/usr/bin/env python3 """ Redis Queue Management CLI Tool for yt-ops-client. """ import argparse import json import logging import os import sys from typing import Optional import redis try: from dotenv import load_dotenv except ImportError: load_dotenv = None try: from tabulate import tabulate except ImportError: print("'tabulate' library not found. Please install it with: pip install tabulate", file=sys.stderr) tabulate = None # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class QueueManager: """Manages Redis lists (queues).""" def __init__(self, redis_host='localhost', redis_port=6379, redis_password=None): """Initialize Redis connection.""" logger.info(f"Attempting to connect to Redis at {redis_host}:{redis_port}...") try: self.redis = redis.Redis( host=redis_host, port=redis_port, password=redis_password, decode_responses=True, socket_connect_timeout=5, socket_timeout=5 ) self.redis.ping() logger.info(f"Successfully connected to Redis.") except redis.exceptions.ConnectionError as e: logger.error(f"Failed to connect to Redis at {redis_host}:{redis_port}: {e}") sys.exit(1) def list_queues(self, pattern: str): """Lists queues matching a pattern and their sizes.""" queues = [] for key in self.redis.scan_iter(match=pattern): key_type = self.redis.type(key) if key_type == 'list': size = self.redis.llen(key) queues.append({'name': key, 'size': size}) return queues def peek(self, queue_name: str, count: int): """Returns the top `count` items from a queue without removing them.""" return self.redis.lrange(queue_name, 0, count - 1) def count(self, queue_name: str) -> int: """Returns the number of items in a queue.""" return self.redis.llen(queue_name) def populate(self, queue_name: str, file_path: str) -> int: """Populates a queue from a file (text with one item per line, or JSON with an array of strings).""" count = 0 if file_path.lower().endswith('.json'): logger.info("Detected JSON file. Attempting to parse as an array of strings.") try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if not isinstance(data, list): logger.error("JSON file must contain a list/array.") return 0 items_to_add = [str(item).strip() for item in data if str(item).strip()] pipe = self.redis.pipeline() for item in items_to_add: pipe.rpush(queue_name, item) count += 1 if count % 1000 == 0: pipe.execute() logger.info(f"Pushed {count} items...") pipe.execute() except (IOError, json.JSONDecodeError) as e: logger.error(f"Failed to read or parse JSON file '{file_path}': {e}") return 0 else: logger.info("Reading items from text file (one per line).") try: with open(file_path, 'r', encoding='utf-8') as f: pipe = self.redis.pipeline() for line in f: item = line.strip() if item: pipe.rpush(queue_name, item) count += 1 if count % 1000 == 0: pipe.execute() logger.info(f"Pushed {count} items...") pipe.execute() except IOError as e: logger.error(f"Failed to read file '{file_path}': {e}") return 0 logger.info(f"Finished. Pushed a total of {count} items to '{queue_name}'.") return count def clear(self, queue_name: str, dump_path: Optional[str] = None) -> int: """Clears a queue, optionally dumping its contents to a file.""" size = self.redis.llen(queue_name) if size == 0: logger.info(f"Queue '{queue_name}' is already empty.") return 0 if dump_path: logger.info(f"Dumping {size} items from '{queue_name}' to '{dump_path}'...") with open(dump_path, 'w') as f: # Use lpop to be memory efficient for very large queues while True: item = self.redis.lpop(queue_name) if item is None: break f.write(item + '\n') logger.info("Dump complete.") # After lpop, the queue is already empty. return size deleted_count = self.redis.delete(queue_name) if deleted_count > 0: logger.info(f"Cleared queue '{queue_name}' ({size} items).") return size def add_queue_manager_parser(subparsers): """Adds the parser for the 'queue' command.""" parser = subparsers.add_parser( 'queue', description='Manage Redis queues.', formatter_class=argparse.RawTextHelpFormatter, help='Manage Redis queues.' ) # Common arguments for all queue manager subcommands common_parser = argparse.ArgumentParser(add_help=False) common_parser.add_argument('--env-file', help='Path to a .env file to load environment variables from.') common_parser.add_argument('--env', default='dev', help="Environment name for queue prefixes (e.g., 'stg', 'prod'). Defaults to 'dev'.") common_parser.add_argument('--redis-host', default=None, help='Redis host. Defaults to REDIS_HOST or MASTER_HOST_IP env var, or localhost.') common_parser.add_argument('--redis-port', type=int, default=None, help='Redis port. Defaults to REDIS_PORT env var, or 6379.') common_parser.add_argument('--redis-password', default=None, help='Redis password. Defaults to REDIS_PASSWORD env var.') common_parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') subparsers = parser.add_subparsers(dest='queue_command', help='Command to execute', required=True) # List command list_parser = subparsers.add_parser('list', help='List queues and their sizes.', parents=[common_parser]) list_parser.add_argument('--pattern', default='*queue*', help="Pattern to search for queue keys (default: '*queue*')") # Peek command peek_parser = subparsers.add_parser('peek', help='View items in a queue without removing them.', parents=[common_parser]) peek_parser.add_argument('queue_name', nargs='?', help="Name of the queue. Defaults to '_stress_inbox'.") peek_parser.add_argument('--count', type=int, default=10, help='Number of items to show (default: 10)') # Populate command populate_parser = subparsers.add_parser('populate', help='Populate a queue from a file (one item per line).', parents=[common_parser]) populate_parser.add_argument('file_path', help='Path to the file containing items to add.') populate_parser.add_argument('--queue-name', help="Name of the queue to populate. Defaults to '_stress_inbox'.") # Clear command clear_parser = subparsers.add_parser('clear', help='Clear a queue, optionally dumping its contents.', parents=[common_parser]) clear_parser.add_argument('queue_name', nargs='?', help="Name of the queue to clear. Defaults to '_stress_inbox'.") clear_parser.add_argument('--dump-to', help='File path to dump queue contents before clearing.') clear_parser.add_argument('--confirm', action='store_true', help='Confirm this destructive action (required).') return parser def main_queue_manager(args): """Main dispatcher for 'queue' command.""" if load_dotenv: was_loaded = load_dotenv(args.env_file) if was_loaded: print(f"Loaded environment variables from {args.env_file or '.env file'}", file=sys.stderr) elif args.env_file: print(f"ERROR: The specified --env-file was not found: {args.env_file}", file=sys.stderr) return 1 if args.redis_host is None: args.redis_host = os.getenv('REDIS_HOST', os.getenv('MASTER_HOST_IP', 'localhost')) if args.redis_port is None: args.redis_port = int(os.getenv('REDIS_PORT', 6379)) if args.redis_password is None: args.redis_password = os.getenv('REDIS_PASSWORD') if args.verbose: logging.getLogger().setLevel(logging.DEBUG) manager = QueueManager( redis_host=args.redis_host, redis_port=args.redis_port, redis_password=args.redis_password ) # For commands that operate on a single queue, set a default name based on the environment if not provided. is_single_queue_command = args.queue_command in ['peek', 'populate', 'clear'] if is_single_queue_command: # `populate` uses an option (--queue-name), while `peek` and `clear` use a positional argument. # We check for `queue_name` attribute and if it's falsy (None or empty string). if not getattr(args, 'queue_name', None): default_queue_name = f"{args.env}_stress_inbox" args.queue_name = default_queue_name print(f"INFO: No queue name specified, defaulting to '{default_queue_name}' based on --env='{args.env}'.", file=sys.stderr) if args.queue_command == 'list': queues = manager.list_queues(args.pattern) if not queues: print(f"No queues found matching pattern '{args.pattern}'.") return 0 if tabulate: print(tabulate(queues, headers='keys', tablefmt='grid')) else: for q in queues: print(f"{q['name']}: {q['size']}") return 0 elif args.queue_command == 'peek': size = manager.count(args.queue_name) items = manager.peek(args.queue_name, args.count) print(f"Queue '{args.queue_name}' has {size} items. Showing top {len(items)}:") for i, item in enumerate(items): print(f"{i+1: >3}: {item}") return 0 elif args.queue_command == 'populate': if not os.path.exists(args.file_path): print(f"Error: File not found at '{args.file_path}'", file=sys.stderr) return 1 manager.populate(args.queue_name, args.file_path) return 0 elif args.queue_command == 'clear': if not args.confirm: print("Error: --confirm flag is required for this destructive action.", file=sys.stderr) return 1 manager.clear(args.queue_name, args.dump_to) return 0 return 1 # Should not be reached