352 lines
14 KiB
Python
352 lines
14 KiB
Python
from __future__ import annotations
|
|
from enum import Enum
|
|
from typing import TYPE_CHECKING, Optional
|
|
import time
|
|
import logging
|
|
|
|
from statemachine import StateMachine, State, event
|
|
|
|
if TYPE_CHECKING:
|
|
from .profile_manager_tool import ProfileManager
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProfileState(Enum):
|
|
"""Enumeration for profile states."""
|
|
ACTIVE = "ACTIVE"
|
|
LOCKED = "LOCKED"
|
|
RESTING = "RESTING"
|
|
BANNED = "BANNED"
|
|
COOLDOWN = "COOLDOWN"
|
|
PAUSED = "PAUSED"
|
|
|
|
@classmethod
|
|
def values(cls):
|
|
return [item.value for item in cls]
|
|
|
|
|
|
class ProfileStateMachine(StateMachine):
|
|
"""A state machine for managing the lifecycle of a profile."""
|
|
|
|
# States
|
|
# We use lowercase attribute names to match the on_enter_* callback convention (e.g., on_enter_active).
|
|
# The value passed to State() is the string stored in Redis (uppercase).
|
|
active = State("ACTIVE", initial=True)
|
|
locked = State("LOCKED")
|
|
resting = State("RESTING")
|
|
banned = State("BANNED")
|
|
cooldown = State("COOLDOWN")
|
|
paused = State("PAUSED")
|
|
|
|
# Transitions
|
|
lock = active.to(locked)
|
|
unlock = locked.to(active)
|
|
start_cooldown = locked.to(cooldown)
|
|
|
|
rest = active.to(resting) | locked.to(resting) | cooldown.to(resting)
|
|
ban = active.to(banned) | locked.to(banned) | resting.to(banned) | cooldown.to(banned) | paused.to(banned)
|
|
activate = resting.to(active) | banned.to(active) | cooldown.to(active) | paused.to(active)
|
|
|
|
pause = active.to(paused) | locked.to(paused) | resting.to(paused) | cooldown.to(paused)
|
|
|
|
def __init__(self, manager: ProfileManager, profile_name: str, initial_value=None):
|
|
self.manager = manager
|
|
self.profile_name = profile_name
|
|
# Call parent constructor with model=self
|
|
super().__init__(model=self)
|
|
# If initial_value is provided, set the current state without triggering transitions
|
|
if initial_value is not None:
|
|
# Convert to uppercase to match state values
|
|
initial_value = initial_value.upper()
|
|
# Check if we're not already in this state
|
|
# Compare case-insensitively to handle any discrepancies
|
|
if self.current_state.value.upper() != initial_value:
|
|
# Find the corresponding state object
|
|
target_state = None
|
|
for state in self.states:
|
|
# Compare both .value and .id case-insensitively
|
|
if state.value.upper() == initial_value or (hasattr(state, 'id') and state.id.upper() == initial_value):
|
|
target_state = state
|
|
break
|
|
if target_state:
|
|
# Set current state without triggering transitions
|
|
self.current_state = target_state
|
|
else:
|
|
# If state not found, log a warning but don't crash
|
|
logger.warning(f"Could not find state '{initial_value}' (case-insensitive) for profile '{profile_name}'. Keeping current state '{self.current_state.value}'.")
|
|
|
|
# --- Action Methods ---
|
|
|
|
def on_enter_locked(self, event_data: event.EventData, owner: Optional[str] = None):
|
|
"""Action executed when entering the LOCKED state."""
|
|
# When re-hydrating a state machine, `owner` will be None. In this case,
|
|
# the profile is already locked, so we should not perform any actions.
|
|
if owner is None:
|
|
return
|
|
|
|
now = time.time()
|
|
profile_key = self.manager._profile_key(self.profile_name)
|
|
pipe = self.manager.redis.pipeline()
|
|
|
|
# Explicitly use Enum value to ensure Uppercase in Redis
|
|
state_val = ProfileState.LOCKED.value
|
|
|
|
pipe.hset(profile_key, mapping={
|
|
'state': state_val,
|
|
'lock_owner': owner,
|
|
'lock_timestamp': str(now),
|
|
'last_used': str(now)
|
|
})
|
|
|
|
# Update state indexes
|
|
if event_data.source:
|
|
pipe.zrem(self.manager._state_key(event_data.source.value), self.profile_name)
|
|
pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
|
|
|
|
pipe.execute()
|
|
|
|
logger.info(f"Updated profile '{self.profile_name}' from {event_data.source.value if event_data.source else 'None'} to {state_val}")
|
|
logger.info(f"Locked profile '{self.profile_name}' for owner '{owner}'")
|
|
|
|
def on_enter_cooldown(self, event_data: event.EventData, duration: Optional[int] = None):
|
|
"""Action executed when entering the COOLDOWN state."""
|
|
# When re-hydrating a state machine, `duration` will be None. In this case,
|
|
# the profile is already in cooldown, so we should not perform any actions.
|
|
if duration is None:
|
|
return
|
|
|
|
now = time.time()
|
|
rest_until = now + duration
|
|
profile_key = self.manager._profile_key(self.profile_name)
|
|
pipe = self.manager.redis.pipeline()
|
|
|
|
# Explicitly use Enum value to ensure Uppercase in Redis
|
|
state_val = ProfileState.COOLDOWN.value
|
|
|
|
pipe.hset(profile_key, mapping={
|
|
'state': state_val,
|
|
'rest_until': str(rest_until),
|
|
'rest_reason': 'Post-task cooldown',
|
|
'lock_owner': '',
|
|
'lock_timestamp': '0',
|
|
'last_used': str(now)
|
|
})
|
|
|
|
# Update state indexes
|
|
if event_data.source:
|
|
pipe.zrem(self.manager._state_key(event_data.source.value), self.profile_name)
|
|
pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
|
|
|
|
# Remove from locks
|
|
pipe.hdel(self.manager._locks_key(), self.profile_name)
|
|
|
|
pipe.execute()
|
|
|
|
logger.info(f"Updated profile '{self.profile_name}' from {event_data.source.value if event_data.source else 'None'} to {state_val}")
|
|
logger.info(f"Unlocked profile '{self.profile_name}' into COOLDOWN for {duration}s.")
|
|
|
|
def on_enter_active(self, event_data: event.EventData, profile: Optional[dict] = None):
|
|
"""Action executed when entering the ACTIVE state."""
|
|
now = time.time()
|
|
source_state = event_data.source
|
|
profile_key = self.manager._profile_key(self.profile_name)
|
|
pipe = self.manager.redis.pipeline()
|
|
|
|
# Explicitly use Enum value to ensure Uppercase in Redis
|
|
state_val = ProfileState.ACTIVE.value
|
|
|
|
updates = {
|
|
'state': state_val,
|
|
'rest_until': '0',
|
|
'rest_reason': '',
|
|
'reason': '',
|
|
'ban_reason': '',
|
|
'lock_owner': '',
|
|
'lock_timestamp': '0',
|
|
'last_used': str(now),
|
|
'wait_started_at': '0'
|
|
}
|
|
|
|
if source_state and source_state.value in [ProfileState.RESTING.value, ProfileState.COOLDOWN.value]:
|
|
updates['last_rest_timestamp'] = str(now)
|
|
|
|
# Reset counters if activating from a long-term off state, but not from a short cooldown.
|
|
should_reset_counters = False
|
|
if source_state and source_state.value in [ProfileState.BANNED.value, ProfileState.PAUSED.value]:
|
|
should_reset_counters = True
|
|
elif source_state and source_state.value == ProfileState.RESTING.value:
|
|
# For RESTING, only reset if it wasn't just waiting for a slot after a cooldown.
|
|
profile_data = profile or self.manager.redis.hgetall(profile_key)
|
|
is_waiting_after_cooldown = profile_data.get('rest_reason') == "Waiting for group capacity"
|
|
if not is_waiting_after_cooldown:
|
|
should_reset_counters = True
|
|
|
|
if should_reset_counters:
|
|
logger.info(f"Resetting session counters for profile '{self.profile_name}' on activation.")
|
|
updates.update({
|
|
'success_count': '0',
|
|
'failure_count': '0',
|
|
'tolerated_error_count': '0',
|
|
'download_count': '0',
|
|
'download_error_count': '0',
|
|
})
|
|
|
|
pipe.hset(profile_key, mapping=updates)
|
|
|
|
# Update state indexes - this is critical for list_profiles to work correctly
|
|
if source_state:
|
|
pipe.zrem(self.manager._state_key(source_state.value), self.profile_name)
|
|
pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
|
|
|
|
# Remove from locks if coming from LOCKED state
|
|
if source_state and source_state.value == ProfileState.LOCKED.value:
|
|
pipe.hdel(self.manager._locks_key(), self.profile_name)
|
|
|
|
# When activating a profile, ensure its proxy is also active.
|
|
proxy_url = self.manager.redis.hget(profile_key, 'proxy')
|
|
if proxy_url:
|
|
logger.debug(f"Ensuring associated proxy '{proxy_url}' is active for profile '{self.profile_name}'.")
|
|
pipe.hset(self.manager._proxy_state_key(proxy_url), mapping={
|
|
'state': ProfileState.ACTIVE.value,
|
|
'rest_until': '0',
|
|
'work_start_timestamp': str(now)
|
|
})
|
|
|
|
pipe.execute()
|
|
logger.info(f"Updated profile '{self.profile_name}' from {source_state.value if source_state else 'None'} to {state_val}")
|
|
|
|
def on_enter_banned(self, event_data: event.EventData, reason: str = ''):
|
|
"""Action executed when entering the BANNED state."""
|
|
now = time.time()
|
|
profile_key = self.manager._profile_key(self.profile_name)
|
|
pipe = self.manager.redis.pipeline()
|
|
|
|
# Explicitly use Enum value to ensure Uppercase in Redis
|
|
state_val = ProfileState.BANNED.value
|
|
|
|
updates = {
|
|
'state': state_val,
|
|
'last_used': str(now),
|
|
'lock_owner': '',
|
|
'lock_timestamp': '0',
|
|
'rest_until': '0',
|
|
'rest_reason': '',
|
|
'wait_started_at': '0'
|
|
}
|
|
if reason:
|
|
updates['ban_reason'] = reason
|
|
updates['reason'] = reason
|
|
|
|
pipe.hset(profile_key, mapping=updates)
|
|
|
|
# Update state indexes
|
|
if event_data.source:
|
|
pipe.zrem(self.manager._state_key(event_data.source.value), self.profile_name)
|
|
pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
|
|
|
|
# Remove from locks
|
|
if event_data.source and event_data.source.value == ProfileState.LOCKED.value:
|
|
pipe.hdel(self.manager._locks_key(), self.profile_name)
|
|
|
|
pipe.execute()
|
|
logger.info(f"Updated profile '{self.profile_name}' from {event_data.source.value if event_data.source else 'None'} to {state_val}")
|
|
if reason:
|
|
logger.info(f"Reason for ban: {reason}")
|
|
|
|
def on_enter_resting(self, event_data: event.EventData, reason: str = '', duration_minutes: Optional[int] = None, is_waiting_profile: bool = False, is_rotation: bool = False):
|
|
"""Action executed when entering the RESTING state."""
|
|
now = time.time()
|
|
source_state = event_data.source
|
|
profile_key = self.manager._profile_key(self.profile_name)
|
|
pipe = self.manager.redis.pipeline()
|
|
|
|
# Explicitly use Enum value to ensure Uppercase in Redis
|
|
state_val = ProfileState.RESTING.value
|
|
|
|
updates = {
|
|
'state': state_val,
|
|
'last_used': str(now),
|
|
'lock_owner': '',
|
|
'lock_timestamp': '0',
|
|
}
|
|
|
|
if is_waiting_profile:
|
|
updates['wait_started_at'] = str(now)
|
|
updates['rest_until'] = '0'
|
|
elif duration_minutes == 0:
|
|
updates['rest_until'] = '0'
|
|
else:
|
|
# Default to 1 hour if no duration is provided
|
|
rest_duration_seconds = (duration_minutes * 60) if duration_minutes is not None else 3600
|
|
rest_until = now + rest_duration_seconds
|
|
updates['rest_until'] = str(rest_until)
|
|
|
|
if reason:
|
|
updates['rest_reason'] = reason
|
|
updates['reason'] = reason
|
|
|
|
if is_rotation:
|
|
logger.info(f"Resetting session counters for profile '{self.profile_name}' on rotation.")
|
|
updates.update({
|
|
'success_count': '0',
|
|
'failure_count': '0',
|
|
'tolerated_error_count': '0',
|
|
'download_count': '0',
|
|
'download_error_count': '0',
|
|
})
|
|
|
|
pipe.hset(profile_key, mapping=updates)
|
|
|
|
# Update state indexes - this is critical for the enforcer to see the correct state
|
|
if source_state:
|
|
pipe.zrem(self.manager._state_key(source_state.value), self.profile_name)
|
|
pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
|
|
|
|
# Remove from locks if coming from LOCKED state
|
|
if source_state and source_state.value == ProfileState.LOCKED.value:
|
|
pipe.hdel(self.manager._locks_key(), self.profile_name)
|
|
|
|
pipe.execute()
|
|
logger.info(f"Updated profile '{self.profile_name}' from {source_state.value if source_state else 'None'} to {state_val}")
|
|
if reason:
|
|
logger.info(f"Reason for rest: {reason}")
|
|
|
|
def on_enter_paused(self, event_data: event.EventData, reason: str = ''):
|
|
"""Action executed when entering the PAUSED state."""
|
|
now = time.time()
|
|
profile_key = self.manager._profile_key(self.profile_name)
|
|
pipe = self.manager.redis.pipeline()
|
|
|
|
# Explicitly use Enum value to ensure Uppercase in Redis
|
|
state_val = ProfileState.PAUSED.value
|
|
|
|
updates = {
|
|
'state': state_val,
|
|
'last_used': str(now),
|
|
'lock_owner': '',
|
|
'lock_timestamp': '0',
|
|
'rest_until': '0',
|
|
'rest_reason': '',
|
|
'wait_started_at': '0'
|
|
}
|
|
if reason:
|
|
updates['reason'] = reason
|
|
|
|
pipe.hset(profile_key, mapping=updates)
|
|
|
|
# Update state indexes
|
|
if event_data.source:
|
|
pipe.zrem(self.manager._state_key(event_data.source.value), self.profile_name)
|
|
pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
|
|
|
|
# Remove from locks
|
|
if event_data.source and event_data.source.value == ProfileState.LOCKED.value:
|
|
pipe.hdel(self.manager._locks_key(), self.profile_name)
|
|
|
|
pipe.execute()
|
|
logger.info(f"Updated profile '{self.profile_name}' from {event_data.source.value if event_data.source else 'None'} to {state_val}")
|
|
if reason:
|
|
logger.info(f"Reason for pause: {reason}")
|