# 352 lines
# 14 KiB
# Python

from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Optional
import time
import logging
from statemachine import StateMachine, State, event
if TYPE_CHECKING:
from .profile_manager_tool import ProfileManager
logger = logging.getLogger(__name__)
class ProfileState(Enum):
    """Closed set of lifecycle states a profile can be in.

    Each member's value is the upper-case string of the same name.
    """

    ACTIVE = "ACTIVE"
    LOCKED = "LOCKED"
    RESTING = "RESTING"
    BANNED = "BANNED"
    COOLDOWN = "COOLDOWN"
    PAUSED = "PAUSED"

    @classmethod
    def values(cls) -> list[str]:
        """Return the string value of every member, in declaration order."""
        return [member.value for member in cls.__members__.values()]
class ProfileStateMachine(StateMachine):
    """State machine for one profile's lifecycle, mirrored to Redis on every transition.

    Each ``on_enter_*`` callback queues its writes on a single pipeline obtained
    from ``manager.redis``: it updates the profile hash, moves the profile
    between the per-state sorted-set indexes and, where relevant, removes the
    profile from the locks hash.

    Inside the callbacks a state is always identified through
    :meth:`_redis_state_name` (upper-case) so that comparisons against
    :class:`ProfileState` values and the index keys agree with what is
    written to Redis.
    """

    # States
    # Attribute names are lowercase to match the on_enter_* callback convention
    # (e.g. on_enter_active). The positional argument to State() is the display
    # *name*; no explicit ``value=`` is passed, so the library falls back to the
    # attribute id for ``state.value`` (lowercase). Never compare ``state.value``
    # with a ProfileState value directly -- go through _redis_state_name().
    active = State("ACTIVE", initial=True)
    locked = State("LOCKED")
    resting = State("RESTING")
    banned = State("BANNED")
    cooldown = State("COOLDOWN")
    paused = State("PAUSED")

    # Transitions
    lock = active.to(locked)
    unlock = locked.to(active)
    start_cooldown = locked.to(cooldown)
    rest = active.to(resting) | locked.to(resting) | cooldown.to(resting)
    ban = active.to(banned) | locked.to(banned) | resting.to(banned) | cooldown.to(banned) | paused.to(banned)
    activate = resting.to(active) | banned.to(active) | cooldown.to(active) | paused.to(active)
    pause = active.to(paused) | locked.to(paused) | resting.to(paused) | cooldown.to(paused)

    def __init__(self, manager: ProfileManager, profile_name: str, initial_value=None):
        """Create the machine and optionally place it in an already-persisted state.

        Args:
            manager: Object providing ``redis`` and the key helpers used by the
                callbacks (``_profile_key``, ``_state_key``, ``_locks_key``,
                ``_proxy_state_key``).
            profile_name: Name of the profile this machine controls.
            initial_value: State to adopt. Matched case-insensitively against
                each state's value and id. May be a ``str``, ``bytes`` or a
                :class:`ProfileState` member. Unknown values are logged and the
                machine keeps its current state.
        """
        # These must exist before the parent constructor runs, because the
        # callbacks read them.
        self.manager = manager
        self.profile_name = profile_name
        # Call parent constructor with model=self
        super().__init__(model=self)

        if initial_value is None:
            return

        wanted = self._coerce_state_name(initial_value)
        if self._redis_state_name(self.current_state) == wanted:
            return

        target_state = None
        for candidate in self.states:
            candidate_id = getattr(candidate, 'id', None)
            if self._redis_state_name(candidate) == wanted or (
                candidate_id is not None and str(candidate_id).upper() == wanted
            ):
                target_state = candidate
                break

        if target_state is not None:
            # Assign directly instead of firing an event: the state is already
            # persisted, so it is set here without sending a transition.
            self.current_state = target_state
        else:
            # If state not found, log a warning but don't crash
            logger.warning(
                "Could not find state '%s' (case-insensitive) for profile '%s'. Keeping current state '%s'.",
                wanted,
                profile_name,
                self.current_state.value,
            )

    # --- Internal helpers ---

    @staticmethod
    def _coerce_state_name(raw) -> str:
        """Turn a str / bytes / ProfileState input into an upper-case state name."""
        if isinstance(raw, ProfileState):
            raw = raw.value
        elif isinstance(raw, (bytes, bytearray)):
            raw = raw.decode('utf-8', errors='replace')
        return str(raw).upper()

    @staticmethod
    def _redis_state_name(state) -> Optional[str]:
        """Return the upper-case name of ``state`` as used in Redis, or None if there is no state."""
        if state is None:
            return None
        return str(state.value).upper()

    @staticmethod
    def _session_counter_reset() -> dict:
        """Return a fresh mapping that zeroes every per-session counter field."""
        return {
            'success_count': '0',
            'failure_count': '0',
            'tolerated_error_count': '0',
            'download_count': '0',
            'download_error_count': '0',
        }

    def _queue_index_move(self, pipe, source_name: Optional[str], state_val: str, now: float, drop_lock: bool = False):
        """Queue the state-index move (and optional lock removal) on ``pipe``."""
        if source_name is not None:
            pipe.zrem(self.manager._state_key(source_name), self.profile_name)
        pipe.zadd(self.manager._state_key(state_val), {self.profile_name: now})
        if drop_lock:
            pipe.hdel(self.manager._locks_key(), self.profile_name)

    def _log_state_change(self, source_name: Optional[str], state_val: str):
        """Log the completed transition; a missing source is rendered as 'None'."""
        logger.info("Updated profile '%s' from %s to %s", self.profile_name, source_name, state_val)

    # --- Action Methods ---

    def on_enter_locked(self, event_data: event.EventData, owner: Optional[str] = None):
        """Persist the LOCKED state and record who holds the lock.

        Args:
            event_data: Transition context supplied by the state machine.
            owner: Identifier of the lock holder. When None nothing is written.
        """
        # When re-hydrating a state machine, `owner` will be None. In this case,
        # the profile is already locked, so we should not perform any actions.
        if owner is None:
            return

        now = time.time()
        source_name = self._redis_state_name(event_data.source)
        profile_key = self.manager._profile_key(self.profile_name)
        pipe = self.manager.redis.pipeline()
        # Explicitly use Enum value to ensure Uppercase in Redis
        state_val = ProfileState.LOCKED.value

        pipe.hset(profile_key, mapping={
            'state': state_val,
            'lock_owner': owner,
            'lock_timestamp': str(now),
            'last_used': str(now),
        })
        self._queue_index_move(pipe, source_name, state_val, now)
        pipe.execute()

        self._log_state_change(source_name, state_val)
        logger.info("Locked profile '%s' for owner '%s'", self.profile_name, owner)

    def on_enter_cooldown(self, event_data: event.EventData, duration: Optional[int] = None):
        """Persist the COOLDOWN state, releasing the lock and scheduling the rest end.

        Args:
            event_data: Transition context supplied by the state machine.
            duration: Cooldown length in seconds. When None nothing is written.
        """
        # When re-hydrating a state machine, `duration` will be None. In this case,
        # the profile is already in cooldown, so we should not perform any actions.
        if duration is None:
            return

        now = time.time()
        rest_until = now + duration
        source_name = self._redis_state_name(event_data.source)
        profile_key = self.manager._profile_key(self.profile_name)
        pipe = self.manager.redis.pipeline()
        # Explicitly use Enum value to ensure Uppercase in Redis
        state_val = ProfileState.COOLDOWN.value

        pipe.hset(profile_key, mapping={
            'state': state_val,
            'rest_until': str(rest_until),
            'rest_reason': 'Post-task cooldown',
            'lock_owner': '',
            'lock_timestamp': '0',
            'last_used': str(now),
        })
        # COOLDOWN is only reachable from LOCKED, so the lock entry is always dropped.
        self._queue_index_move(pipe, source_name, state_val, now, drop_lock=True)
        pipe.execute()

        self._log_state_change(source_name, state_val)
        logger.info("Unlocked profile '%s' into COOLDOWN for %ss.", self.profile_name, duration)

    def on_enter_active(self, event_data: event.EventData, profile: Optional[dict] = None):
        """Persist the ACTIVE state, clearing rest/ban/lock fields.

        Session counters are reset when coming from BANNED or PAUSED, and from
        RESTING unless the rest reason was "Waiting for group capacity". The
        profile's proxy, if any, is marked active in the same pipeline.

        Args:
            event_data: Transition context supplied by the state machine.
            profile: Optional already-loaded profile hash. When given (and
                non-empty) it is used instead of reading the hash from Redis
                to inspect ``rest_reason``.
        """
        now = time.time()
        # NOTE(review): the guards below allow for a missing source state --
        # presumably the library's initial-state activation during construction.
        # Confirm that writing ACTIVE to Redis in that case is intended.
        source_name = self._redis_state_name(event_data.source)
        profile_key = self.manager._profile_key(self.profile_name)
        pipe = self.manager.redis.pipeline()
        # Explicitly use Enum value to ensure Uppercase in Redis
        state_val = ProfileState.ACTIVE.value

        updates = {
            'state': state_val,
            'rest_until': '0',
            'rest_reason': '',
            'reason': '',
            'ban_reason': '',
            'lock_owner': '',
            'lock_timestamp': '0',
            'last_used': str(now),
            'wait_started_at': '0',
        }

        if source_name in (ProfileState.RESTING.value, ProfileState.COOLDOWN.value):
            updates['last_rest_timestamp'] = str(now)

        # Reset counters if activating from a long-term off state, but not from a short cooldown.
        should_reset_counters = False
        if source_name in (ProfileState.BANNED.value, ProfileState.PAUSED.value):
            should_reset_counters = True
        elif source_name == ProfileState.RESTING.value:
            # For RESTING, only reset if it wasn't just waiting for a slot after a cooldown.
            profile_data = profile or self.manager.redis.hgetall(profile_key)
            is_waiting_after_cooldown = profile_data.get('rest_reason') == "Waiting for group capacity"
            should_reset_counters = not is_waiting_after_cooldown

        if should_reset_counters:
            logger.info("Resetting session counters for profile '%s' on activation.", self.profile_name)
            updates.update(self._session_counter_reset())

        pipe.hset(profile_key, mapping=updates)

        # Update state indexes - this is critical for list_profiles to work correctly.
        # The lock entry is removed only when coming from LOCKED.
        self._queue_index_move(
            pipe,
            source_name,
            state_val,
            now,
            drop_lock=(source_name == ProfileState.LOCKED.value),
        )

        # When activating a profile, ensure its proxy is also active.
        # This read goes straight to Redis (not through the pipeline) because
        # its result is needed before the pipeline is executed.
        proxy_url = self.manager.redis.hget(profile_key, 'proxy')
        if proxy_url:
            logger.debug(
                "Ensuring associated proxy '%s' is active for profile '%s'.",
                proxy_url,
                self.profile_name,
            )
            pipe.hset(self.manager._proxy_state_key(proxy_url), mapping={
                'state': ProfileState.ACTIVE.value,
                'rest_until': '0',
                'work_start_timestamp': str(now),
            })

        pipe.execute()
        self._log_state_change(source_name, state_val)

    def on_enter_banned(self, event_data: event.EventData, reason: str = ''):
        """Persist the BANNED state and, if given, the ban reason.

        Args:
            event_data: Transition context supplied by the state machine.
            reason: Free-text reason; stored in both ``ban_reason`` and
                ``reason`` when non-empty.
        """
        now = time.time()
        source_name = self._redis_state_name(event_data.source)
        profile_key = self.manager._profile_key(self.profile_name)
        pipe = self.manager.redis.pipeline()
        # Explicitly use Enum value to ensure Uppercase in Redis
        state_val = ProfileState.BANNED.value

        updates = {
            'state': state_val,
            'last_used': str(now),
            'lock_owner': '',
            'lock_timestamp': '0',
            'rest_until': '0',
            'rest_reason': '',
            'wait_started_at': '0',
        }
        if reason:
            updates['ban_reason'] = reason
            updates['reason'] = reason
        pipe.hset(profile_key, mapping=updates)

        # Update state indexes; drop the lock entry when coming from LOCKED.
        self._queue_index_move(
            pipe,
            source_name,
            state_val,
            now,
            drop_lock=(source_name == ProfileState.LOCKED.value),
        )
        pipe.execute()

        self._log_state_change(source_name, state_val)
        if reason:
            logger.info("Reason for ban: %s", reason)

    def on_enter_resting(self, event_data: event.EventData, reason: str = '', duration_minutes: Optional[int] = None, is_waiting_profile: bool = False, is_rotation: bool = False):
        """Persist the RESTING state and compute when the rest ends.

        Args:
            event_data: Transition context supplied by the state machine.
            reason: Free-text reason; stored in ``rest_reason`` and ``reason``
                when non-empty.
            duration_minutes: Rest length in minutes. ``0`` stores a
                ``rest_until`` of ``'0'``; ``None`` defaults to one hour.
            is_waiting_profile: When True the profile is marked as waiting
                (``wait_started_at`` set, ``rest_until`` ``'0'``) and
                ``duration_minutes`` is ignored.
            is_rotation: When True the session counters are reset.
        """
        now = time.time()
        source_name = self._redis_state_name(event_data.source)
        profile_key = self.manager._profile_key(self.profile_name)
        pipe = self.manager.redis.pipeline()
        # Explicitly use Enum value to ensure Uppercase in Redis
        state_val = ProfileState.RESTING.value

        updates = {
            'state': state_val,
            'last_used': str(now),
            'lock_owner': '',
            'lock_timestamp': '0',
        }

        if is_waiting_profile:
            updates['wait_started_at'] = str(now)
            updates['rest_until'] = '0'
        elif duration_minutes == 0:
            updates['rest_until'] = '0'
        else:
            # Default to 1 hour if no duration is provided
            rest_duration_seconds = (duration_minutes * 60) if duration_minutes is not None else 3600
            updates['rest_until'] = str(now + rest_duration_seconds)

        if reason:
            updates['rest_reason'] = reason
            updates['reason'] = reason

        if is_rotation:
            logger.info("Resetting session counters for profile '%s' on rotation.", self.profile_name)
            updates.update(self._session_counter_reset())

        pipe.hset(profile_key, mapping=updates)

        # Update state indexes - this is critical for the enforcer to see the correct state.
        # The lock entry is removed only when coming from LOCKED.
        self._queue_index_move(
            pipe,
            source_name,
            state_val,
            now,
            drop_lock=(source_name == ProfileState.LOCKED.value),
        )
        pipe.execute()

        self._log_state_change(source_name, state_val)
        if reason:
            logger.info("Reason for rest: %s", reason)

    def on_enter_paused(self, event_data: event.EventData, reason: str = ''):
        """Persist the PAUSED state and, if given, the pause reason.

        Args:
            event_data: Transition context supplied by the state machine.
            reason: Free-text reason; stored in ``reason`` when non-empty.
        """
        now = time.time()
        source_name = self._redis_state_name(event_data.source)
        profile_key = self.manager._profile_key(self.profile_name)
        pipe = self.manager.redis.pipeline()
        # Explicitly use Enum value to ensure Uppercase in Redis
        state_val = ProfileState.PAUSED.value

        updates = {
            'state': state_val,
            'last_used': str(now),
            'lock_owner': '',
            'lock_timestamp': '0',
            'rest_until': '0',
            'rest_reason': '',
            'wait_started_at': '0',
        }
        if reason:
            updates['reason'] = reason
        pipe.hset(profile_key, mapping=updates)

        # Update state indexes; drop the lock entry when coming from LOCKED.
        self._queue_index_move(
            pipe,
            source_name,
            state_val,
            now,
            drop_lock=(source_name == ProfileState.LOCKED.value),
        )
        pipe.execute()

        self._log_state_change(source_name, state_val)
        if reason:
            logger.info("Reason for pause: %s", reason)