Created
November 18, 2019 11:34
-
-
Save prostomarkeloff/659221a805bb29e463b0307d3b312de0 to your computer and use it in GitHub Desktop.
uvkpy best code ever......
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Class-based-handlers with states, inheritance and other features. | |
""" | |
import asyncio | |
import typing | |
import inspect | |
import functools | |
from enum import Enum, auto | |
from vk.bot_framework.dispatcher.handler import BaseHandler as BaseHandlerForDispatcher | |
from vk.bot_framework.dispatcher.rule import BaseRule | |
from vk.bot_framework.dispatcher.handler import SkipHandler | |
from vk.bot_framework.dispatcher import data_ | |
from vk.types import BotEvent | |
class EventType(Enum):
    """
    Lifecycle events a handler method can be registered for.

    Values are assigned by ``auto()`` in declaration order.
    """

    # Hook meant to run when the handler is initialised.
    ON_INIT = auto()
    # Hook run before the main handler body.
    BEFORE_HANDLER = auto()
    # Hook run after the main handler body.
    AFTER_HANDLER = auto()
def event_handler(event_type: EventType):
    """
    Mark a function as a hook for the given lifecycle event.

    The returned decorator stores ``event_type`` on the function as the
    ``_event_type`` attribute and returns the very same function object.

    :param event_type: member of ``EventType`` the function should react to
    :return: decorator that tags a function and returns it unchanged
    :raises AssertionError: if ``event_type`` is not an ``EventType`` member
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # removed when Python runs with ``-O``, which would silently disable this
    # validation. AssertionError is kept so existing callers see no change.
    if not isinstance(event_type, EventType):
        raise AssertionError("Bad event type")

    def decorator(func):
        func._event_type = event_type
        return func

    return decorator
def _handle_skip_handler():
    """
    Build a decorator that converts ``SkipHandler`` into a ``False`` result.

    The decorated coroutine function is awaited as usual; if it (or anything
    it awaits, such as hooks or rules) raises ``SkipHandler``, the exception
    is swallowed and ``False`` is returned instead. Any other exception
    propagates unchanged.

    :return: decorator for coroutine functions
    """

    def wrapped(func):
        # functools.wraps keeps the original coroutine's __name__, __doc__
        # and __wrapped__ so introspection and logging still see the real
        # handler instead of a generic "wrapper".
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except SkipHandler:
                return False

        return wrapper

    return wrapped
class BaseHandlerMeta(type):
    """
    Metaclass that gathers functions tagged with ``_event_type`` into
    per-class hook lists.

    Every class created through it receives three list attributes:
    ``_before_handlers``, ``_after_handlers`` and ``_on_init_handlers``.
    Each list contains the hooks inherited from the bases (in base order)
    followed by the hooks defined in the class body itself.
    """

    def __new__(mcs, name, bases, attrs):
        """
        Create the class and attach its hook lists.

        :param name: name of the class being created
        :param bases: tuple of base classes
        :param attrs: class namespace; the three hook lists are written
            into it (any existing entries with those names are replaced)
        :return: the newly created class
        """
        own_hooks = {
            "_before_handlers": [],
            "_after_handlers": [],
            "_on_init_handlers": [],
        }
        for attr in attrs.values():
            if not inspect.isfunction(attr):
                continue
            event_type = getattr(attr, "_event_type", None)
            if not event_type:
                continue
            if event_type is EventType.BEFORE_HANDLER:
                own_hooks["_before_handlers"].append(attr)
            elif event_type is EventType.AFTER_HANDLER:
                own_hooks["_after_handlers"].append(attr)
            elif event_type is EventType.ON_INIT:
                own_hooks["_on_init_handlers"].append(attr)

        # NOTE: the loop variable must not be called ``name`` -- that would
        # shadow the class name passed to ``super().__new__`` below.
        for registry, own in own_hooks.items():
            # Build a fresh list per class so a subclass never mutates the
            # lists that belong to its bases.
            merged = []
            seen = set()
            for base in bases:
                for hook in getattr(base, registry, ()):
                    hook_name = hook.__name__
                    # A name redefined in this class body is governed by the
                    # new definition (hook or not); names already collected
                    # from another base are not added twice.
                    if hook_name in attrs or hook_name in seen:
                        continue
                    seen.add(hook_name)
                    merged.append(hook)
            merged.extend(own)
            attrs[registry] = merged

        return super().__new__(mcs, name, bases, attrs)
class BaseHandler(metaclass=BaseHandlerMeta):
    """
    Base class for class-based handlers.

    It adds no behaviour of its own; it only applies ``BaseHandlerMeta``
    to every subclass.
    """
class UHandler(BaseHandlerForDispatcher):
    """
    Dispatcher handler that supports class-based handlers.

    On execution it runs the rules, then (for ``BaseHandler`` instances) the
    before-hooks, the handler itself, and finally the after-hooks.
    """

    # Metadata for third-party add-ons (such as a web admin panel).
    meta = {
        "name": "UHandler",
        "description": "Class-Based-Handler implementation",
        "deprecated": False,
        "type": "handler",
    }

    def __init__(
        self, event: BotEvent, handler: typing.Callable, rules: typing.List[BaseRule]
    ):
        """
        :param event: event this handler is bound to
        :param handler: callable, or a class that is instantiated here
        :param rules: rules that must all pass before the handler runs
        """
        self._event = event
        self._rules = rules
        if inspect.isclass(handler):
            # A class was registered: instantiate it and give the instance a
            # ``__name__`` equal to its class name.
            handler = handler()
            handler.__name__ = handler.__class__.__name__
        self._handler = handler

    @property
    def event_type(self) -> BotEvent:
        """Event this handler was registered for."""
        return self._event

    @property
    def handler(self) -> typing.Callable:
        """The handler callable (an instance if a class was registered)."""
        return self._handler

    @property
    def rules(self) -> typing.List[BaseRule]:
        """Rules attached to this handler."""
        return self._rules

    async def _trigger_before_handler_event_handlers(self, event, data: dict) -> dict:
        """
        Await every before-hook of the handler in order.

        A hook returning a dict has it merged into ``data`` and the updated
        ``data`` is stored through ``data_.set``.

        :return: the (possibly updated) ``data`` dict
        """
        for hook in self._handler._before_handlers:
            # Resolve by name on the instance so the bound method is called.
            bound_hook = getattr(self._handler, hook.__name__)
            update = await bound_hook(event, data)
            if isinstance(update, dict):
                data.update(update)
                data_.set(data)
        return data

    async def _trigger_after_handler_event_handlers(
        self, event, data: dict, result: typing.Any
    ) -> typing.Any:
        """
        Await every after-hook in order, feeding each one the previous result.

        :return: the value returned by the last hook, or ``result`` unchanged
            when there are no hooks
        """
        for hook in self._handler._after_handlers:
            bound_hook = getattr(self._handler, hook.__name__)
            result = await bound_hook(event, data, result)
        return result

    async def _execute_rules(self, event, data: dict) -> typing.Union[dict, bool]:
        """
        Evaluate the rules in order and stop at the first falsy result.

        :return: ``False`` if any rule fails, otherwise ``data`` (updated
            with every dict a rule returned)
        """
        for rule in self._rules:
            # Coroutine functions and BaseRule instances are awaited; any
            # other rule is called synchronously.
            if asyncio.iscoroutinefunction(rule) or isinstance(rule, BaseRule):
                outcome = await rule(event, data)
            else:
                outcome = rule(event, data)
            if not outcome:
                return False
            if isinstance(outcome, dict):  # the rule wants to update `data_`
                data.update(outcome)
                data_.set(data)
        return data

    @_handle_skip_handler()
    async def execute_handler(
        self, event, data: dict
    ) -> typing.Union[typing.Any, bool]:
        """
        Run rules, hooks and the handler for one event.

        :param event: the event payload (`event.object`)
        :param data: mutable dict shared between rules, hooks and the handler
        :return: ``False`` when a rule rejects the event, otherwise the
            handler result (after the after-hooks, if any)
        """
        data = await self._execute_rules(event, data)
        if data is False:
            return False

        # Only BaseHandler instances carry hook lists.
        if not isinstance(self._handler, BaseHandler):
            return await self.handler(event, data)

        data = await self._trigger_before_handler_event_handlers(event, data)
        result = await self.handler(event, data)
        return await self._trigger_after_handler_event_handlers(event, data, result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
don't see to that please.....