Source code for homecontrol.dependencies.item_manager

"""ItemManager for HomeControl"""
import asyncio
import logging
from inspect import isclass
from typing import Optional

from attr import asdict, attrib, attrs

import voluptuous as vol
from homecontrol.const import (EVENT_ITEM_CREATED, EVENT_ITEM_NOT_WORKING,
                               EVENT_ITEM_REMOVED, ItemStatus)
from homecontrol.dependencies.entity_types import Item, Module
from homecontrol.dependencies.storage import Storage

LOGGER = logging.getLogger(__name__)

CONFIG_SCHEMA = vol.Schema([
    vol.Schema({
        vol.Required("id"): str,
        vol.Required("type"): str,
        vol.Optional("name"): str,
        vol.Optional("cfg"): dict,
        vol.Required("states", default={}): dict,
        vol.Required("enabled", default=True): bool
    }, extra=vol.ALLOW_EXTRA)
])


# pylint: disable=too-few-public-methods
[docs]@attrs(slots=True) class StorageEntry: """The storage representation of an item""" unique_identifier: str = attrib() type: str = attrib() provider: str = attrib() enabled: bool = attrib(default=True) identifier: str = attrib(default=None) state_defaults: dict = attrib(factory=lambda: {}) cfg: dict = attrib(factory=lambda: {}) name: str = attrib(default=None) hidden: bool = attrib(default=False) def __attrs_post_init__(self): self.identifier = self.identifier or self.unique_identifier self.name = self.name or self.identifier
[docs]def yaml_entry_to_storage_entry(yaml_entry: dict) -> StorageEntry: """Converts a yaml entry to a JSON entry""" if "cfg" in yaml_entry: cfg = yaml_entry["cfg"] else: cfg = yaml_entry.copy() for key in ("id", "type", "name", "states", "enabled"): cfg.pop(key, None) return StorageEntry( cfg=cfg, enabled=yaml_entry["enabled"], identifier=yaml_entry["id"], unique_identifier="yaml_" + yaml_entry["id"], type=yaml_entry["type"], state_defaults=yaml_entry["states"], provider="yaml", name=yaml_entry.get("name", yaml_entry["id"]), hidden=False )
[docs]class ItemManager: """ ItemManager manages all your stateful items """ def __init__(self, core): self.core = core self.items = {} self.item_constructors = {} self.storage = Storage( self.core, "items", 1, storage_init=lambda: {}, loader=self._load_items, dumper=self._dump_items ) self.item_config = self.storage.load_data()
[docs] async def init(self) -> None: """Initialise the items from configuration""" self.yaml_cfg = await self.core.cfg.register_domain( "items", schema=CONFIG_SCHEMA, default=[]) self.load_yaml_config() for storage_entry in self.item_config.values(): if not storage_entry.enabled: continue await self.create_from_storage_entry(storage_entry)
[docs] def load_yaml_config(self) -> None: """Loads the YAML configuration""" for key in tuple(self.item_config.keys()): if self.item_config[key].provider == "yaml": del self.item_config[key] for yaml_entry in self.yaml_cfg: storage_entry = yaml_entry_to_storage_entry(yaml_entry) self.item_config[ storage_entry.unique_identifier] = storage_entry self.storage.schedule_save(self.item_config)
[docs] def get_storage_entry(self, unique_identifier: str) -> StorageEntry: """Returns the StorageEntry for a unique_identifier""" return self.item_config.get(unique_identifier, None)
[docs] def update_storage_entry(self, entry: StorageEntry) -> None: """Updates a config storage entry""" self.item_config[entry.unique_identifier] = entry self.storage.schedule_save(self.item_config)
def _load_items(self, data: dict) -> dict: # pylint: disable=no-self-use entries = {} for entry in data: entries[entry["unique_identifier"]] = StorageEntry(**entry) return entries def _dump_items(self, data: dict) -> dict: # pylint: disable=no-self-use return [asdict(entry) for entry in data.values()]
[docs] async def add_from_module(self, mod_obj: Module) -> None: """ Adds the item specifications of a module to the dict of available ones mod_obj: homecontrol.entity_types.Module """ for attribute in dir(mod_obj.mod): item_class = getattr(mod_obj.mod, attribute) if (isclass(item_class) and issubclass(item_class, Item) and item_class is not Item): item_class.module = mod_obj item_class.type = f"{mod_obj.name}.{item_class.__name__}" self.item_constructors[ item_class.type] = item_class.constructor
[docs] def iter_items_by_id(self, iterable) -> [Item]: """Translates item identifiers into item instances""" for identifier in iterable: if identifier in self.items: yield self.items[identifier]
[docs] def get_by_unique_identifier(self, unique_identifier: str) -> Item: """Returns an item by its unique identifier""" for item in self.items.values(): if item.unique_identifier == unique_identifier: return item return None
[docs] def get_item(self, identifier: str) -> Item: """Returns an item by identifier or unique_identifier""" return (self.items.get(identifier, None) or self.get_by_unique_identifier(identifier))
[docs] async def stop_item(self, item: Item, status: ItemStatus = ItemStatus.STOPPED) -> None: """Stops an item""" if item.status is not ItemStatus.ONLINE: return await item.stop() LOGGER.info("Item %s has been stopped with status %s", item.identifier, status) item.status = status
[docs] async def stop(self) -> None: """Stops every item""" await asyncio.gather(*( self.stop_item(item) for item in self.items.values()))
[docs] async def remove_item(self, identifier: str) -> None: """ Removes a HomeControl item identifier: str The item's identifier """ if identifier not in self.items: LOGGER.info( "Item %s does not exist so it could not be removed", identifier) return item = self.items[identifier] if item.status == ItemStatus.ONLINE: await self.stop_item(item) del self.items[identifier] self.core.event_engine.broadcast(EVENT_ITEM_REMOVED, item=item) LOGGER.info("Item %s has been removed", identifier)
[docs] async def create_from_storage_entry( self, storage_entry: StorageEntry) -> Item: """Creates an Item from a storage entry""" return await self.create_item( identifier=storage_entry.identifier, unique_identifier=storage_entry.unique_identifier, name=storage_entry.name, item_type=storage_entry.type, cfg=storage_entry.cfg, state_defaults=storage_entry.state_defaults )
[docs] async def init_item(self, item: Item) -> None: """Initialises an item""" LOGGER.debug("Initialising item %s", item.identifier) try: init_result = await item.init() except Exception: # pylint: disable=broad-except LOGGER.warning("An exception was raised when initialising item %s", item.identifier, exc_info=True) init_result = False # pylint: disable=singleton-comparison if init_result == False: # noqa: E712 item.status = ItemStatus.OFFLINE return item.status = ItemStatus.ONLINE
[docs] async def register_entry( self, storage_entry: StorageEntry, override: bool = False) -> Optional[Item]: """Registers a storage entry""" existing_item = self.get_by_unique_identifier( storage_entry.unique_identifier) existing_entry = self.item_config.get(storage_entry.unique_identifier) if not override and existing_entry: return existing_item if existing_item: await self.remove_item(existing_item.identifier) storage_entry.enabled = existing_item self.item_config[storage_entry.unique_identifier] = storage_entry self.storage.schedule_save(self.item_config) if storage_entry.enabled: return await self.create_from_storage_entry(storage_entry)
# pylint: disable=too-many-arguments,too-many-locals
[docs] async def create_item( self, identifier: str, item_type: str, cfg: dict = None, state_defaults: dict = None, name: str = None, unique_identifier: str = None ) -> Item: """Creates a HomeControl item""" if item_type not in self.item_constructors: LOGGER.error("Item type not found: %s", item_type) return item_constructor = self.item_constructors[item_type] item = await item_constructor( identifier, name, cfg, state_defaults=state_defaults, core=self.core, unique_identifier=unique_identifier or identifier ) self.items[identifier] = item await self.init_item(item) self.core.event_engine.broadcast(EVENT_ITEM_CREATED, item=item) LOGGER.debug("Item created: %s", item.identifier) if item.status != ItemStatus.ONLINE: LOGGER.warning( "Item could not be initialised: %s [%s]", identifier, item_type) self.core.event_engine.broadcast(EVENT_ITEM_NOT_WORKING, item=item) return item