Skip to content

Instantly share code, notes, and snippets.

@ncwhale
Created October 18, 2020 10:02
Show Gist options
  • Save ncwhale/861018d37f8ea267e2f652b843c750bd to your computer and use it in GitHub Desktop.
Save ncwhale/861018d37f8ea267e2f652b843c750bd to your computer and use it in GitHub Desktop.
A simple aiogram storage impl on ZODBstorage.
import copy
import typing
import transaction
from aiogram.dispatcher.storage import BaseStorage
from BTrees.LOBTree import BTree
from ZODB import DB
class ZODBStorage(BaseStorage):
"""
Based on in-memory states storage, convert to ZODB.
This type of storage is safe for usage in bots, because you will keep all states after restarting.
"""
async def wait_closed(self):
pass
async def close(self):
self.connection.close()
def __init__(self, db: DB, root_key: str = "state"):
self.db = db
self.transaction_manager = transaction.TransactionManager()
self.connection = self.db.open(self.transaction_manager)
self.dbroot = self.connection.root()
if root_key not in self.dbroot:
with self.transaction_manager:
self.dbroot[root_key] = BTree()
self.data = self.dbroot[root_key]
def resolve_address(self, chat, user):
chat_id, user_id = map(str, self.check_address(chat=chat, user=user))
with self.transaction_manager as trans:
trans.note(u"resolve addresss")
if chat_id not in self.data:
self.data[chat_id] = BTree()
if user_id not in self.data[chat_id]:
self.data[chat_id][user_id] = {
'state': None, 'data': {}, 'bucket': {}}
return chat_id, user_id
async def get_state(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
default: typing.Optional[str] = None) -> typing.Optional[str]:
chat, user = self.resolve_address(chat=chat, user=user)
return self.data[chat][user]['state']
async def get_data(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
default: typing.Optional[str] = None) -> typing.Dict:
chat, user = self.resolve_address(chat=chat, user=user)
return copy.deepcopy(self.data[chat][user]['data'])
async def update_data(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
data: typing.Dict = None, **kwargs):
if data is None:
data = {}
chat, user = self.resolve_address(chat=chat, user=user)
with self.transaction_manager:
self.data[chat][user]['data'].update(data, **kwargs)
async def set_state(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
state: typing.AnyStr = None):
chat, user = self.resolve_address(chat=chat, user=user)
with self.transaction_manager:
self.data[chat][user]['state'] = state
async def set_data(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
data: typing.Dict = None):
chat, user = self.resolve_address(chat=chat, user=user)
with self.transaction_manager:
self.data[chat][user]['data'] = copy.deepcopy(data)
async def reset_state(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
with_data: typing.Optional[bool] = True):
await self.set_state(chat=chat, user=user, state=None)
if with_data:
await self.set_data(chat=chat, user=user, data={})
def has_bucket(self):
return True
async def get_bucket(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
default: typing.Optional[dict] = None) -> typing.Dict:
chat, user = self.resolve_address(chat=chat, user=user)
return copy.deepcopy(self.data[chat][user]['bucket'])
async def set_bucket(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
bucket: typing.Dict = None):
chat, user = self.resolve_address(chat=chat, user=user)
with self.transaction_manager:
self.data[chat][user]['bucket'] = copy.deepcopy(bucket)
async def update_bucket(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,
bucket: typing.Dict = None, **kwargs):
if bucket is None:
bucket = {}
chat, user = self.resolve_address(chat=chat, user=user)
self.data[chat][user]['bucket'].update(bucket, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment