Skip to content

Instantly share code, notes, and snippets.

View alksndrglk's full-sized avatar

Aleksandr Goliak alksndrglk

  • Russia, Saint-Petersburg
View GitHub Profile
@alksndrglk
alksndrglk / thread_uow.py
Created January 15, 2024 06:31
session in thread
transmission = sessionmaker(db_transmission1c)
class Handler(Thread):
def __init__(self, q):
Thread.__init__(self)
self.q: Queue = q
def run(self):
def process(session):
# --- queries.py ----
async def get_by_email(self, email: str) -> typing.Union[Admin, None]:
async with self.app.database.session() as session:
admin = await session.execute(
select(AdminModel).where(AdminModel.email == email)
)
if admin:
for admin in admin.scalars():
return Admin(
id=admin.id, email=admin.email, password=admin.password
#----client
import grpc
import sbercorus_mq_pb2
import sbercorus_mq_pb2_grpc
import asyncio
async def get_client_stream_requests():
while True:
from datetime import date
from dateutil.relativedelta import relativedelta
from itertools import islice
def payment_day(start, period=1):
    """Yield successive payment dates as ``DD-MM-YYYY`` strings, forever.

    The first value yielded is ``start`` plus ``period`` months, the second
    is ``start`` plus ``2 * period`` months, and so on. The generator never
    terminates on its own; callers must bound it (e.g. with ``islice``).

    Args:
        start: The reference date; ``start`` itself is never yielded.
            Presumably a ``datetime.date`` (or ``datetime``) -- it must
            support ``+ relativedelta`` and the result must provide
            ``strftime``.
        period: Number of months between consecutive payments.

    Yields:
        Each payment date formatted with ``"%d-%m-%Y"``.
    """
    months_elapsed = 0
    while True:
        months_elapsed += period
        # Offset from the original start date rather than from the previous
        # payment: relativedelta clamps to the last valid day of a short
        # month, so chaining additions would turn 31 Jan -> 28 Feb -> 28 Mar
        # and permanently lose the intended day of month.
        next_payment_day = start + relativedelta(months=months_elapsed)
        yield next_payment_day.strftime("%d-%m-%Y")
apiVersion: batch/v1
kind: Job
metadata:
name: minio-event
spec:
template:
spec:
containers:
- name: minio-event-configuration
image: minio/mc
def factorial(k):
    """Return ``k!`` for a non-negative integer ``k``.

    Computed iteratively, so large ``k`` does not hit the interpreter's
    recursion limit, and ``factorial(0)`` returns 1 (the empty product)
    instead of recursing without bound.

    Args:
        k: A non-negative integer.

    Returns:
        The product ``1 * 2 * ... * k``; 1 when ``k`` is 0 or 1.

    Raises:
        ValueError: If ``k`` is negative.
    """
    if k < 0:
        raise ValueError("factorial() is not defined for negative values")
    product = 1
    # Start at 2: multiplying by 1 is a no-op, and an empty range for
    # k in {0, 1} leaves the product at 1.
    for factor in range(2, k + 1):
        product *= factor
    return product