"""ModuleManager module"""
import asyncio
import importlib
import importlib.util
import logging
import os
import sys
from typing import TYPE_CHECKING, Dict
import pkg_resources
import homecontrol
import voluptuous as vol
from homecontrol.const import EVENT_MODULE_LOADED
from homecontrol.dependencies.ensure_pip_requirements import ensure_packages
from homecontrol.dependencies.entity_types import Module
from homecontrol.dependencies.yaml_loader import YAMLLoader
from homecontrol.exceptions import PipInstallError
if TYPE_CHECKING:
from homecontrol.core import Core
LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema({
vol.Required("folders", default=[]): [str],
vol.Required("blacklist", default=[]): [str],
vol.Required("whitelist", default=[]): [str],
vol.Required("install-pip-requirements", default=True): bool,
vol.Required("load-internal-modules", default=True): bool
})
# pylint: disable=too-few-public-methods
[docs]class ModuleFolder:
"""
module folder representation to create
a dummy package in sys.modules
"""
def __init__(self, name: str) -> None:
self.__name__ = name
[docs]class ModuleAccessor:
"""Wrapper for ModuleManager.loaded_modules"""
def __init__(self, module_manager: "ModuleManager") -> None:
self._module_manager = module_manager
def __getattr__(self, name: str):
return self._module_manager.loaded_modules.get(name)
[docs]class ModuleManager:
"""Manages your modules"""
cfg: dict
loaded_modules: Dict[str, Module]
module_accessor: ModuleAccessor
def __init__(self, core: "Core"):
self.core = core
self.loaded_modules = {}
self.module_accessor = ModuleAccessor(self)
[docs] async def init(self) -> None:
"""Initialise the modules"""
self.cfg = await self.core.cfg.register_domain(
"module-manager",
schema=CONFIG_SCHEMA,
allow_reload=False)
if self.cfg["load-internal-modules"]:
internal_module_folder = pkg_resources.resource_filename(
homecontrol.__name__, "modules")
await self.load_folder(internal_module_folder)
for folder in self.cfg["folders"]:
await self.load_folder(folder)
[docs] async def load_folder(self, path: str) -> [object]:
"""Load every module in a folder"""
load_tasks = []
blacklist = self.cfg["blacklist"]
whitelist = self.cfg["whitelist"]
package_name = f"homecontrol_{os.path.basename(path)}"
sys.modules[package_name] = ModuleFolder(package_name)
for node in os.listdir(path):
if node.startswith("__"):
continue
mod_path = os.path.join(path, node)
mod_name = node if os.path.isdir(
node) else ".".join(os.path.splitext(node)[:-1])
if ((mod_name not in blacklist)
and (not whitelist or mod_name in whitelist)):
if os.path.isdir(mod_path):
load_tasks.append(self.core.loop.create_task(
self.load_folder_module(mod_path, mod_name)))
elif os.path.isfile(mod_path) and node.endswith(".py"):
load_tasks.append(self.core.loop.create_task(
self.load_file_module(mod_path, mod_name)))
return await asyncio.gather(*load_tasks)
[docs] async def load_file_module(self,
mod_path: str,
name: str) -> (Module, Exception):
"""
Loads a module from a file and initialises it
Returns a Module object
"""
try:
assert os.path.isfile(mod_path)
except AssertionError as error:
LOGGER.warning(
"Module could not be loaded: %s at %s", name, mod_path)
self.core.event_engine.broadcast(
"module_not_loaded", exception=error)
return error
mod_spec = importlib.util.spec_from_file_location(name, mod_path)
mod = importlib.util.module_from_spec(mod_spec)
mod.resource_folder = None
mod.event = self.core.event_engine.register
mod_spec.loader.exec_module(mod)
if not hasattr(mod, "Module"):
mod.Module = type("Module_" + name, (Module,), {})
else:
mod.Module = type("Module_" + name, (mod.Module, Module), {})
spec = getattr(mod, "SPEC", {})
return await self._load_module_object(spec, name, mod_path, mod)
[docs] async def load_folder_module(self,
path: str,
name: str) -> (Module, Exception):
"""
Loads a module from a folder and initialises it
It also takes care of pip requirements
Returns a Module object
"""
mod_path = os.path.join(path, "module.py")
spec_path = os.path.join(path, "module.yaml")
parent_path = os.path.dirname(path)
try:
assert os.path.isdir(path)
assert os.path.isfile(mod_path)
except AssertionError as error:
LOGGER.warning("Module could not be loaded: %s at %s", name, path)
self.core.event_engine.broadcast(
"module_not_loaded", exception=error, name=name)
return error
spec = (YAMLLoader.load(open(spec_path))
if os.path.isfile(spec_path) else {})
try:
ensure_packages(spec.get("pip-requirements", []))
ensure_packages(
spec.get("pip-test-requirements", []), test_index=True)
except PipInstallError as e:
LOGGER.warning(
"Module could not be loaded: %s at %s", name, path)
self.core.event_engine.broadcast(
"module_not_loaded",
exception=e,
name=name)
return
mod_name = f"homecontrol_{os.path.basename(parent_path)}.{name}"
mod_spec = importlib.util.spec_from_file_location(
name, mod_path, submodule_search_locations=[path])
mod = importlib.util.module_from_spec(mod_spec)
mod.__package__ = mod_name
sys.modules[mod_name] = mod
mod.SPEC = spec
mod.resource_folder = path
mod_spec.loader.exec_module(mod)
return await self._load_module_object(mod.SPEC, name, path, mod)
async def _load_module_object(self,
spec: dict,
name: str,
path: str,
mod) -> Module:
"""
Initialises a module object
This method should only be invoked by ModuleManager
"""
if hasattr(mod, "_setup_module"):
mod._setup_module(self.core) # pylint: disable=protected-access
if not hasattr(mod, "Module"):
mod.Module = type("Module_" + name, (Module,), {})
else:
mod.Module = type("Module_" + name, (mod.Module, Module), {})
mod_obj = mod.Module.__new__(mod.Module)
mod_obj.core = self.core
mod_obj.resource_folder = mod.resource_folder
mod_obj.name = name
mod_obj.path = path
mod_obj.item_specs = {}
mod_obj.mod = mod
mod_obj.spec = spec
mod_obj.__init__()
self.loaded_modules[name] = mod_obj
await self.core.item_manager.add_from_module(mod_obj)
if hasattr(mod_obj, "init"):
await mod_obj.init()
self.core.event_engine.broadcast(EVENT_MODULE_LOADED, module=mod_obj)
return mod_obj
[docs] async def stop(self) -> None:
"""Unloads all modules to prepare for a shutdown"""
return await asyncio.gather(*(
module.stop() for module in self.loaded_modules.values()))
# pylint: disable=no-self-use
[docs] def resource_path(self, module: Module, path: str = "") -> str:
"""
Returns the path for a module's resource folder
Note that only folder modules can have a resource path
"""
path = os.path.join(module.resource_folder, path)
if os.path.exists(path):
return path
raise FileNotFoundError(f"Resource path {path} does not exist")