"""StateEngine module"""
import logging
from typing import Any, Callable, Optional
from types import MethodType
import voluptuous as vol
from homecontrol.const import ItemStatus
from homecontrol.exceptions import (
ItemNotOnlineError
)
LOGGER = logging.getLogger(__name__)
[docs]class StateDef:
def __init__(
self,
poll_interval: Optional[float] = None,
default: Any = None,
default_factory: Callable = None) -> None:
self._poll_interval = poll_interval
self._default = default_factory() if default_factory else default
self._getter = None
self._setter = None
self._schema = None
[docs] def setter(self, schema: Optional[vol.Schema] = None) -> Callable:
"""Decorator to register a setter"""
def _setter_decorator(setter_method: Callable) -> Callable:
self._setter = setter_method
self._schema = schema
return setter_method
return _setter_decorator
[docs] def getter(self) -> Callable:
"""Decorator to register a getter"""
def _getter_decorator(getter_method: Callable) -> Callable:
self._getter = getter_method
return getter_method
return _getter_decorator
[docs] def register_state(
self,
state_engine: "StateEngine",
name: str,
item: "homecontrol.dependencies.entity_types.Item") -> "State":
"""Generates a State instance and registers it to a StateEngine"""
state = State(
state_engine,
self._default,
MethodType(self._getter, item) if self._getter else None,
MethodType(self._setter, item) if self._setter else None,
name=name,
poll_interval=self._poll_interval,
schema=self._schema
)
state_engine.register_state(state)
return state
[docs]class StateEngine:
"""Holds the states of an item"""
def __init__(
self,
item: "homecontrol.dependencies.entity_types.Item",
core: "homecontrol.core.Core",
state_defaults: dict = None):
state_defaults = state_defaults or {}
self.item = item
self.core = core
self.states = {}
for name in dir(item):
state_def: StateDef = getattr(item, name)
if name in state_defaults:
# pylint: disable=protected-access
state_def._default = state_defaults[name]
if isinstance(state_def, StateDef):
state_def.register_state(self, name, item)
[docs] def register_state(self, state: "State") -> None:
"""Registers a State instance to the StateEngine"""
self.states[state.name] = state
[docs] async def get(self, state: str):
"""Gets an item's state"""
if state in self.states:
return await self.states[state].get()
[docs] async def set(self, state: str, value) -> dict:
"""Sets an item's state"""
if state in self.states:
return await self.states[state].set(value)
[docs] def check_value(self, state: str, value) -> vol.error.Error:
"""Checks if a value is valid for a state"""
return self.states[state].check_value(value)
[docs] async def update(self, state: str, value):
"""Called from an item to update its state"""
if state in self.states:
return await self.states[state].update(value)
[docs] async def bulk_update(self, **kwargs):
"""Called from an item to update multiple states"""
for state, value in kwargs.items():
await self.states[state].update(value)
[docs] async def dump(self) -> dict:
"""Return a JSON serialisable object"""
return {
name: await self.states[name].get() for name in self.states
}
[docs]class State:
"""Holds one state of an item"""
getter: Callable
setter: Callable
value: Any
mutable: bool
# pylint: disable=too-many-arguments
def __init__(self,
state_engine: StateEngine,
default,
getter: Callable = None,
setter: Callable = None,
name: str = None,
state_type: type = None,
schema: dict = None,
poll_interval: float = None) -> None:
self.value = default if not state_type else state_type(*default)
self.name = name
self.getter = getter
self.setter = setter
self.state_engine = state_engine
self.schema = vol.Schema(schema) if schema else None
self.poll_interval = poll_interval
if self.poll_interval:
self.state_engine.core.tick_engine.tick(
self.poll_interval)(self.poll_value)
[docs] async def poll_value(self) -> None:
"""Polls the current state and updates it"""
if self.state_engine.item.status != ItemStatus.ONLINE:
return None
await self.update(await self.getter())
[docs] async def get(self):
"""Gets a state"""
if self.state_engine.item.status != ItemStatus.ONLINE:
return None
if self.getter and not self.poll_interval:
return await self.getter()
return self.value
[docs] async def set(self, value) -> dict:
"""Sets a state"""
if self.state_engine.item.status != ItemStatus.ONLINE:
raise ItemNotOnlineError(self.state_engine.item.identifier)
if self.schema: # Apply schema to new value
value = self.schema(value)
if self.setter:
result: dict = await self.setter(value)
for state, change in result.items():
self.state_engine.states[state].value = change
self.state_engine.core.event_engine.broadcast(
"state_change", item=self.state_engine.item, changes=result)
LOGGER.debug("State change: %s %s",
self.state_engine.item.identifier, result)
return result
return {}
[docs] def check_value(self, value) -> vol.error.Error:
"""Checks if a value is valid for a state"""
if self.schema:
try:
self.schema(value)
return True
except vol.error.Error as error:
return error
return True
[docs] async def update(self, value):
"""Updates a state"""
if not self.value == value:
self.value = value
self.state_engine.core.event_engine.broadcast(
"state_change", item=self.state_engine.item, changes={
self.name: self.value
})
LOGGER.debug("State change: %s %s",
self.state_engine.item.identifier, {self.name: value})
return True
return False