Source code for homecontrol.dependencies.event_engine

"""EventEngine for HomeControl"""

import asyncio
import concurrent.futures
import logging
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable, List, Union

LOGGER = logging.getLogger(__name__)


# pylint: disable=too-few-public-methods
class Event:
    """Representation for an Event

    Attributes are limited by ``__slots__`` to:

    * ``event_type`` -- the name the event is dispatched under
    * ``data`` -- payload dict (a new empty dict when none/empty is given)
    * ``timestamp`` -- the given datetime, or ``datetime.now()`` at creation
    * ``kwargs`` -- extra keyword dict (a new empty dict when none/empty
      is given)
    """

    __slots__ = ("event_type", "data", "timestamp", "kwargs")

    def __init__(self,
                 event_type: str,
                 data: dict = None,
                 timestamp: datetime = None,
                 kwargs: dict = None) -> None:
        # Falsy arguments (None or an empty dict) are replaced by fresh
        # defaults so instances never share a mutable default.
        self.event_type = event_type
        self.data = data if data else {}
        self.timestamp = timestamp if timestamp else datetime.now()
        self.kwargs = kwargs if kwargs else {}

    def __repr__(self) -> str:
        return f"<Event {self.event_type} kwargs={self.kwargs} {self.data}>"
class EventEngine:
    """Dispatcher for events

    Handlers are stored per event type in ``self.handlers``. Handlers
    registered under ``"*"`` are returned for every event.
    """

    def __init__(self, core) -> None:
        self.core = core
        # Maps an event type (or "*") to the set of handlers registered
        # for it.
        self.handlers = defaultdict(set)

    @staticmethod
    def create_event(event_type: str,
                     data: dict = None,
                     **kwargs) -> Event:
        """
        Creates an Event to be broadcasted

        The event payload is ``data`` with ``kwargs`` merged on top
        (``kwargs`` win on key collisions). The timestamp is set to
        ``datetime.now()``.
        """
        # Merge into a fresh dict so the caller's ``data`` is never mutated.
        payload = {**(data or {}), **kwargs}
        return Event(event_type, data=payload, timestamp=datetime.now())

    def get_event_handlers(self, event: Event) -> List:
        """
        Returns a list of handlers for an Event

        Wildcard (``"*"``) handlers come first, followed by the handlers
        registered for ``event.event_type``.
        """
        # ``.get`` instead of indexing: looking up an unknown event type
        # must not insert an empty set into the defaultdict.
        return (
            list(self.handlers.get("*", ()))
            + list(self.handlers.get(event.event_type, ()))
        )

    def broadcast(self,  # lgtm [py/similar-function]
                  event_type: str,
                  data: dict = None,
                  **kwargs) -> List[asyncio.Future]:
        """
        Broadcast an event and return the futures

        Every listener is a coroutine that will simply
        receive event and `kwargs`

        Each handler is scheduled on ``self.core.loop`` with
        ``asyncio.ensure_future``; the list is empty when no handler
        matches.

        Example:
        >>> async def on_event(event: Event, ...):
        >>>     return
        """
        event = self.create_event(event_type, data, **kwargs)
        LOGGER.debug("Event: %s", event)

        return [
            asyncio.ensure_future(handler(event, **kwargs),
                                  loop=self.core.loop)
            for handler in self.get_event_handlers(event)
        ]

    def broadcast_threaded(self,  # lgtm [py/similar-function]
                           event_type: str,
                           data: dict = None,
                           **kwargs) -> List[concurrent.futures.Future]:
        """
        Same as broadcast BUT
        - It returns ``concurrent.futures.Future`` objects and not
          asyncio Tasks
        - It submits the handlers with
          ``asyncio.run_coroutine_threadsafe``, which is meant to be
          called from a thread other than the one running
          ``self.core.loop``
        """
        event = self.create_event(event_type, data, **kwargs)
        LOGGER.debug("Event: %s", event)

        return [
            asyncio.run_coroutine_threadsafe(handler(event, **kwargs),
                                             loop=self.core.loop)
            for handler in self.get_event_handlers(event)
        ]

    async def gather(self,
                     event_type: str,
                     data: dict = None,
                     timeout: Union[float, int, None] = None,
                     **kwargs) -> List[Any]:
        """
        Broadcast an event and wait for its handlers

        Returns an empty list when no handler is registered for the
        event. Otherwise returns what ``asyncio.wait`` returns: a
        ``(done, pending)`` tuple of sets of futures. ``timeout`` is
        passed to ``asyncio.wait``; handlers still running when it
        expires end up in ``pending`` and are not cancelled.
        """
        # NOTE(review): the ``List[Any]`` annotation does not match the
        # ``(done, pending)`` tuple returned below; the return shape is
        # kept as-is for existing callers.
        tasks = self.broadcast(event_type, data, **kwargs)
        if not tasks:
            # asyncio.wait raises ValueError on an empty collection.
            return []

        # The ``loop`` argument of asyncio.wait was removed in
        # Python 3.10; the running loop is used automatically.
        return await asyncio.wait(tasks, timeout=timeout)

    def register(self, event: str) -> Callable:
        """
        Decorator to register event handlers

        The decorated coroutine is added to the handler set for
        ``event`` and returned unchanged.
        """
        def _register(coro):
            self.handlers[event].add(coro)
            return coro
        return _register

    def remove_handler(self, event: str, handler: Callable) -> None:
        """Removes an event handler

        Does nothing when ``handler`` is not registered for ``event``.
        """
        # ``.get`` avoids creating an empty entry for an unknown event.
        registered = self.handlers.get(event)
        if registered is not None:
            registered.discard(handler)