#!/usr/bin/env python3
"""
Redis Queue Management CLI Tool for yt-ops-client.
"""
import argparse
import fnmatch
import hashlib
import json
import logging
import os
import sys
from pathlib import Path
from typing import Optional

import redis

try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

try:
    from tabulate import tabulate
except ImportError:
    print("'tabulate' library not found. Please install it with: pip install tabulate", file=sys.stderr)
    tabulate = None

try:
    import yaml
except ImportError:
    print("PyYAML is not installed. Please install it with: pip install PyYAML", file=sys.stderr)
    yaml = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def _find_configured_queues(policies_dir="policies", env=None):
    """Scans YAML files in a directory to find configured queue names."""
    if not yaml:
        return set()
    expected_queues = set()
    policies_path = Path(policies_dir)
    if not policies_path.is_dir():
        logger.debug(f"Policies directory '{policies_dir}' not found, cannot find expected queues.")
        return set()
    for policy_file in policies_path.glob("*.yaml"):
        try:
            with open(policy_file, 'r', encoding='utf-8') as f:
                policy_data = yaml.safe_load(f)
            if not isinstance(policy_data, dict):
                continue
            queue_policy = policy_data.get('queue_policy')
            if not isinstance(queue_policy, dict):
                continue
            use_prefix = queue_policy.get('use_env_prefix', True)
            prefix = ""
            if use_prefix and env:
                prefix = f"{env}_"
            for key, value in queue_policy.items():
                if key.endswith('_queue') and isinstance(value, str):
                    expected_queues.add(f"{prefix}{value}")
        except (IOError, yaml.YAMLError) as e:
            logger.debug(f"Could not parse policy {policy_file} to find queues: {e}")
            continue
    return expected_queues
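

# Illustrative sketch of a policy file that _find_configured_queues() would pick up.
# Only the 'queue_policy' mapping, the 'use_env_prefix' flag, and the '*_queue' key
# convention come from the code above; the file name and queue values are assumptions.
#
#   # policies/example_policy.yaml
#   queue_policy:
#     use_env_prefix: true          # with --env=dev the names below become 'dev_...'
#     inbox_queue: stress_inbox
#     results_queue: stress_results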


class QueueManager:
    """Manages Redis lists (queues)."""

    def _push_state_key(self, queue_name: str, file_path: str) -> str:
        """Get Redis key for storing the last pushed index for a given queue and file."""
        # Use a hash of the absolute file path to create a consistent, safe key.
        abs_path = os.path.abspath(file_path)
        path_hash = hashlib.sha256(abs_path.encode()).hexdigest()
        return f"ytops_client:queue_push_state:{queue_name}:{path_hash}"

    def __init__(self, redis_host='localhost', redis_port=6379, redis_password=None):
        """Initialize Redis connection."""
        logger.info(f"Attempting to connect to Redis at {redis_host}:{redis_port}...")
        try:
            self.redis = redis.Redis(
                host=redis_host,
                port=redis_port,
                password=redis_password,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5
            )
            self.redis.ping()
            logger.info("Successfully connected to Redis.")
        except redis.exceptions.ConnectionError as e:
            logger.error(f"Failed to connect to Redis at {redis_host}:{redis_port}: {e}")
            sys.exit(1)

    def list_queues(self, pattern: str):
        """Lists queues matching a pattern and their sizes."""
        queues = []
        for key in self.redis.scan_iter(match=pattern):
            key_type = self.redis.type(key)
            if key_type == 'list':
                size = self.redis.llen(key)
                queues.append({'name': key, 'size': size})
        return queues

    def peek(self, queue_name: str, count: int):
        """Returns the top `count` items from a queue without removing them."""
        return self.redis.lrange(queue_name, 0, count - 1)

    def count(self, queue_name: str) -> int:
        """Returns the number of items in a queue."""
        return self.redis.llen(queue_name)
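
    # Sketch of the two input formats push_from_file() accepts, derived from the code
    # below; the file names and the 'url' wrap key are illustrative assumptions.
    #
    #   urls.txt (one item per line; blank lines are skipped):
    #       https://example.com/watch?v=aaa
    #       https://example.com/watch?v=bbb
    #   With --wrap-file-line-in-json url each line is pushed as {"url": "<line>"}.
    #
    #   items.json (must be a top-level array; objects are re-serialized with json.dumps):
    #       [{"url": "https://example.com/watch?v=aaa"}, "plain-string-item"]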

    def push_from_file(self, queue_name: str, file_path: str, wrap_key: Optional[str] = None,
                       limit: Optional[int] = None, start_index: Optional[int] = None,
                       auto_shift: bool = False) -> int:
        """Populates a queue from a file (text with one item per line, or JSON with an array of items)."""
        count = 0

        # --- State management for file position ---
        state_key = None
        current_start_index = 0  # 0-based index
        if auto_shift:
            state_key = self._push_state_key(queue_name, file_path)
            last_index_str = self.redis.get(state_key)
            if last_index_str:
                current_start_index = int(last_index_str)
                logger.info(f"Auto-shift enabled. Resuming from line {current_start_index + 1}.")
        elif start_index is not None:
            # CLI provides 1-based index, convert to 0-based.
            current_start_index = max(0, start_index - 1)
            logger.info(f"Starting from line {current_start_index + 1} as requested.")
        # ---

        items_to_add = []
        total_items_in_file = 0

        if file_path.lower().endswith('.json'):
            if wrap_key:
                logger.warning("--wrap-file-line-in-json is ignored for JSON files, as they are expected to contain complete items.")
            logger.info("Detected JSON file. Attempting to parse as an array of items.")
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if not isinstance(data, list):
                    logger.error("JSON file must contain a list/array.")
                    return 0
                total_items_in_file = len(data)
                if current_start_index >= total_items_in_file:
                    logger.info(f"Start index ({current_start_index + 1}) is past the end of the file ({total_items_in_file} items). Nothing to push.")
                    return 0
                items_to_process = data[current_start_index:]
                if limit is not None and limit >= 0:
                    items_to_process = items_to_process[:limit]
                # Items can be strings or objects. If objects, they should be converted to JSON strings.
                for item in items_to_process:
                    if isinstance(item, str):
                        items_to_add.append(item.strip())
                    else:
                        items_to_add.append(json.dumps(item))
                items_to_add = [item for item in items_to_add if item]
            except (IOError, json.JSONDecodeError) as e:
                logger.error(f"Failed to read or parse JSON file '{file_path}': {e}")
                return 0
        else:
            logger.info("Reading items from text file (one per line).")
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    all_lines = f.readlines()
                total_items_in_file = len(all_lines)
                if current_start_index >= total_items_in_file:
                    logger.info(f"Start index ({current_start_index + 1}) is past the end of the file ({total_items_in_file} lines). Nothing to push.")
                    return 0
                lines_to_process = all_lines[current_start_index:]
                if limit is not None and limit >= 0:
                    lines_to_process = lines_to_process[:limit]
                for line in lines_to_process:
                    item = line.strip()
                    if item:
                        if wrap_key:
                            payload = json.dumps({wrap_key: item})
                        else:
                            payload = item
                        items_to_add.append(payload)
            except IOError as e:
                logger.error(f"Failed to read file '{file_path}': {e}")
                return 0

        if items_to_add:
            pipe = self.redis.pipeline()
            for item in items_to_add:
                pipe.rpush(queue_name, item)
                count += 1
                if count > 0 and count % 1000 == 0:
                    pipe.execute()
                    logger.info(f"Pushed {count} of {len(items_to_add)} items...")
            pipe.execute()

        if auto_shift and state_key:
            new_index = current_start_index + count
            # Don't save a new index if we've reached the end of the file.
            # This allows re-running the command to start from the beginning again.
            if new_index >= total_items_in_file:
                self.redis.delete(state_key)
                logger.info(f"Auto-shift: Reached end of file. Cleared saved position for '{os.path.basename(file_path)}'. Next run will start from the beginning.")
            else:
                self.redis.set(state_key, new_index)
                logger.info(f"Auto-shift: Saved next start position for '{os.path.basename(file_path)}' as line {new_index + 1}.")

        logger.info(f"Finished. Pushed a total of {count} items to '{queue_name}'.")
        return count

    def push_generated(self, queue_name: str, prefix: str, count: int) -> int:
        """Pushes generated payloads to a queue."""
        from datetime import datetime
        timestamp = datetime.now().strftime('%Y%m%dt%H%M')
        pipe = self.redis.pipeline()
        pushed_count = 0
        for i in range(count):
            generated_value = f"{prefix}_{timestamp}_{i:04d}"
            payload = json.dumps({"url": generated_value})
            pipe.rpush(queue_name, payload)
            pushed_count += 1
            if pushed_count > 0 and pushed_count % 1000 == 0:
                pipe.execute()
                logger.info(f"Pushed {pushed_count} of {count} items...")
        pipe.execute()
        logger.info(f"Finished. Pushed a total of {pushed_count} items to '{queue_name}'.")
        return pushed_count

    def push_static(self, queue_name: str, payload: str, count: int) -> int:
        """Pushes a static payload multiple times to a queue."""
        try:
            json.loads(payload)
        except json.JSONDecodeError:
            logger.error(f"Invalid JSON in --payload-json: {payload}")
            return 0
        pipe = self.redis.pipeline()
        pushed_count = 0
        for _ in range(count):
            pipe.rpush(queue_name, payload)
            pushed_count += 1
            if pushed_count > 0 and pushed_count % 1000 == 0:
                pipe.execute()
                logger.info(f"Pushed {pushed_count} of {count} items...")
        pipe.execute()
        logger.info(f"Finished. Pushed a total of {pushed_count} items to '{queue_name}'.")
        return pushed_count
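
    # Example of what push_generated() produces, assuming prefix 'loadtest' and a run at
    # 2024-05-01 09:30 (both values are illustrative):
    #   {"url": "loadtest_20240501t0930_0000"}, {"url": "loadtest_20240501t0930_0001"}, ...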

    def clear(self, queue_name: str, dump_path: Optional[str] = None) -> int:
        """Clears a queue, optionally dumping its contents to a file."""
        size = self.redis.llen(queue_name)
        if size == 0:
            logger.info(f"Queue '{queue_name}' is already empty.")
            return 0
        if dump_path:
            logger.info(f"Dumping {size} items from '{queue_name}' to '{dump_path}'...")
            with open(dump_path, 'w') as f:
                # Use lpop to be memory efficient for very large queues
                while True:
                    item = self.redis.lpop(queue_name)
                    if item is None:
                        break
                    f.write(item + '\n')
            logger.info("Dump complete.")
            # After lpop, the queue is already empty.
            return size
        deleted_count = self.redis.delete(queue_name)
        if deleted_count > 0:
            logger.info(f"Cleared queue '{queue_name}' ({size} items).")
        return size


def add_queue_manager_parser(subparsers):
    """Adds the parser for the 'queue' command."""
    parser = subparsers.add_parser(
        'queue',
        description='Manage Redis queues.',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Manage Redis queues.'
    )

    # Common arguments for all queue manager subcommands
    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.add_argument('--env-file', help='Path to a .env file to load environment variables from.')
    common_parser.add_argument('--env', default='dev', help="Environment name for queue prefixes (e.g., 'stg', 'prod'). Defaults to 'dev'.")
    common_parser.add_argument('--redis-host', default=None, help='Redis host. Defaults to REDIS_HOST or MASTER_HOST_IP env var, or localhost.')
    common_parser.add_argument('--redis-port', type=int, default=None, help='Redis port. Defaults to REDIS_PORT env var, or 6379.')
    common_parser.add_argument('--redis-password', default=None, help='Redis password. Defaults to REDIS_PASSWORD env var.')
    common_parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')

    subparsers = parser.add_subparsers(dest='queue_command', help='Command to execute', required=True)

    # List command
    list_parser = subparsers.add_parser('list', help='List queues and their sizes.', parents=[common_parser])
    list_parser.add_argument('--pattern', default='*queue*', help="Pattern to search for queue keys (default: '*queue*')")

    # Peek command
    peek_parser = subparsers.add_parser('peek', help='View items in a queue without removing them.', parents=[common_parser])
    peek_parser.add_argument('queue_name', nargs='?', help="Name of the queue. Defaults to '<env>_stress_inbox'.")
    peek_parser.add_argument('--count', type=int, default=10, help='Number of items to show (default: 10)')

    # Push command
    push_parser = subparsers.add_parser('push', help='Push items to a queue from a file, a generator, or a static payload.', parents=[common_parser])
    push_parser.add_argument('queue_name', nargs='?', help="Name of the queue. Defaults to '<env>_stress_inbox'.")
    push_parser.add_argument('--count', type=int, default=None, help='Number of items to push. For --from-file, limits the number of lines pushed. For other sources, specifies how many items to generate/push (defaults to 1).')

    shift_group = push_parser.add_mutually_exclusive_group()
    shift_group.add_argument('--start', type=int, help='For --from-file, start pushing from this line number (1-based).')
    shift_group.add_argument('--auto-shift', action='store_true', help="For --from-file, automatically resume from where the last push left off. State is stored in Redis.")

    source_group = push_parser.add_mutually_exclusive_group(required=True)
    source_group.add_argument('--from-file', dest='file_path', help='Path to a file containing items to add (one per line, or a JSON array).')
    source_group.add_argument('--payload-json', help='A static JSON payload to push. Use with --count to push multiple times.')
    source_group.add_argument('--generate-payload-prefix', help='Generate JSON payloads with a timestamp and counter. Example: {"url": "PREFIX_yyyymmddthhmm_0001"}. Use with --count.')

    push_parser.add_argument('--wrap-file-line-in-json', metavar='KEY', help="For text files (--from-file), wrap each line in a JSON object with the specified key (e.g., 'url' -> {\"url\": \"line_content\"}).")

    # Clear command
    clear_parser = subparsers.add_parser('clear', help='Clear a queue, optionally dumping its contents.', parents=[common_parser])
    clear_parser.add_argument('queue_name', nargs='?', help="Name of the queue to clear. Defaults to '<env>_stress_inbox'.")
    clear_parser.add_argument('--dump-to', help='File path to dump queue contents before clearing.')
    clear_parser.add_argument('--confirm', action='store_true', help='Confirm this destructive action (required).')

    return parser
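

# Example invocations (illustrative; the 'ytops-client' entry-point name and all queue,
# file, and URL values are assumptions, not taken from the repository):
#
#   ytops-client queue list --pattern '*inbox*' --env stg
#   ytops-client queue peek dev_stress_inbox --count 5
#   ytops-client queue push --from-file urls.txt --wrap-file-line-in-json url --count 100 --auto-shift
#   ytops-client queue push --payload-json '{"url": "https://example.com"}' --count 10
#   ytops-client queue clear dev_stress_inbox --dump-to backup.txt --confirm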


def main_queue_manager(args):
    """Main dispatcher for 'queue' command."""
    if load_dotenv:
        was_loaded = load_dotenv(args.env_file)
        if was_loaded:
            print(f"Loaded environment variables from {args.env_file or '.env file'}", file=sys.stderr)
        elif args.env_file:
            print(f"ERROR: The specified --env-file was not found: {args.env_file}", file=sys.stderr)
            return 1

    if args.redis_host is None:
        args.redis_host = os.getenv('REDIS_HOST', os.getenv('MASTER_HOST_IP', 'localhost'))
    if args.redis_port is None:
        args.redis_port = int(os.getenv('REDIS_PORT', 6379))
    if args.redis_password is None:
        args.redis_password = os.getenv('REDIS_PASSWORD')

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    manager = QueueManager(
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_password=args.redis_password
    )

    # For commands that operate on a single queue, set a default name based on the environment if not provided.
    is_single_queue_command = args.queue_command in ['peek', 'push', 'clear']
    if is_single_queue_command:
        # `push`, `peek` and `clear` use a positional argument for queue_name.
        # We check for the `queue_name` attribute and whether it's falsy (None or empty string).
        if not getattr(args, 'queue_name', None):
            default_queue_name = f"{args.env}_stress_inbox"
            args.queue_name = default_queue_name
            print(f"INFO: No queue name specified, defaulting to '{default_queue_name}' based on --env='{args.env}'.", file=sys.stderr)

    if args.queue_command == 'list':
        queues_from_redis = manager.list_queues(args.pattern)

        # Discover queues from policy files
        expected_queues_from_policies = _find_configured_queues(env=args.env)

        # Merge Redis results with policy-defined queues
        all_queues_map = {q['name']: q for q in queues_from_redis}
        for q_name in expected_queues_from_policies:
            if q_name not in all_queues_map:
                # Only add if it matches the pattern filter
                if fnmatch.fnmatch(q_name, args.pattern):
                    all_queues_map[q_name] = {'name': q_name, 'size': 0}

        queues = sorted(list(all_queues_map.values()), key=lambda x: x['name'])

        if not queues:
            print(f"No queues found matching pattern '{args.pattern}'.")
            return 0

        if tabulate:
            print(tabulate(queues, headers='keys', tablefmt='grid'))
        else:
            for q in queues:
                print(f"{q['name']}: {q['size']}")
        return 0

    elif args.queue_command == 'peek':
        size = manager.count(args.queue_name)
        items = manager.peek(args.queue_name, args.count)
        print(f"Queue '{args.queue_name}' has {size} items. Showing top {len(items)}:")
        for i, item in enumerate(items):
            print(f"{i+1: >3}: {item}")
        return 0

    elif args.queue_command == 'push':
        if args.file_path:
            if not os.path.exists(args.file_path):
                print(f"Error: File not found at '{args.file_path}'", file=sys.stderr)
                return 1
            manager.push_from_file(
                args.queue_name,
                args.file_path,
                args.wrap_file_line_in_json,
                limit=args.count,
                start_index=args.start,
                auto_shift=args.auto_shift
            )
        elif args.payload_json:
            count = args.count if args.count is not None else 1
            manager.push_static(args.queue_name, args.payload_json, count)
        elif args.generate_payload_prefix:
            count = args.count if args.count is not None else 1
            if count <= 0:
                print("Error: --count must be 1 or greater for --generate-payload-prefix.", file=sys.stderr)
                return 1
            manager.push_generated(args.queue_name, args.generate_payload_prefix, count)
        return 0

    elif args.queue_command == 'clear':
        if not args.confirm:
            print("Error: --confirm flag is required for this destructive action.", file=sys.stderr)
            return 1
        manager.clear(args.queue_name, args.dump_to)
        return 0

    return 1  # Should not be reached
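

# A minimal standalone entry point, sketched for convenience; the real yt-ops-client CLI
# likely registers this module through add_queue_manager_parser() on its own top-level
# parser, and the program name below is an assumption.
if __name__ == '__main__':
    _parser = argparse.ArgumentParser(prog='ytops-client', description='yt-ops-client CLI (standalone sketch).')
    _subparsers = _parser.add_subparsers(dest='command', required=True)
    add_queue_manager_parser(_subparsers)
    # Only the 'queue' command exists in this sketch, so dispatch straight to it.
    sys.exit(main_queue_manager(_parser.parse_args()))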