Last active
April 27, 2024 02:23
-
-
Save YuriFontella/59735d03c71a1d9e2e4a37e7e8cfda37 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

from litestar import Litestar, get, post, Controller, Request, Response, Router, WebSocket, websocket, MediaType
from litestar.types import ASGIApp, Scope, Receive, Send
from litestar.datastructures import State
from litestar.di import Provide
from litestar.events import listener
from litestar.exceptions import HTTPException
from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR
from litestar.config.cors import CORSConfig
from litestar.config.csrf import CSRFConfig
from litestar.handlers.websocket_handlers import websocket_listener
from litestar.static_files import create_static_files_router
from litestar.stores.memory import MemoryStore
from litestar.middleware.rate_limit import RateLimitConfig
from litestar.connection import ASGIConnection
from litestar.handlers.base import BaseRouteHandler
from litestar.exceptions import NotAuthorizedException
from litestar.exceptions import WebSocketDisconnect
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
from litestar.logging import LoggingConfig
from litestar.config.allowed_hosts import AllowedHostsConfig
from pydantic import BaseModel
# --- Application-wide configuration objects (passed to ``Litestar(...)``) ---

# CORS: every origin is allowed.
cors_config = CORSConfig(
    allow_origins=['*'],
)

# CSRF protection.
# NOTE(review): the secret is a hard-coded literal -- fine for a demo only.
csrf_config = CSRFConfig(
    secret='secret',
)

# Rate limiting, configured as 100 per 'second' unit.
rate_limit_config = RateLimitConfig(
    rate_limit=('second', 100),
)

# Logging: the root logger runs at INFO on the 'console' handler, and the
# 'standard' formatter emits "time - logger - level - message".
logging_config = LoggingConfig(
    root={
        'level': logging.getLevelName(logging.INFO),
        'handlers': ['console'],
    },
    formatters={
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        },
    },
)

# In-memory key/value store used by the /store/* routes.
store = MemoryStore()

# Channels plugin with an in-memory backend and a single 'messages' channel.
channels_plugin = ChannelsPlugin(
    backend=MemoryChannelsBackend(),
    channels=['messages'],
    create_ws_route_handlers=True,
)

# Directory that ``on_startup`` creates and fills with a sample file.
FILES_DIR = Path('files')
class UserRole(str, Enum):
    """Roles a user may hold; each member is also a plain ``str`` value."""

    # Regular, non-privileged user.
    CONSUMER = 'consumer'

    # Privileged user; this is the role ``User.is_admin`` compares against.
    ADMIN = 'admin'
class User(BaseModel):
    """Pydantic model describing a user by name and role."""

    name: str
    role: UserRole

    @property
    def is_admin(self) -> bool:
        """Return ``True`` when this user's role equals ``UserRole.ADMIN``."""
        admin_role = UserRole.ADMIN
        return self.role == admin_role
def app_exception_handler(request: Request, exc: HTTPException) -> Response:
    """Turn an ``HTTPException`` into a structured error response.

    The exception detail is printed, then echoed back to the client together
    with the request path and the exception's status code.

    Args:
        request: The request during which the exception was raised.
        exc: The exception being handled.

    Returns:
        A ``Response`` carrying the error payload and ``exc.status_code``.
    """
    detail = exc.detail
    status = exc.status_code
    print(detail)

    payload = {
        'error': 'server error',
        'path': request.url.path,
        'detail': detail,
        'status_code': status,
    }
    return Response(content=payload, status_code=status)
def internal_server_error_handler(request: Request, exc: Exception) -> Response:
    """Render an exception as a plain-text 500 response.

    Args:
        request: The request being processed (unused).
        exc: The exception whose string form is included in the body.

    Returns:
        A text ``Response`` with status 500.
    """
    # NOTE(review): the exception text is sent to the client verbatim.
    body = f"server error: {exc}"
    return Response(
        content=body,
        media_type=MediaType.TEXT,
        status_code=HTTP_500_INTERNAL_SERVER_ERROR,
    )
def middleware_factory(app: ASGIApp) -> ASGIApp:
    """Wrap ``app`` in a middleware that logs headers and injects a user.

    Args:
        app: The next ASGI application in the chain.

    Returns:
        An ASGI callable that prints the raw scope headers, stores a
        hard-coded admin ``User`` under ``scope['user']`` and then delegates
        to ``app``.
    """

    async def my_middleware(scope: Scope, receive: Receive, send: Send) -> None:
        raw_headers = scope.get('headers')
        print(raw_headers)
        print('hello middleware')

        # Every connection is treated as the same admin user (demo only).
        scope['user'] = User(name='yuri cazzeri fontella', role=UserRole.ADMIN)

        await app(scope, receive, send)

    return my_middleware
def admin_user_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
    """Guard that only lets admin users through.

    Args:
        connection: The current connection; its ``user`` must expose
            ``is_admin``.
        _: The route handler being guarded (unused).

    Raises:
        NotAuthorizedException: If the connection's user is not an admin.
    """
    is_admin = connection.user.is_admin
    print('connection', is_admin)
    if is_admin:
        return
    raise NotAuthorizedException()
def before_request(request: Request):
    """Hook used by ``on_get``: print the query parameters and a marker.

    Returns ``None`` implicitly.
    """
    query = request.query_params
    print(query)
    print('before_request')
def after_request(response: Response):
    """Hook used by ``on_get``: log the response and wrap its content.

    Args:
        response: The response produced for the request.

    Returns:
        A new ``Response`` whose body is ``{'message': <original content>}``.
    """
    print('cookies', response.cookies)
    print('after_request')

    wrapped = {'message': response.content}
    return Response(wrapped)
def on_startup(app: Litestar) -> None:
    """Startup hook: seed application state and create the sample file.

    Stores a ``me`` entry on ``app.state``, makes sure ``FILES_DIR`` exists
    and (re)writes ``hello.txt`` inside it.
    """
    app.state.me = {'name': 'Yuri Fontella'}
    print('on_startup')

    FILES_DIR.mkdir(exist_ok=True)
    sample_file = FILES_DIR / 'hello.txt'
    sample_file.write_text('hello')
def on_shutdown():
    """Shutdown hook: print a marker line and return ``None``."""
    marker = 'on_shutdown'
    print(marker)
async def on_deps():
    """Dependency provider: print a marker line and yield the value ``True``.

    ``on_post`` injects the result as its ``deps`` argument.
    """
    provided = True
    print('hello dependecies injection')
    return provided
@listener('send_email')
async def on_listener(email: str) -> None:
    """Handle the ``send_email`` event.

    Waits ten seconds (standing in for slow work) and then prints the
    address it was given.

    Args:
        email: Address passed by the code that emitted the event.
    """
    delay_seconds = 10
    await asyncio.sleep(delay_seconds)
    print('hello listener', email)
@get('/csrf')
async def on_csrf(request: Request) -> bool:
    """``GET /csrf``: print the request's cookies and headers, return ``True``."""
    cookies = request.cookies
    print(cookies)

    headers = request.headers
    print(headers)

    return True
@get('/exception')
async def on_exception() -> None:
    """``GET /exception``: always fail, to exercise the exception handlers.

    Raises:
        HTTPException: Unconditionally, with the message ``'oh no!'``.
    """
    error = HTTPException('oh no!')
    raise error
@get(
    path='/',
    before_request=before_request,
    after_request=after_request,
)
async def on_get(params: str) -> bool:
    """``GET /``: print the ``params`` argument and return ``True``.

    The route runs the ``before_request`` / ``after_request`` hooks.
    """
    print(params)
    return True
@post(
    '/',
    dependencies={'deps': Provide(on_deps)},
    exclude_from_csrf=True,
)
async def on_post(state: State, data: dict, deps: bool, request: Request, scope: Scope) -> bool:
    """``POST /``: log the inputs, emit ``send_email`` and return ``deps``.

    Args:
        state: Application state; printed as a dict.
        data: Request body; printed.
        deps: Value produced by the ``on_deps`` dependency.
        request: Current request, used to reach the application.
        scope: ASGI scope; its ``'user'`` entry is printed.

    Returns:
        The injected ``deps`` value, unchanged.
    """
    print(data)
    print(state.dict())
    print(scope.get('user'))

    # Emit the event that ``on_listener`` is registered for.
    request.app.emit('send_email', email='yuri@gmail.com')

    return deps
class IndexController(Controller):
    """Controller with a single ``GET /controller`` route."""

    @get('/controller')
    async def on_users(self) -> bool:
        """Return the constant ``False``."""
        result = False
        return result
@dataclass
class UserRoute:
    """Response payload of the ``/user`` route."""

    # Numeric identifier of the user.
    id: int
    # Display name of the user.
    name: str
@get('/user', cache=2)
async def on_user(request: Request) -> UserRoute:
    """``GET /user``: log two messages and return a fixed ``UserRoute``.

    The route is declared with ``cache=2`` (presumably a two-second response
    cache -- confirm against the Litestar docs).
    """
    log = request.logger
    log.error('oh no')
    log.info('yeah')

    user = UserRoute(id=1, name='yuri')
    print(user.name)
    return user
@asynccontextmanager
async def lifespan(_: Litestar):
    """Application lifespan context: print on entry and on exit.

    Prints ``'hello'`` before yielding control to the application and
    ``'bye'`` when the context is left.

    Args:
        _: The application instance (unused).
    """
    print('hello')
    try:
        yield
    finally:
        # Run the teardown even when an exception is thrown into the
        # generator; a bare statement after ``yield`` would be skipped.
        print('bye')
@websocket_listener('/websocket_listener')
async def on_websocket_listener(data: dict) -> dict:
    """Echo handler for ``/websocket_listener``.

    Prints the received payload and returns it unchanged (the return value
    is presumably what gets sent back to the client -- confirm).
    """
    payload = data
    print(payload)
    return payload
@get('/store/set')
async def on_store_set(name: str) -> bool:
    """``GET /store/set``: save ``name`` in the store under the key ``'name'``.

    Returns:
        Always ``True``.
    """
    key = 'name'
    await store.set(key, name)
    return True
@get('/store/get')
async def on_store_get() -> dict:
    """``GET /store/get``: read back the value saved by ``/store/set``.

    Returns:
        ``{'name': <decoded value>}``, or ``{'name': None}`` when nothing is
        stored under ``'name'``.
    """
    raw = await store.get('name')
    print(raw)

    # The store yields ``None`` for a key that was never set; calling
    # ``.decode`` on it would raise ``AttributeError`` and produce a 500.
    if raw is None:
        return {'name': None}

    return {'name': raw.decode('utf-8')}
@post(
    path='/guard',
    guards=[admin_user_guard],
    exclude_from_csrf=True,
)
async def on_guard() -> bool:
    """``POST /guard``: return ``True`` once ``admin_user_guard`` has passed."""
    return True
@get('/subscribe')
async def on_subscribe(channels: ChannelsPlugin) -> bool:
    """``GET /subscribe``: subscribe to the ``'messages'`` channel.

    Returns:
        Always ``True``.
    """
    # NOTE(review): the value returned by ``subscribe`` is discarded, so this
    # handler never reads from or unsubscribes the subscription -- confirm
    # whether that is intended.
    target_channels = ['messages']
    await channels.subscribe(target_channels)
    return True
@get('/publish')
async def on_publish(channels: ChannelsPlugin, text: str) -> bool:
    """``GET /publish``: publish ``{'text': text}`` to the ``'messages'`` channel.

    Returns:
        Always ``True``.
    """
    message = {'text': text}
    channels.publish(data=message, channels=['messages'])
    return True
@websocket('/websocket')
async def on_websocket(socket: WebSocket) -> None:
    """``/websocket``: accept the connection and echo every JSON message.

    Each received JSON payload is printed and sent straight back. The loop
    ends quietly when the client disconnects.

    Args:
        socket: The WebSocket connection to serve.
    """
    await socket.accept()
    try:
        while True:
            message = await socket.receive_json()
            print(message)
            await socket.send_json(message)
    except WebSocketDisconnect:
        # The peer closed the connection; this is the normal way out of the
        # echo loop, not an error.
        return
# Mount the controller's routes under the '/index' prefix.
index_controller = Router(path='/index', route_handlers=[IndexController])

# The application object: wires together every handler, hook, middleware and
# configuration object defined in this module.
app = Litestar(
    route_handlers=[
        index_controller,
        # Plain HTTP handlers.
        on_get,
        on_post,
        on_user,
        on_exception,
        on_csrf,
        on_store_set,
        on_store_get,
        # WebSocket listener, guard and channel routes.
        on_websocket_listener,
        on_guard,
        on_publish,
        on_subscribe,
        on_websocket,
        # Files from the 'files' directory, served under '/static'.
        create_static_files_router(
            path='/static',
            directories=['files'],
            send_as_attachment=True,
        ),
    ],
    # Lifecycle hooks and event listeners.
    on_startup=[on_startup],
    on_shutdown=[on_shutdown],
    listeners=[on_listener],
    # Error handling: by exception class and by status code.
    exception_handlers={
        HTTPException: app_exception_handler,
        HTTP_500_INTERNAL_SERVER_ERROR: internal_server_error_handler,
    },
    middleware=[middleware_factory, rate_limit_config.middleware],
    # Configuration objects defined at the top of the module.
    cors_config=cors_config,
    csrf_config=csrf_config,
    logging_config=logging_config,
    lifespan=[lifespan],
    plugins=[channels_plugin],
    # OpenAPI is switched off and debug mode is disabled.
    openapi_config=None,
    debug=False,
    allowed_hosts=AllowedHostsConfig(
        allowed_hosts=['*'],
    ),
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment