Created
October 18, 2020 10:02
-
-
Save ncwhale/861018d37f8ea267e2f652b843c750bd to your computer and use it in GitHub Desktop.
A simple aiogram FSM storage implementation backed by ZODB.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy | |
import typing | |
import transaction | |
from aiogram.dispatcher.storage import BaseStorage | |
from BTrees.LOBTree import BTree | |
from ZODB import DB | |
class ZODBStorage(BaseStorage):
    """
    aiogram state storage that keeps its records in a ZODB database.

    Based on the in-memory states storage, converted to ZODB.
    This type of storage is safe for usage in bots, because you will keep
    all states after restarting.

    Layout: ``root[root_key]`` is a BTree keyed by chat id; each value is a
    BTree keyed by user id; each of those values is a plain ``dict`` record
    with the keys ``'state'``, ``'data'`` and ``'bucket'``.

    The ``BTree`` class used here comes from ``BTrees.LOBTree``, whose keys
    must be integers, so chat and user ids are normalised to ``int``.
    """

    async def wait_closed(self):
        """Do nothing; present to satisfy the storage interface."""
        pass

    async def close(self):
        """Close the ZODB connection opened in ``__init__``."""
        self.connection.close()

    def __init__(self, db: DB, root_key: str = "state"):
        """
        Open a connection on *db* and make sure the root container exists.

        :param db: an opened ``ZODB.DB`` instance
        :param root_key: key in the database root under which all records
            of this storage are kept
        """
        self.db = db
        # A private transaction manager keeps this storage's commits
        # separate from any other transaction manager in the process.
        self.transaction_manager = transaction.TransactionManager()
        self.connection = self.db.open(self.transaction_manager)
        self.dbroot = self.connection.root()
        if root_key not in self.dbroot:
            with self.transaction_manager:
                self.dbroot[root_key] = BTree()
        self.data = self.dbroot[root_key]

    def resolve_address(self, chat, user):
        """
        Normalise the address and create an empty record for it if needed.

        :param chat: chat id (or ``None``)
        :param user: user id (or ``None``)
        :return: ``(chat_id, user_id)`` as integers, usable as BTree keys
        :raises ValueError: if an id cannot be converted to ``int``
        """
        # The trees are LOBTrees (integer keys only); string keys would be
        # rejected with TypeError on assignment, so convert to int here.
        chat_id, user_id = map(int, self.check_address(chat=chat, user=user))
        with self.transaction_manager as trans:
            trans.note(u"resolve addresss")
            if chat_id not in self.data:
                self.data[chat_id] = BTree()
            if user_id not in self.data[chat_id]:
                self.data[chat_id][user_id] = {
                    'state': None, 'data': {}, 'bucket': {}}
        return chat_id, user_id

    def _set_field(self, chat, user, field, value):
        """Replace one field of the (chat, user) record and commit."""
        with self.transaction_manager:
            # Store a fresh dict instead of mutating the stored one: ZODB
            # does not notice in-place changes to a plain dict, only the
            # item assignment on the BTree marks the data as modified.
            record = dict(self.data[chat][user])
            record[field] = value
            self.data[chat][user] = record

    def _merge_field(self, chat, user, field, updates, extra):
        """Merge *updates* and *extra* into a dict field and commit."""
        with self.transaction_manager:
            record = dict(self.data[chat][user])
            # ``or {}`` tolerates a record whose field was stored as None.
            merged = dict(record[field] or {})
            merged.update(updates, **extra)
            record[field] = merged
            # Re-assign so the change is registered with the transaction.
            self.data[chat][user] = record

    async def get_state(self, *,
                        chat: typing.Union[str, int, None] = None,
                        user: typing.Union[str, int, None] = None,
                        default: typing.Optional[str] = None) -> typing.Optional[str]:
        """
        Return the stored state for the address.

        ``default`` is accepted for interface compatibility but is not
        used: every record is created with ``'state'`` set to ``None``.
        """
        chat, user = self.resolve_address(chat=chat, user=user)
        return self.data[chat][user]['state']

    async def get_data(self, *,
                       chat: typing.Union[str, int, None] = None,
                       user: typing.Union[str, int, None] = None,
                       default: typing.Optional[str] = None) -> typing.Dict:
        """Return a deep copy of the stored data (``default`` is unused)."""
        chat, user = self.resolve_address(chat=chat, user=user)
        return copy.deepcopy(self.data[chat][user]['data'])

    async def update_data(self, *,
                          chat: typing.Union[str, int, None] = None,
                          user: typing.Union[str, int, None] = None,
                          data: typing.Dict = None, **kwargs):
        """Merge *data* and any keyword arguments into the stored data."""
        if data is None:
            data = {}
        chat, user = self.resolve_address(chat=chat, user=user)
        self._merge_field(chat, user, 'data', data, kwargs)

    async def set_state(self, *,
                        chat: typing.Union[str, int, None] = None,
                        user: typing.Union[str, int, None] = None,
                        state: typing.AnyStr = None):
        """Store *state* for the address."""
        chat, user = self.resolve_address(chat=chat, user=user)
        self._set_field(chat, user, 'state', state)

    async def set_data(self, *,
                       chat: typing.Union[str, int, None] = None,
                       user: typing.Union[str, int, None] = None,
                       data: typing.Dict = None):
        """Replace the stored data with a deep copy of *data*."""
        chat, user = self.resolve_address(chat=chat, user=user)
        # Store {} rather than None so a later update_data() keeps working.
        new_data = {} if data is None else copy.deepcopy(data)
        self._set_field(chat, user, 'data', new_data)

    async def reset_state(self, *,
                          chat: typing.Union[str, int, None] = None,
                          user: typing.Union[str, int, None] = None,
                          with_data: typing.Optional[bool] = True):
        """Set the state to ``None`` and, if *with_data*, clear the data."""
        await self.set_state(chat=chat, user=user, state=None)
        if with_data:
            await self.set_data(chat=chat, user=user, data={})

    def has_bucket(self):
        """Report that this storage implements the bucket methods."""
        return True

    async def get_bucket(self, *,
                         chat: typing.Union[str, int, None] = None,
                         user: typing.Union[str, int, None] = None,
                         default: typing.Optional[dict] = None) -> typing.Dict:
        """Return a deep copy of the stored bucket (``default`` is unused)."""
        chat, user = self.resolve_address(chat=chat, user=user)
        return copy.deepcopy(self.data[chat][user]['bucket'])

    async def set_bucket(self, *,
                         chat: typing.Union[str, int, None] = None,
                         user: typing.Union[str, int, None] = None,
                         bucket: typing.Dict = None):
        """Replace the stored bucket with a deep copy of *bucket*."""
        chat, user = self.resolve_address(chat=chat, user=user)
        # Store {} rather than None so a later update_bucket() keeps working.
        new_bucket = {} if bucket is None else copy.deepcopy(bucket)
        self._set_field(chat, user, 'bucket', new_bucket)

    async def update_bucket(self, *,
                            chat: typing.Union[str, int, None] = None,
                            user: typing.Union[str, int, None] = None,
                            bucket: typing.Dict = None, **kwargs):
        """Merge *bucket* and any keyword arguments into the stored bucket."""
        if bucket is None:
            bucket = {}
        chat, user = self.resolve_address(chat=chat, user=user)
        # Goes through the same transactional helper as update_data so the
        # change is actually committed.
        self._merge_field(chat, user, 'bucket', bucket, kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment