From 93d485db08a3cf8d3cd6683ab68f7633c7700c87 Mon Sep 17 00:00:00 2001 From: aperez Date: Tue, 13 Jan 2026 11:47:21 +0300 Subject: [PATCH] Updates on centralized of configuration to cluster.xml --- ansible/playbook-README.md | 12 + ansible/playbook-proxies.yml | 21 +- ansible/playbook-stress-lifecycle.yml | 361 +++++++++++++++--- tools/generate-inventory.py | 23 +- tools/generate-profile-setup-policy.py | 140 +++++++ .../10_direct_docker_auth_simulation.yaml | 18 +- .../11_direct_docker_download_simulation.yaml | 16 +- .../policies/12_queue_auth_simulation.yaml | 15 +- .../13_queue_download_simulation.yaml | 15 +- .../policies/6_profile_setup_policy.yaml | 41 +- .../8_unified_simulation_enforcer.yaml | 94 ++--- .../ytops_client/policy_enforcer_tool.py | 66 ++++ .../ytops_client/profile_setup_tool.py | 26 ++ .../ytops_client/stress_policy_tool.py | 301 ++++++++++++++- 14 files changed, 964 insertions(+), 185 deletions(-) create mode 100755 tools/generate-profile-setup-policy.py diff --git a/ansible/playbook-README.md b/ansible/playbook-README.md index 9b67956..4567cf4 100644 --- a/ansible/playbook-README.md +++ b/ansible/playbook-README.md @@ -87,9 +87,21 @@ ansible-playbook ansible/playbook-stress-manage-processes.yml -i ansible/invento # Start auth generators and download simulators on all workers ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=start" +# Start ONLY auth generators on workers +ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=start-auth" + +# Start ONLY download simulators on workers +ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=start-download" + # Stop all processes on workers ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=stop" +# Stop ONLY auth generators on workers +ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=stop-auth" + +# Stop ONLY download simulators on workers +ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=stop-download" + # Check status of all worker processes ansible-playbook ansible/playbook-stress-lifecycle.yml -i ansible/inventory.green.ini -e "action=status" ``` diff --git a/ansible/playbook-proxies.yml b/ansible/playbook-proxies.yml index 749889a..8c9f697 100644 --- a/ansible/playbook-proxies.yml +++ b/ansible/playbook-proxies.yml @@ -23,6 +23,17 @@ group: "{{ deploy_group }}" mode: '0755' + - name: Filter proxy configurations for this worker + ansible.builtin.set_fact: + worker_specific_proxies: >- + {% set result = {} -%} + {% for proxy_name in worker_proxies -%} + {% if proxy_name in shadowsocks_proxies -%} + {%- set _ = result.update({proxy_name: shadowsocks_proxies[proxy_name]}) -%} + {% endif -%} + {% endfor -%} + {{ result }} + - name: Create individual proxy config directories ansible.builtin.file: path: "/srv/shadowsocks-rust/config_ssp_{{ item.value.local_port }}" @@ -30,7 +41,7 @@ owner: "{{ ansible_user }}" group: "{{ deploy_group }}" mode: '0755' - loop: "{{ shadowsocks_proxies | dict2items }}" + loop: "{{ worker_specific_proxies | dict2items }}" - name: Create Shadowsocks-Rust proxy configuration files ansible.builtin.copy: @@ -50,7 +61,7 @@ owner: "{{ ansible_user }}" group: "{{ deploy_group }}" mode: '0644' - loop: "{{ shadowsocks_proxies | dict2items }}" + loop: "{{ worker_specific_proxies | dict2items }}" - name: Create docker-compose.yml for Shadowsocks-Rust proxies ansible.builtin.template: @@ -59,6 +70,8 @@ owner: "{{ ansible_user }}" group: "{{ deploy_group }}" mode: '0644' + vars: + shadowsocks_proxies: "{{ worker_specific_proxies }}" - name: Ensure old docker-compose.yaml file is removed to avoid conflicts ansible.builtin.file: @@ -74,7 +87,7 @@ docker stop "${container_id}" >/dev/null 2>&1 || true docker rm -f "${container_id}" >/dev/null 2>&1 || true fi - loop: "{{ shadowsocks_proxies | dict2items }}" + loop: "{{ worker_specific_proxies | dict2items }}" register: stop_conflicting_containers changed_when: "'Stopping and removing it' in stop_conflicting_containers.stdout" loop_control: @@ -92,5 +105,5 @@ remove_orphans: true recreate: always pull: "{{ 'never' if fast_deploy | default(false) else 'missing' }}" - when: shadowsocks_proxies is defined and shadowsocks_proxies | length > 0 + when: worker_proxies is defined and worker_proxies | length > 0 become: yes diff --git a/ansible/playbook-stress-lifecycle.yml b/ansible/playbook-stress-lifecycle.yml index 8fdd7ee..e798e5b 100644 --- a/ansible/playbook-stress-lifecycle.yml +++ b/ansible/playbook-stress-lifecycle.yml @@ -4,7 +4,7 @@ gather_facts: no vars: # Default action - action: "status" # Available actions: start, stop, status + action: "status" # Available actions: start, stop, status, start-auth, stop-auth, start-download, stop-download, stop-generator tasks: - name: "Start all configured generators and simulators" @@ -15,37 +15,242 @@ combined_prefixes: "{{ profile_prefixes | default([]) | join(',') }}" when: profile_prefixes is defined and profile_prefixes | length > 0 - - name: "Start single auth generator for all profiles: {{ combined_prefixes | default('none') }}" + - name: "Start auth generator(s)" + when: profile_prefixes is defined and profile_prefixes | length > 0 + block: + - name: "Start single auth generator for all profiles: {{ combined_prefixes | default('none') }}" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_generator=true" + -e "profile_prefix={{ combined_prefixes }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if auth_min_seconds is defined %}-e "auth_min_seconds={{ auth_min_seconds }}"{% endif %} + {% if auth_max_seconds is defined %}-e "auth_max_seconds={{ auth_max_seconds }}"{% endif %} + {% if batch_size is defined %}-e "batch_size={{ batch_size }}"{% endif %} + {% if create_download_tasks is defined %}-e "create_download_tasks={{ create_download_tasks }}"{% endif %} + {% if formats_to_download is defined %}-e "formats_to_download={{ formats_to_download }}"{% endif %} + delegate_to: localhost + changed_when: true + when: (auth_workers_per_profile | default(0) | int == 0) and (auth_workers_total | default(0) | int > 0) + + - name: "Start parallel auth generators for each profile" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_generator=true" + -e "profile_prefix={{ item }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if auth_min_seconds is defined %}-e "auth_min_seconds={{ auth_min_seconds }}"{% endif %} + {% if auth_max_seconds is defined %}-e "auth_max_seconds={{ auth_max_seconds }}"{% endif %} + {% if batch_size is defined %}-e "batch_size={{ batch_size }}"{% endif %} + {% if create_download_tasks is defined %}-e "create_download_tasks={{ create_download_tasks }}"{% endif %} + {% if formats_to_download is defined %}-e "formats_to_download={{ formats_to_download }}"{% endif %} + delegate_to: localhost + changed_when: true + loop: "{{ profile_prefixes }}" + loop_control: + loop_var: item + label: "profile: {{ item }}" + when: auth_workers_per_profile | default(0) | int > 0 + + - name: "Start download simulator(s)" + when: profile_prefixes is defined and profile_prefixes | length > 0 + block: + - name: "Start single download simulator for all profiles: {{ combined_prefixes | default('none') }}" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_download=true" + -e "profile_prefix={{ combined_prefixes }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if download_min_seconds is defined %}-e "download_min_seconds={{ download_min_seconds }}"{% endif %} + {% if download_max_seconds is defined %}-e "download_max_seconds={{ download_max_seconds }}"{% endif %} + {% if extra_set_args is defined %}-e 'extra_set_args={{ extra_set_args | to_json }}'{% endif %} + delegate_to: localhost + changed_when: true + when: (download_workers_per_profile | default(0) | int == 0) and (download_workers_total | default(0) | int > 0) + + - name: "Start parallel download simulators for each profile" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_download=true" + -e "profile_prefix={{ item }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if download_min_seconds is defined %}-e "download_min_seconds={{ download_min_seconds }}"{% endif %} + {% if download_max_seconds is defined %}-e "download_max_seconds={{ download_max_seconds }}"{% endif %} + {% if extra_set_args is defined %}-e 'extra_set_args={{ extra_set_args | to_json }}'{% endif %} + delegate_to: localhost + changed_when: true + loop: "{{ profile_prefixes }}" + loop_control: + loop_var: item + label: "profile: {{ item }}" + when: download_workers_per_profile | default(0) | int > 0 + + - name: "Start only auth generators on workers" + when: action == "start-auth" + block: + - name: "Set combined profile prefixes string" + ansible.builtin.set_fact: + combined_prefixes: "{{ profile_prefixes | default([]) | join(',') }}" + when: profile_prefixes is defined and profile_prefixes | length > 0 + + - name: "Start auth generator(s)" + when: profile_prefixes is defined and profile_prefixes | length > 0 + block: + - name: "Start single auth generator for all profiles: {{ combined_prefixes | default('none') }}" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_generator=true" + -e "profile_prefix={{ combined_prefixes }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if auth_min_seconds is defined %}-e "auth_min_seconds={{ auth_min_seconds }}"{% endif %} + {% if auth_max_seconds is defined %}-e "auth_max_seconds={{ auth_max_seconds }}"{% endif %} + {% if batch_size is defined %}-e "batch_size={{ batch_size }}"{% endif %} + {% if create_download_tasks is defined %}-e "create_download_tasks={{ create_download_tasks }}"{% endif %} + {% if formats_to_download is defined %}-e "formats_to_download={{ formats_to_download }}"{% endif %} + delegate_to: localhost + changed_when: true + when: (auth_workers_per_profile | default(0) | int == 0) and (auth_workers_total | default(0) | int > 0) + + - name: "Start parallel auth generators for each profile" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_generator=true" + -e "profile_prefix={{ item }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if auth_min_seconds is defined %}-e "auth_min_seconds={{ auth_min_seconds }}"{% endif %} + {% if auth_max_seconds is defined %}-e "auth_max_seconds={{ auth_max_seconds }}"{% endif %} + {% if batch_size is defined %}-e "batch_size={{ batch_size }}"{% endif %} + {% if create_download_tasks is defined %}-e "create_download_tasks={{ create_download_tasks }}"{% endif %} + {% if formats_to_download is defined %}-e "formats_to_download={{ formats_to_download }}"{% endif %} + delegate_to: localhost + changed_when: true + loop: "{{ profile_prefixes }}" + loop_control: + loop_var: item + label: "profile: {{ item }}" + when: auth_workers_per_profile | default(0) | int > 0 + + - name: "Start only download simulators on workers" + when: action == "start-download" + block: + - name: "Set combined profile prefixes string" + ansible.builtin.set_fact: + combined_prefixes: "{{ profile_prefixes | default([]) | join(',') }}" + when: profile_prefixes is defined and profile_prefixes | length > 0 + + - name: "Start download simulator(s)" + when: profile_prefixes is defined and profile_prefixes | length > 0 + block: + - name: "Start single download simulator for all profiles: {{ combined_prefixes | default('none') }}" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_download=true" + -e "profile_prefix={{ combined_prefixes }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if download_min_seconds is defined %}-e "download_min_seconds={{ download_min_seconds }}"{% endif %} + {% if download_max_seconds is defined %}-e "download_max_seconds={{ download_max_seconds }}"{% endif %} + {% if extra_set_args is defined %}-e 'extra_set_args={{ extra_set_args | to_json }}'{% endif %} + delegate_to: localhost + changed_when: true + when: (download_workers_per_profile | default(0) | int == 0) and (download_workers_total | default(0) | int > 0) + + - name: "Start parallel download simulators for each profile" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "start_download=true" + -e "profile_prefix={{ item }}" + {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} + {% if download_min_seconds is defined %}-e "download_min_seconds={{ download_min_seconds }}"{% endif %} + {% if download_max_seconds is defined %}-e "download_max_seconds={{ download_max_seconds }}"{% endif %} + {% if extra_set_args is defined %}-e 'extra_set_args={{ extra_set_args | to_json }}'{% endif %} + delegate_to: localhost + changed_when: true + loop: "{{ profile_prefixes }}" + loop_control: + loop_var: item + label: "profile: {{ item }}" + when: download_workers_per_profile | default(0) | int > 0 + + - name: "Stop only auth generators on workers (via playbook call)" + when: action == "stop-generator" + block: + - name: "Set combined profile prefixes string" + ansible.builtin.set_fact: + combined_prefixes: "{{ profile_prefixes | default([]) | join(',') }}" + when: profile_prefixes is defined and profile_prefixes | length > 0 + + - name: "Stop single auth generator for profiles: {{ combined_prefixes | default('none') }}" ansible.builtin.command: >- ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml -i {{ inventory_file }} --limit {{ inventory_hostname }} - -e "start_generator=true" + -e "stop_generator=true" -e "profile_prefix={{ combined_prefixes }}" - {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} - {% if auth_min_seconds is defined %}-e "auth_min_seconds={{ auth_min_seconds }}"{% endif %} - {% if auth_max_seconds is defined %}-e "auth_max_seconds={{ auth_max_seconds }}"{% endif %} - {% if batch_size is defined %}-e "batch_size={{ batch_size }}"{% endif %} - {% if create_download_tasks is defined %}-e "create_download_tasks={{ create_download_tasks }}"{% endif %} - {% if formats_to_download is defined %}-e "formats_to_download={{ formats_to_download }}"{% endif %} delegate_to: localhost changed_when: true when: profile_prefixes is defined and profile_prefixes | length > 0 - - name: "Start single download simulator for all profiles: {{ combined_prefixes | default('none') }}" - ansible.builtin.command: >- - ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml - -i {{ inventory_file }} - --limit {{ inventory_hostname }} - -e "start_download=true" - -e "profile_prefix={{ combined_prefixes }}" - {% if dummy_batch is defined %}-e "dummy_batch={{ dummy_batch }}"{% endif %} - {% if download_min_seconds is defined %}-e "download_min_seconds={{ download_min_seconds }}"{% endif %} - {% if download_max_seconds is defined %}-e "download_max_seconds={{ download_max_seconds }}"{% endif %} - {% if extra_set_args is defined %}-e 'extra_set_args={{ extra_set_args | to_json }}'{% endif %} - delegate_to: localhost - changed_when: true - when: profile_prefixes is defined and profile_prefixes | length > 0 + - name: "Stop only auth generators on workers" + when: action == "stop-auth" + block: + - name: Kill all auth generator tmux sessions on this worker + ansible.builtin.shell: + cmd: | + for session in $(tmux list-sessions -F "#{session_name}" 2>/dev/null | grep -E "^stress-auth-"); do + tmux kill-session -t "$session" + done || true + ignore_errors: yes + changed_when: false + + - name: Kill all ytops-client auth generator processes on this worker + ansible.builtin.shell: + cmd: | + # Gracefully terminate + ps aux | grep "[y]tops-client.*stress-policy.*12_queue_auth_simulation" | awk '{print $2}' | xargs kill >/dev/null 2>&1 || true + sleep 0.5 + # Force kill + ps aux | grep "[y]tops-client.*stress-policy.*12_queue_auth_simulation" | awk '{print $2}' | xargs kill -9 >/dev/null 2>&1 || true + ignore_errors: yes + changed_when: false + + - name: "Stop only download simulators on workers" + when: action == "stop-download" + block: + - name: Kill all download simulator tmux sessions on this worker + ansible.builtin.shell: + cmd: | + for session in $(tmux list-sessions -F "#{session_name}" 2>/dev/null | grep -E "^stress-download-"); do + tmux kill-session -t "$session" + done || true + ignore_errors: yes + changed_when: false + + - name: Kill all ytops-client download simulator processes on this worker + ansible.builtin.shell: + cmd: | + # Gracefully terminate + ps aux | grep "[y]tops-client.*stress-policy.*11_direct_docker_download_simulation" | awk '{print $2}' | xargs kill >/dev/null 2>&1 || true + sleep 0.5 + # Force kill + ps aux | grep "[y]tops-client.*stress-policy.*11_direct_docker_download_simulation" | awk '{print $2}' | xargs kill -9 >/dev/null 2>&1 || true + ignore_errors: yes + changed_when: false - name: "Stop all worker generators and simulators" when: action == "stop" @@ -78,36 +283,90 @@ combined_prefixes: "{{ profile_prefixes | default([]) | join(',') }}" when: profile_prefixes is defined and profile_prefixes | length > 0 - - name: "Check single auth generator for all profiles: {{ combined_prefixes | default('none') }}" - ansible.builtin.command: >- - ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml - -i {{ inventory_file }} - --limit {{ inventory_hostname }} - -e "check_status=true" - -e "profile_prefix={{ combined_prefixes }}" - delegate_to: localhost - changed_when: false + - name: "Check auth generator status" when: profile_prefixes is defined and profile_prefixes | length > 0 - register: auth_status_check + block: + - name: "Check single auth generator for all profiles: {{ combined_prefixes | default('none') }}" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "check_status=true" + -e "profile_prefix={{ combined_prefixes }}" + delegate_to: localhost + changed_when: false + register: auth_status_check_combined + when: (auth_workers_per_profile | default(0) | int == 0) and (auth_workers_total | default(0) | int > 0) - - name: "Display auth generator status for {{ inventory_hostname }}" - ansible.builtin.debug: - var: auth_status_check.stdout_lines - when: auth_status_check is defined + - name: "Display combined auth generator status for {{ inventory_hostname }}" + ansible.builtin.debug: + var: auth_status_check_combined.stdout_lines + when: auth_status_check_combined is defined and auth_status_check_combined.stdout_lines is defined - - name: "Check single download simulator for all profiles: {{ combined_prefixes | default('none') }}" - ansible.builtin.command: >- - ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml - -i {{ inventory_file }} - --limit {{ inventory_hostname }} - -e "check_status=true" - -e "profile_prefix={{ combined_prefixes }}" - delegate_to: localhost - changed_when: false + - name: "Check parallel auth generators for each profile" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-auth-generator.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "check_status=true" + -e "profile_prefix={{ item }}" + delegate_to: localhost + changed_when: false + loop: "{{ profile_prefixes }}" + loop_control: + loop_var: item + label: "profile: {{ item }}" + register: auth_status_check_parallel + when: auth_workers_per_profile | default(0) | int > 0 + + - name: "Display parallel auth generator status for {{ inventory_hostname }}" + ansible.builtin.debug: + msg: "{{ item.stdout_lines }}" + loop: "{{ auth_status_check_parallel.results | default([]) }}" + loop_control: + label: "status for profile: {{ item.item }}" + when: auth_status_check_parallel is defined and auth_status_check_parallel.results is defined + + - name: "Check download simulator status" when: profile_prefixes is defined and profile_prefixes | length > 0 - register: download_status_check + block: + - name: "Check single download simulator for all profiles: {{ combined_prefixes | default('none') }}" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "check_status=true" + -e "profile_prefix={{ combined_prefixes }}" + delegate_to: localhost + changed_when: false + register: download_status_check_combined + when: (download_workers_per_profile | default(0) | int == 0) and (download_workers_total | default(0) | int > 0) - - name: "Display download simulator status for {{ inventory_hostname }}" - ansible.builtin.debug: - var: download_status_check.stdout_lines - when: download_status_check is defined + - name: "Display combined download simulator status for {{ inventory_hostname }}" + ansible.builtin.debug: + var: download_status_check_combined.stdout_lines + when: download_status_check_combined is defined and download_status_check_combined.stdout_lines is defined + + - name: "Check parallel download simulators for each profile" + ansible.builtin.command: >- + ansible-playbook {{ playbook_dir }}/playbook-stress-download-simulation.yml + -i {{ inventory_file }} + --limit {{ inventory_hostname }} + -e "check_status=true" + -e "profile_prefix={{ item }}" + delegate_to: localhost + changed_when: false + loop: "{{ profile_prefixes }}" + loop_control: + loop_var: item + label: "profile: {{ item }}" + register: download_status_check_parallel + when: download_workers_per_profile | default(0) | int > 0 + + - name: "Display parallel download simulator status for {{ inventory_hostname }}" + ansible.builtin.debug: + msg: "{{ item.stdout_lines }}" + loop: "{{ download_status_check_parallel.results | default([]) }}" + loop_control: + label: "status for profile: {{ item.item }}" + when: download_status_check_parallel is defined and download_status_check_parallel.results is defined diff --git a/tools/generate-inventory.py b/tools/generate-inventory.py index 9ad8351..c132b7f 100755 --- a/tools/generate-inventory.py +++ b/tools/generate-inventory.py @@ -63,14 +63,35 @@ def generate_host_vars(cluster_config, host_vars_dir): # Per-node list of proxies to USE worker_proxies = config.get('proxies', []) - profile_prefixes = config.get('profile_prefixes', []) + profile_pools = config.get('profile_pools', []) + profile_prefixes = [] + for pool in profile_pools: + profile_prefixes.extend(pool.get('prefixes', [])) cleanup_settings = config.get('cleanup_settings') + # Worker process settings + auth_workers_total = config.get('auth_workers_total', 0) + auth_workers_per_profile = config.get('auth_workers_per_profile', 0) + download_workers_total = config.get('download_workers_total', 0) + download_workers_per_profile = config.get('download_workers_per_profile', 0) + + # If no auth worker config is provided, default to a single generator for all profiles. + if auth_workers_total == 0 and auth_workers_per_profile == 0: + auth_workers_total = 1 + + # If no download worker config is provided, default to a single simulator for all profiles. + if download_workers_total == 0 and download_workers_per_profile == 0: + download_workers_total = 1 + with open(host_vars_file, 'w') as f: f.write("---\n") f.write(f"# Variables for {hostname}\n") f.write(f"master_host_ip: {master_ip}\n") f.write("redis_port: 52909\n") + f.write(f"auth_workers_total: {auth_workers_total}\n") + f.write(f"auth_workers_per_profile: {auth_workers_per_profile}\n") + f.write(f"download_workers_total: {download_workers_total}\n") + f.write(f"download_workers_per_profile: {download_workers_per_profile}\n") # Add node-specific directory aliases for template compatibility # The master path is needed by all nodes for the .env template. diff --git a/tools/generate-profile-setup-policy.py b/tools/generate-profile-setup-policy.py new file mode 100755 index 0000000..5757345 --- /dev/null +++ b/tools/generate-profile-setup-policy.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Generates the profile setup policy YAML from the main cluster configuration file. + +This script reads the worker configurations from a cluster.yml file, aggregates +all profile definitions, and generates a policy file that can be used by the +`ytops-client setup-profiles` command. This centralizes profile management +in the cluster configuration file. +""" + +import yaml +import sys +import os +from collections import OrderedDict + +# To ensure YAML dumps dicts in the order they are created +def represent_ordereddict(dumper, data): + value = [] + for item_key, item_value in data.items(): + node_key = dumper.represent_data(item_key) + node_value = dumper.represent_data(item_value) + value.append((node_key, node_value)) + return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', value) + +yaml.add_representer(OrderedDict, represent_ordereddict) + + +# Custom list type and representer to achieve flow style for inner lists +class FlowList(list): + pass + +def flow_style_list_representer(dumper, data): + return dumper.represent_sequence(u'tag:yaml.org,2002:seq', data, flow_style=True) + +yaml.add_representer(FlowList, flow_style_list_representer) + + +# Custom string type and representer for double-quoted strings +class QuotedString(str): + pass + +def quoted_string_representer(dumper, data): + return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='"') + +yaml.add_representer(QuotedString, quoted_string_representer) + + +def load_cluster_config(config_path): + """Load cluster configuration from YAML file""" + with open(config_path, 'r') as f: + return yaml.safe_load(f) + +def generate_policy(cluster_config, output_path): + """Generate the profile setup policy file using common pools.""" + + shadowsocks_proxies = cluster_config.get('shadowsocks_proxies', {}) + all_workers = cluster_config.get('workers', {}) + + common_pools = [] + + # Aggregate profile pools from all workers + for worker_name, worker_config in all_workers.items(): + for pool in worker_config.get('profile_pools', []): + proxy_service = pool['proxy_service'] + if proxy_service not in shadowsocks_proxies: + print(f"Warning: Proxy service '{proxy_service}' for profile pool '{pool['prefixes']}' on worker '{worker_name}' not found in global shadowsocks_proxies. Skipping.", file=sys.stderr) + continue + + proxy_port = shadowsocks_proxies[proxy_service]['local_port'] + proxy_string = f"{proxy_service}:{proxy_port}" + + pool_entry = OrderedDict([ + ('prefixes', sorted(pool['prefixes'])), + ('proxy', proxy_string), + ('count', pool['count']) + ]) + common_pools.append(pool_entry) + + # Sort the pools by the first prefix in each pool for consistent file output + sorted_common_pools = sorted(common_pools, key=lambda x: x['prefixes'][0]) + + # Write the policy file manually to ensure exact formatting and comments + with open(output_path, 'w') as f: + f.write("# Configuration for setting up profiles for a simulation or test run.\n") + f.write("# This file is used by the `bin/ytops-client setup-profiles` command.\n") + f.write("# It uses a common pool definition to avoid repetition.\n\n") + f.write("# !!! THIS FILE IS AUTO-GENERATED by tools/generate-profile-setup-policy.py !!!\n") + f.write("# !!! DO NOT EDIT. Your changes will be overwritten. !!!\n") + f.write("# !!! Edit cluster.green.yml and re-run the generator instead. !!!\n\n") + + f.write("simulation_parameters:\n") + f.write(" # --- Common Redis settings for all tools ---\n") + f.write(" # The environment name ('env') is now specified in each setup block below.\n") + f.write(' env_file: ".env" # Optional: path to a .env file.\n') + + f.write("\n# --- Common Pool Definitions ---\n") + f.write("# Define the profile pools once. They will be created in both\n") + f.write("# the auth and download simulation environments.\n") + f.write("# The `setup-profiles` tool must be updated to support this format.\n") + f.write("common_pools:\n") + for pool in sorted_common_pools: + prefixes_str = ", ".join([f'"{p}"' for p in pool['prefixes']]) + f.write(f' - prefixes: [{prefixes_str}]\n') + f.write(f' proxy: "{pool["proxy"]}"\n') + f.write(f' count: {pool["count"]}\n') + + f.write("\n# --- Profile setup for the AUTHENTICATION simulation ---\n") + f.write("auth_profile_setup:\n") + f.write(' env: "sim_auth"\n') + f.write(" cleanup_before_run: true\n") + f.write(" # The setup tool will use the 'common_pools' defined above.\n") + f.write(" use_common_pools: true\n") + + f.write("\n# --- Profile setup for the DOWNLOAD simulation ---\n") + f.write("download_profile_setup:\n") + f.write(' env: "sim_download"\n') + f.write(" cleanup_before_run: true\n") + f.write(" # The setup tool will also use the 'common_pools' defined above.\n") + f.write(" use_common_pools: true\n") + + print(f"Successfully generated profile setup policy at: {output_path}") + + +def main(): + if len(sys.argv) != 3: + print("Usage: ./tools/generate-profile-setup-policy.py ") + sys.exit(1) + + config_path = sys.argv[1] + output_path = sys.argv[2] + + if not os.path.exists(config_path): + print(f"Error: Cluster configuration file not found at '{config_path}'", file=sys.stderr) + sys.exit(1) + + cluster_config = load_cluster_config(config_path) + generate_policy(cluster_config, output_path) + +if __name__ == "__main__": + main() diff --git a/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml b/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml index 642466c..4ed5142 100644 --- a/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml +++ b/ytops_client-source/policies/10_direct_docker_auth_simulation.yaml @@ -35,15 +35,15 @@ settings: download_skipped_failure_rate: 0.0 execution_control: - # Define worker pools. For a single auth worker that serves multiple groups - # (e.g., user1, user2), a single pool with a broad prefix like "user" is - # correct. This allows the worker to lock whichever profile the enforcer - # makes available from any group. - worker_pools: - - profile_prefix: "user" - workers: 1 - # - profile_prefix: "user2" - # workers: 1 + # Automatically discover profile groups from Redis and create workers for them. + # This avoids having to list each profile group (e.g., user31, user32) manually. + worker_pool_discovery: + # A glob-style pattern to find profile prefixes in Redis. + # 'user*' will match all profiles like 'user31_001', 'user61_002', etc., + # and the tool will create worker pools grouped by 'user31', 'user61', etc. + profile_prefix_pattern: "user*" + # Number of workers to assign to each discovered profile prefix group. + workers_per_profile_group: 1 # How long a worker should pause if it cannot find an available profile to lock. worker_polling_interval_seconds: 1 # No sleep between tasks; throughput is controlled by yt-dlp performance and profile availability. diff --git a/ytops_client-source/policies/11_direct_docker_download_simulation.yaml b/ytops_client-source/policies/11_direct_docker_download_simulation.yaml index bfc6b9b..5eebcdd 100644 --- a/ytops_client-source/policies/11_direct_docker_download_simulation.yaml +++ b/ytops_client-source/policies/11_direct_docker_download_simulation.yaml @@ -24,15 +24,13 @@ settings: profile_extraction_regex: '^.+?-(user[^-]+)-' execution_control: - # Define worker pools, each tied to a specific profile prefix. - # The stress tool will launch the specified number of workers for each pool. - worker_pools: - - profile_prefix: "user1" - workers: 1 - - profile_prefix: "user2" - workers: 1 - - profile_prefix: "user3" - workers: 1 + # Automatically discover profile groups from Redis and create workers for them. + # This avoids having to list each profile group (e.g., user1, user2) manually. + worker_pool_discovery: + # A glob-style pattern to find profile prefixes in Redis. + profile_prefix_pattern: "user*" + # Number of workers to assign to each discovered profile prefix group. + workers_per_profile_group: 1 # How long a worker should pause if it cannot find an available profile or task. worker_polling_interval_seconds: 1 diff --git a/ytops_client-source/policies/12_queue_auth_simulation.yaml b/ytops_client-source/policies/12_queue_auth_simulation.yaml index e9b4c1b..96b5db4 100644 --- a/ytops_client-source/policies/12_queue_auth_simulation.yaml +++ b/ytops_client-source/policies/12_queue_auth_simulation.yaml @@ -36,14 +36,13 @@ settings: save_info_json_dir: "run/docker_mount/info_json_tasks/direct_docker_simulation" execution_control: - # Define worker pools for multiple user groups - worker_pools: - - profile_prefix: "user1" - workers: 1 - - profile_prefix: "user2" - workers: 1 - - profile_prefix: "user3" - workers: 1 + # Automatically discover profile groups from Redis and create workers for them. + # This avoids having to list each profile group (e.g., user31, user32) manually. + worker_pool_discovery: + # A glob-style pattern to find profile prefixes in Redis. + profile_prefix_pattern: "user*" + # Number of workers to assign to each discovered profile prefix group. + workers_per_profile_group: 1 # How long a worker should pause if it cannot find an available profile to lock. worker_polling_interval_seconds: 1 # No sleep between tasks; throughput is controlled by yt-dlp performance and profile availability. diff --git a/ytops_client-source/policies/13_queue_download_simulation.yaml b/ytops_client-source/policies/13_queue_download_simulation.yaml index 8287ed2..5a46a9f 100644 --- a/ytops_client-source/policies/13_queue_download_simulation.yaml +++ b/ytops_client-source/policies/13_queue_download_simulation.yaml @@ -25,14 +25,13 @@ settings: # can be specified in the download task. execution_control: - # Define worker pools for multiple user groups - worker_pools: - - profile_prefix: "user1" - workers: 1 - - profile_prefix: "user2" - workers: 1 - - profile_prefix: "user3" - workers: 1 + # Automatically discover profile groups from Redis and create workers for them. + # This avoids having to list each profile group (e.g., user31, user32) manually. + worker_pool_discovery: + # A glob-style pattern to find profile prefixes in Redis. + profile_prefix_pattern: "user*" + # Number of workers to assign to each discovered profile prefix group. + workers_per_profile_group: 1 # How long a worker should pause if it cannot find an available profile or task. worker_polling_interval_seconds: 1 diff --git a/ytops_client-source/policies/6_profile_setup_policy.yaml b/ytops_client-source/policies/6_profile_setup_policy.yaml index cdcbd6b..93366d9 100644 --- a/ytops_client-source/policies/6_profile_setup_policy.yaml +++ b/ytops_client-source/policies/6_profile_setup_policy.yaml @@ -1,38 +1,37 @@ # Configuration for setting up profiles for a simulation or test run. # This file is used by the `bin/ytops-client setup-profiles` command. -# It contains separate blocks for authentication and download simulations. +# It uses a common pool definition to avoid repetition. simulation_parameters: # --- Common Redis settings for all tools --- # The environment name ('env') is now specified in each setup block below. env_file: ".env" # Optional: path to a .env file. +# --- Common Pool Definitions --- +# Define the profile pools once. They will be created in both +# the auth and download simulation environments. +# The `setup-profiles` tool must be updated to support this format. +common_pools: + - prefixes: ["user31"] + proxy: "sslocal-rust-1088:1088" + count: 3 + - prefixes: ["user32"] + proxy: "sslocal-rust-1085:1085" + count: 3 + - prefixes: ["user61", "user62"] + proxy: "sslocal-rust-1084:1084" + count: 3 + # --- Profile setup for the AUTHENTICATION simulation --- auth_profile_setup: env: "sim_auth" cleanup_before_run: true - pools: - - prefix: "user1" - proxy: "sslocal-rust-1088:1088" - count: 3 - - prefix: "user2" - proxy: "sslocal-rust-1085:1085" - count: 3 - - prefix: "user3" - proxy: "sslocal-rust-1084:1084" - count: 3 + # The setup tool will use the 'common_pools' defined above. + use_common_pools: true # --- Profile setup for the DOWNLOAD simulation --- download_profile_setup: env: "sim_download" cleanup_before_run: true - pools: - - prefix: "user1" - proxy: "sslocal-rust-1088:1088" - count: 3 - - prefix: "user2" - proxy: "sslocal-rust-1085:1085" - count: 3 - - prefix: "user3" - proxy: "sslocal-rust-1084:1084" - count: 3 + # The setup tool will also use the 'common_pools' defined above. + use_common_pools: true diff --git a/ytops_client-source/policies/8_unified_simulation_enforcer.yaml b/ytops_client-source/policies/8_unified_simulation_enforcer.yaml index 3c20308..348ec32 100644 --- a/ytops_client-source/policies/8_unified_simulation_enforcer.yaml +++ b/ytops_client-source/policies/8_unified_simulation_enforcer.yaml @@ -13,6 +13,23 @@ simulation_parameters: # How often the enforcer should wake up and apply all policies. interval_seconds: 2 +# --- Dynamic Profile Group Templates --- +# The policy enforcer will find all profile prefixes matching a pattern in Redis +# and apply the settings from the matching template. This avoids having to list +# every profile group manually. +# NOTE: The policy enforcer tool must be updated to support this format. +profile_group_templates: + - pattern: "user*" + auth: + max_active_profiles: 1 + rotate_after_requests: 5 + rest_duration_minutes_on_rotation: 0.20 + wait_download_finish_per_group: true + max_wait_for_downloads_minutes: 240 + download: + rotate_after_requests: 0 + rest_duration_minutes_on_rotation: 0.2 + # --- Policies for the Authentication Simulation --- auth_policy_enforcer_config: @@ -49,31 +66,8 @@ auth_policy_enforcer_config: # Set to 1 to ensure only one group's profile is active at any time. global_max_active_profiles: 1 - # Define separate profile groups for each user type. - # This allows one profile from each group to be active simultaneously, - # ensuring the single auth worker is never blocked waiting for downloads. - profile_groups: - - name: "auth_user1" - prefix: "user1" - max_active_profiles: 1 - rotate_after_requests: 5 - rest_duration_minutes_on_rotation: 0.20 - wait_download_finish_per_group: true - max_wait_for_downloads_minutes: 240 - - name: "auth_user2" - prefix: "user2" - max_active_profiles: 1 - rotate_after_requests: 5 - rest_duration_minutes_on_rotation: 0.20 - wait_download_finish_per_group: true - max_wait_for_downloads_minutes: 240 - - name: "auth_user3" - prefix: "user3" - max_active_profiles: 1 - rotate_after_requests: 5 - rest_duration_minutes_on_rotation: 0.20 - wait_download_finish_per_group: true - max_wait_for_downloads_minutes: 240 + # The 'profile_groups' section is now inherited from 'profile_group_definitions' above. + # The enforcer logic should be updated to read from there. proxy_work_minutes: 0 proxy_rest_duration_minutes: 0 @@ -97,37 +91,17 @@ auth_policy_enforcer_config: unlock_cooldown_seconds: 0 # --- Cross-simulation synchronization --- +# This section is simplified because the link between auth and download profiles +# is now defined in the `profile_group_definitions`. cross_simulation_sync: - # Link auth profiles to download profiles (by prefix) - profile_links: - - auth: "user1" - download: "user1" - - auth: "user2" - download: "user2" - - auth: "user3" - download: "user3" - # Which states to synchronize from auth to download. - # 'RESTING' is no longer needed here; the new group-aware deactivation logic - # in `sync_active_profile` handles rotation more cleanly. sync_states: - "BANNED" - - # If true, when an auth profile is rotated, the corresponding - # download profile group will also be rotated. This is now handled by the - # group-aware deactivation logic triggered by `sync_active_profile`. - sync_rotation: true - - # If true, a BANNED state on an auth profile will force the download profile - # to also be BANNED. + # If true, a BANNED state on an auth profile will force the download profile to also be BANNED. enforce_auth_lead: true - # CRITICAL: Ensures the correct download profile GROUP is active. - # This will activate the target download profile and rest any profiles in other groups. sync_active_profile: true - - # When an auth profile is in the 'waiting_downloads' state, ensure the - # matching download profile is active so it can process those downloads. + # When an auth profile is in the 'waiting_downloads' state, ensure the matching download profile is active. sync_waiting_downloads: true # --- Policies for the Download Simulation --- @@ -151,25 +125,9 @@ download_policy_enforcer_config: # For accounts, it is ~2000 videos/hour (~4000 webpage/player requests per hour). # The settings below should be configured to respect these limits. - # Define separate profile groups for download workers. - # Increase max_active_profiles to allow all profiles in a group to be used. - profile_groups: - - name: "download_user1" - prefix: "user1" - rotate_after_requests: 0 - rest_duration_minutes_on_rotation: 0.2 - # max_active_profiles: 0 # Allow all profiles in this group to be active (0, -1, or omitted) - - name: "download_user2" - prefix: "user2" - rotate_after_requests: 0 - rest_duration_minutes_on_rotation: 0.2 - # max_active_profiles: 0 # Allow all profiles in this group to be active (0, -1, or omitted) - - name: "download_user3" - prefix: "user3" - rotate_after_requests: 0 - rest_duration_minutes_on_rotation: 0.2 - # max_active_profiles: 0 # Allow all profiles in this group to be active (0, -1, or omitted) - + # The 'profile_groups' section is now inherited from 'profile_group_definitions' above. + # The enforcer logic should be updated to read from there. + # Time-based proxy rules are disabled. proxy_work_minutes: 0 proxy_rest_duration_minutes: 10 diff --git a/ytops_client-source/ytops_client/policy_enforcer_tool.py b/ytops_client-source/ytops_client/policy_enforcer_tool.py index 238a3a8..4ec1d38 100644 --- a/ytops_client-source/ytops_client/policy_enforcer_tool.py +++ b/ytops_client-source/ytops_client/policy_enforcer_tool.py @@ -10,6 +10,8 @@ import os import signal import sys import time +import fnmatch +from copy import deepcopy try: import yaml @@ -396,6 +398,15 @@ class PolicyEnforcer: profile_name = profile['name'] group_name = profile_to_group_map.get(profile_name) + # --- New: Early activation for profiles waiting on a now-active proxy --- + proxy_url = profile.get('proxy') + if proxy_url and profile.get('rest_reason') in ("Waiting for proxy", self.PROXY_REST_REASON): + proxy_state_data = proxy_states.get(proxy_url, {}) + if proxy_state_data.get('state') == ProfileState.ACTIVE.value: + logger.debug(f"Profile '{profile_name}' was waiting for proxy '{proxy_url}', which is now active. Bypassing its rest period.") + profile['rest_until'] = 0 + # --- End new logic --- + # --- New check to prevent activating profiles from a waiting group --- if group_name in waiting_group_names: logger.debug(f"Profile '{profile_name}' activation deferred because its group '{group_name}' is waiting for downloads to complete.") @@ -1574,6 +1585,61 @@ def main_policy_enforcer(args): return None logger.info(f"Setting up enforcer for {sim_type} simulation...") + + # --- Dynamic Profile Group Discovery --- + profile_group_templates = policy.get('profile_group_templates') + # Check if templates exist and if the config block doesn't already have groups (CLI overrides take precedence) + if profile_group_templates and 'profile_groups' not in policy_config: + logger.info(f"Found 'profile_group_templates'. Discovering profile groups dynamically for {sim_type}...") + + # Determine key_prefix to connect to the right Redis env (logic duplicated from below) + policy_env = sim_params.get(env_policy_key) + default_policy_env = sim_params.get('env') + effective_env = env_cli_arg or args.env or policy_env or default_policy_env or 'dev' + if args.key_prefix: + temp_key_prefix = args.key_prefix + elif args.legacy: + temp_key_prefix = 'profile_mgmt_' + else: + temp_key_prefix = f"{effective_env}_profile_mgmt_" + + try: + # Use a temporary manager to scan for profiles in the correct environment + temp_manager = ProfileManager(redis_host, redis_port, redis_password, temp_key_prefix, redis_db) + all_profiles = temp_manager.list_profiles() + + # Extract unique prefixes (e.g., 'user31' from 'user31_0') + found_prefixes = set(p['name'].rsplit('_', 1)[0] for p in all_profiles) + + if not found_prefixes: + logger.warning(f"Dynamic discovery found no profile prefixes for env '{effective_env}'. No group policies will be applied.") + else: + logger.info(f"Discovered {len(found_prefixes)} unique profile prefixes: {sorted(list(found_prefixes))}") + + generated_groups = [] + for prefix in sorted(list(found_prefixes)): + for template in profile_group_templates: + pattern = template.get('pattern') + if pattern and fnmatch.fnmatch(prefix, pattern): + # Get the settings for the current simulation type (Auth/Download) + sim_settings = template.get(sim_type.lower()) + if not sim_settings: + logger.debug(f"Template with pattern '{pattern}' has no settings for '{sim_type}'. Skipping for prefix '{prefix}'.") + continue + + # Create a new group from the relevant part of the template + new_group = deepcopy(sim_settings) + new_group['prefix'] = prefix + new_group['name'] = prefix # Use prefix as group name + + generated_groups.append(new_group) + logger.debug(f"Applied template with pattern '{pattern}' to prefix '{prefix}' for {sim_type} simulation.") + break # Move to next prefix once a match is found + + policy_config['profile_groups'] = generated_groups + except Exception as e: + logger.error(f"Failed during dynamic profile group discovery: {e}", exc_info=args.verbose) + config = Config(args, policy_config, code_defaults) # Determine the effective environment name with correct precedence: diff --git a/ytops_client-source/ytops_client/profile_setup_tool.py b/ytops_client-source/ytops_client/profile_setup_tool.py index be3cf3f..9a346c0 100644 --- a/ytops_client-source/ytops_client/profile_setup_tool.py +++ b/ytops_client-source/ytops_client/profile_setup_tool.py @@ -205,6 +205,32 @@ def main_setup_profiles(args): logger.error(f"Failed to load or parse policy file {args.policy_file}: {e}") return 1 + # --- Expand common_pools if defined --- + common_pools_def = policy.get('common_pools') + if common_pools_def: + logger.info("Found 'common_pools' definition. Expanding into setup blocks.") + expanded_pools = [] + for pool_def in common_pools_def: + # Handle both 'prefix' (string) and 'prefixes' (list) for flexibility + prefixes = pool_def.get('prefixes', []) + if 'prefix' in pool_def and pool_def['prefix'] not in prefixes: + prefixes.append(pool_def['prefix']) + + if not prefixes: + logger.warning(f"A pool in 'common_pools' is missing a 'prefix' or 'prefixes' key. Skipping: {pool_def}") + continue + + for p in prefixes: + new_pool = pool_def.copy() + new_pool['prefix'] = p + if 'prefixes' in new_pool: del new_pool['prefixes'] + expanded_pools.append(new_pool) + + for setup_key in ['auth_profile_setup', 'download_profile_setup']: + if setup_key in policy and policy[setup_key].get('use_common_pools'): + logger.debug(f"Applying {len(expanded_pools)} common pool definitions to '{setup_key}'.") + policy[setup_key]['pools'] = expanded_pools + sim_params = policy.get('simulation_parameters', {}) setups_to_run = [] diff --git a/ytops_client-source/ytops_client/stress_policy_tool.py b/ytops_client-source/ytops_client/stress_policy_tool.py index cfc9463..4db8ed4 100644 --- a/ytops_client-source/ytops_client/stress_policy_tool.py +++ b/ytops_client-source/ytops_client/stress_policy_tool.py @@ -73,6 +73,7 @@ state between multiple workers and enforcing policies (e.g., rate limits, cooldo import argparse import collections import concurrent.futures +import fnmatch import json import logging import os @@ -581,13 +582,62 @@ def main_stress_policy(args): sp_utils.display_effective_policy(policy, policy_name, args, sources=urls_list) if args.dry_run: return 0 + # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) + discovery_config = exec_control.get('worker_pool_discovery') + + if discovery_config: + if worker_pools: + logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") + + discovery_pattern = discovery_config.get('profile_prefix_pattern') + workers_per_group = discovery_config.get('workers_per_profile_group', 1) + + direct_policy = policy.get('direct_batch_cli_policy', {}) + use_env = direct_policy.get('use_profile_env', 'auth') + manager_for_discovery = profile_managers.get(use_env) + + if not manager_for_discovery: + logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") + return 1 + + if not discovery_pattern: + logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") + return 1 + + logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") + try: + all_profiles = manager_for_discovery.list_profiles() + found_prefixes = set() + for profile in all_profiles: + profile_name = profile['name'] + if fnmatch.fnmatch(profile_name, discovery_pattern): + # Assuming standard name format like 'user31_001', extract 'user31' + prefix = profile_name.rsplit('_', 1)[0] + found_prefixes.add(prefix) + + if not found_prefixes: + logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") + worker_pools = [] + else: + worker_pools = [] + for prefix in sorted(list(found_prefixes)): + worker_pools.append({ + 'profile_prefix': prefix, + 'workers': workers_per_group + }) + logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") + except Exception as e: + logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) + return 1 + if not worker_pools and exec_control.get('workers'): + # Fallback for legacy 'workers: N' config prefix = policy.get('info_json_generation_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: - logger.error("No workers configured in policy (execution_control.workers or execution_control.worker_pools).") + logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: @@ -695,13 +745,62 @@ def main_stress_policy(args): sp_utils.display_effective_policy(policy, policy_name, args, sources=urls_list) if args.dry_run: return 0 + # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) + discovery_config = exec_control.get('worker_pool_discovery') + + if discovery_config: + if worker_pools: + logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") + + discovery_pattern = discovery_config.get('profile_prefix_pattern') + workers_per_group = discovery_config.get('workers_per_profile_group', 1) + + direct_policy = policy.get('direct_docker_cli_policy', {}) + use_env = direct_policy.get('use_profile_env', 'auth' if mode == 'fetch_only' else 'download') + manager_for_discovery = profile_managers.get(use_env) + + if not manager_for_discovery: + logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") + return 1 + + if not discovery_pattern: + logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") + return 1 + + logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") + try: + all_profiles = manager_for_discovery.list_profiles() + found_prefixes = set() + for profile in all_profiles: + profile_name = profile['name'] + if fnmatch.fnmatch(profile_name, discovery_pattern): + # Assuming standard name format like 'user31_001', extract 'user31' + prefix = profile_name.rsplit('_', 1)[0] + found_prefixes.add(prefix) + + if not found_prefixes: + logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") + worker_pools = [] + else: + worker_pools = [] + for prefix in sorted(list(found_prefixes)): + worker_pools.append({ + 'profile_prefix': prefix, + 'workers': workers_per_group + }) + logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") + except Exception as e: + logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) + return 1 + if not worker_pools and exec_control.get('workers'): + # Fallback for legacy 'workers: N' config prefix = policy.get('info_json_generation_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: - logger.error("No workers configured in policy (execution_control.workers or execution_control.worker_pools).") + logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: @@ -751,13 +850,62 @@ def main_stress_policy(args): sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 + # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) + discovery_config = exec_control.get('worker_pool_discovery') + + if discovery_config: + if worker_pools: + logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") + + discovery_pattern = discovery_config.get('profile_prefix_pattern') + workers_per_group = discovery_config.get('workers_per_profile_group', 1) + + direct_policy = policy.get('direct_docker_cli_policy', {}) + use_env = direct_policy.get('use_profile_env', 'auth' if mode == 'fetch_only' else 'download') + manager_for_discovery = profile_managers.get(use_env) + + if not manager_for_discovery: + logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") + return 1 + + if not discovery_pattern: + logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") + return 1 + + logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") + try: + all_profiles = manager_for_discovery.list_profiles() + found_prefixes = set() + for profile in all_profiles: + profile_name = profile['name'] + if fnmatch.fnmatch(profile_name, discovery_pattern): + # Assuming standard name format like 'user31_001', extract 'user31' + prefix = profile_name.rsplit('_', 1)[0] + found_prefixes.add(prefix) + + if not found_prefixes: + logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") + worker_pools = [] + else: + worker_pools = [] + for prefix in sorted(list(found_prefixes)): + worker_pools.append({ + 'profile_prefix': prefix, + 'workers': workers_per_group + }) + logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") + except Exception as e: + logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) + return 1 + if not worker_pools and exec_control.get('workers'): + # Fallback for legacy 'workers: N' config prefix = policy.get('download_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: - logger.error("No workers configured in policy (execution_control.workers or execution_control.worker_pools).") + logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: @@ -823,13 +971,60 @@ def main_stress_policy(args): sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 + # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) + discovery_config = exec_control.get('worker_pool_discovery') + + if discovery_config: + if worker_pools: + logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") + + discovery_pattern = discovery_config.get('profile_prefix_pattern') + workers_per_group = discovery_config.get('workers_per_profile_group', 1) + + manager_for_discovery = profile_managers.get('download') + + if not manager_for_discovery: + logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") + return 1 + + if not discovery_pattern: + logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") + return 1 + + logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") + try: + all_profiles = manager_for_discovery.list_profiles() + found_prefixes = set() + for profile in all_profiles: + profile_name = profile['name'] + if fnmatch.fnmatch(profile_name, discovery_pattern): + # Assuming standard name format like 'user31_001', extract 'user31' + prefix = profile_name.rsplit('_', 1)[0] + found_prefixes.add(prefix) + + if not found_prefixes: + logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") + worker_pools = [] + else: + worker_pools = [] + for prefix in sorted(list(found_prefixes)): + worker_pools.append({ + 'profile_prefix': prefix, + 'workers': workers_per_group + }) + logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") + except Exception as e: + logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) + return 1 + if not worker_pools and exec_control.get('workers'): + # Fallback for legacy 'workers: N' config prefix = policy.get('download_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: - logger.error("No workers configured in policy (execution_control.workers or execution_control.worker_pools).") + logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: @@ -930,13 +1125,60 @@ def main_stress_policy(args): sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 + # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) + discovery_config = exec_control.get('worker_pool_discovery') + + if discovery_config: + if worker_pools: + logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") + + discovery_pattern = discovery_config.get('profile_prefix_pattern') + workers_per_group = discovery_config.get('workers_per_profile_group', 1) + + manager_for_discovery = profile_managers.get('auth') + + if not manager_for_discovery: + logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") + return 1 + + if not discovery_pattern: + logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") + return 1 + + logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") + try: + all_profiles = manager_for_discovery.list_profiles() + found_prefixes = set() + for profile in all_profiles: + profile_name = profile['name'] + if fnmatch.fnmatch(profile_name, discovery_pattern): + # Assuming standard name format like 'user31_001', extract 'user31' + prefix = profile_name.rsplit('_', 1)[0] + found_prefixes.add(prefix) + + if not found_prefixes: + logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") + worker_pools = [] + else: + worker_pools = [] + for prefix in sorted(list(found_prefixes)): + worker_pools.append({ + 'profile_prefix': prefix, + 'workers': workers_per_group + }) + logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") + except Exception as e: + logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) + return 1 + if not worker_pools and exec_control.get('workers'): + # Fallback for legacy 'workers: N' config prefix = policy.get('info_json_generation_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: - logger.error("No workers configured in policy (execution_control.workers or execution_control.worker_pools).") + logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: @@ -1039,13 +1281,60 @@ def main_stress_policy(args): sp_utils.display_effective_policy(policy, policy_name, args, sources=[]) if args.dry_run: return 0 + # --- Worker Pool Setup (including dynamic discovery) --- worker_pools = exec_control.get('worker_pools', []) + discovery_config = exec_control.get('worker_pool_discovery') + + if discovery_config: + if worker_pools: + logger.warning("Both 'worker_pools' and 'worker_pool_discovery' are defined. 'worker_pool_discovery' will take precedence.") + + discovery_pattern = discovery_config.get('profile_prefix_pattern') + workers_per_group = discovery_config.get('workers_per_profile_group', 1) + + manager_for_discovery = profile_managers.get('download') + + if not manager_for_discovery: + logger.error(f"Could not determine profile manager for worker pool discovery in mode '{orchestration_mode}/{mode}'.") + return 1 + + if not discovery_pattern: + logger.error("'worker_pool_discovery' is missing required key 'profile_prefix_pattern'.") + return 1 + + logger.info(f"Discovering worker pools from profile prefixes matching '{discovery_pattern}'...") + try: + all_profiles = manager_for_discovery.list_profiles() + found_prefixes = set() + for profile in all_profiles: + profile_name = profile['name'] + if fnmatch.fnmatch(profile_name, discovery_pattern): + # Assuming standard name format like 'user31_001', extract 'user31' + prefix = profile_name.rsplit('_', 1)[0] + found_prefixes.add(prefix) + + if not found_prefixes: + logger.warning(f"Worker pool discovery found no profiles matching pattern '{discovery_pattern}'. No workers will be started.") + worker_pools = [] + else: + worker_pools = [] + for prefix in sorted(list(found_prefixes)): + worker_pools.append({ + 'profile_prefix': prefix, + 'workers': workers_per_group + }) + logger.info(f"Discovered {len(found_prefixes)} profile groups, creating {workers_per_group} worker(s) for each: {', '.join(sorted(list(found_prefixes)))}") + except Exception as e: + logger.error(f"Failed to discover profile groups from Redis: {e}", exc_info=True) + return 1 + if not worker_pools and exec_control.get('workers'): + # Fallback for legacy 'workers: N' config prefix = policy.get('download_policy', {}).get('profile_prefix') worker_pools.append({'workers': exec_control.get('workers'), 'profile_prefix': prefix or 'user'}) if not worker_pools: - logger.error("No workers configured in policy (execution_control.workers or execution_control.worker_pools).") + logger.error("No workers configured. Use 'execution_control.worker_pools' or 'worker_pool_discovery'.") return 1 if args.profile_prefix: