Skip to content

Instantly share code, notes, and snippets.

@alexkutsan
Last active February 10, 2022 07:56
Show Gist options
  • Save alexkutsan/35f944ed18d905aed583db7b763ad035 to your computer and use it in GitHub Desktop.
Save alexkutsan/35f944ed18d905aed583db7b763ad035 to your computer and use it in GitHub Desktop.
import hashlib
import logging
import logging.config
import os
import sys

import yaml
# my_formatter = logging.Formatter("%(asctime)s[%(levelname)s] %(filename)s:%(lineno)d: %(message)s")
#
# output_file_handler = logging.FileHandler("demo.log")
# output_file_handler.setFormatter(my_formatter)
#
# stdout_handler = logging.StreamHandler(sys.stdout)
# stdout_handler.setFormatter(my_formatter)
# stdout_handler.setLevel(logging.ERROR)
#
# log = logging.getLogger(__name__)
# log.setLevel(logging.DEBUG)
# log.addHandler(output_file_handler)
# log.addHandler(stdout_handler)
#
# log.debug("This is DEBUG")
# log.info("This is INFO")
# log.warning("This is WARNING")
# log.error("This is ERROR")
# log.fatal("This is FATAL")
# Configure logging from the YAML file, then emit one record per level so the
# effect of each handler's threshold can be inspected in the outputs.
# safe_load only builds plain Python data (dicts, lists, scalars), which is all
# a dictConfig schema needs; it avoids constructing arbitrary Python objects.
with open("./logging_config.yaml", mode="r", encoding="utf-8") as config_file:
    logging_yaml = yaml.safe_load(config_file)
logging.config.dictConfig(logging_yaml)

# Fetch the named logger only after dictConfig has run, so it is set up by
# the configuration that was just applied.
telemetry_log = logging.getLogger("telemetry")
telemetry_log.info("start")

logging.debug("This is DEBUG")
logging.info("This is INFO")
logging.warning("This is WARNING")
logging.error("This is ERROR")
# logging.fatal is only an alias of logging.critical; the message is unchanged.
logging.critical("This is FATAL")

telemetry_log.info("end")
@startuml
class FileService <<interface>> {
+ write(data) -> filename
+ read(filename) -> data
+ list_files() -> [] of files
+ change_dir()
}
class RawFileService {
__init__(working_dir)
+ write(data) -> filename
+ read(filename) -> data
+ list_files() -> [] of files
+ change_dir()
}
RawFileService -up-|> FileService
class Config <<singleton>> {
+ signature_algo
+ keys_dir
}
class SignedFileService {
+ write(data) -> filename
+ read(filename) -> data
+ list_files() -> [] of files
+ change_dir()
}
SignedFileService o-up- FileService
SignedFileService -up-|> FileService
class EncryptedFileService {
+ write(data) -> filename
+ read(filename) -> data
+ list_files() -> [] of files
+ change_dir()
}
EncryptedFileService o-up- FileService
EncryptedFileService -up-|> FileService
EncryptedFileService o-left- Encryption
class Encryption <<interface>> {
encrypt(data) -> encrypted_data, key
decrypt(encrypted_data, key) -> data
}
class SymmetricEncryption {
encrypt(data) -> encrypted_data, key
decrypt(encrypted_data, key) -> data
}
SymmetricEncryption -up-|> Encryption
class HybridEncryption {
encrypt(data) -> encrypted_data, encrypted_key
decrypt(encrypted_data, encrypted_key) -> data
}
HybridEncryption -up-|> Encryption
@enduml
version: 1
disable_existing_loggers: true
formatters:
  simple:
    # https://docs.python.org/3/library/logging.html#logrecord-attributes
    format: '[%(asctime)s] %(filename)s:%(lineno)d:%(message)s'
  short:
    format: '%(asctime)s - %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: ERROR
    formatter: simple
    stream: ext://sys.stdout
  file:
    class: logging.FileHandler
    level: DEBUG
    formatter: simple
    filename: file_server_007.log
  telemetry:
    class: logging.FileHandler
    level: INFO
    formatter: short
    filename: telemetry.log
loggers:
  telemetry:
    handlers: [telemetry]
root:
  level: DEBUG
  handlers: [console,file]
#! /usr/bin/env python3
import argparse
def read_file():
    """Prompt for a file name and echo it back.

    Actually reading the file is not implemented yet.
    """
    requested_name = input("Enter file name : ")
    report = f"read file : {requested_name}"
    print(report)
    # TODO read file and print result
def create_file():
    """Prompt for file content and echo it back.

    Writing the content to a file is not implemented yet.
    """
    entered_text = input("Enter file content : ")
    report = f"create_file : {entered_text}"
    print(report)
    # TODO write content to file
def delete_file():
    """Prompt for a file name and echo it back.

    Deleting the file is not implemented yet.
    """
    target_name = input("Enter file name : ")
    report = f"delete file : {target_name}"
    print(report)
    # TODO delete file
def list_dir():
    """Announce the list-directory command.

    Only prints a fixed marker line for now; the directory listing itself is
    not implemented yet. Returns ``None``.
    """
    # Plain literal: the original f-string had no placeholders.
    print("list dir")
    # TODO: print content of current working directory
def change_dir():
    """Prompt for a directory name and echo it back.

    Changing the working directory is not implemented yet.
    """
    target_dir = input("Enter dir name : ")
    report = f"change dir : {target_dir}"
    print(report)
    # Change current working directory
def main():
    """Run the interactive command loop.

    Reads a command name from standard input, dispatches it to the matching
    handler and repeats. The loop ends when the user enters ``exit`` or when
    standard input is closed. Unknown names print ``Unknown command``. An
    exception raised by a handler is reported and the loop continues.
    """
    # TODO: create argument parser that will retrieve working directory
    commands = {
        "get": read_file,
        "create": create_file,
        "delete": delete_file,
        "ls": list_dir,
        "cd": change_dir,
    }
    while True:
        try:
            command_name = input("Enter command: ")
        except EOFError:
            # stdin was closed (Ctrl-D or end of piped input): treat as "exit"
            # instead of crashing with a traceback.
            return
        if command_name == "exit":
            return
        handler = commands.get(command_name)
        if handler is None:
            print("Unknown command")
            continue
        try:
            handler()
        except Exception as ex:
            # Top-level boundary: report and keep the shell alive. The name the
            # user typed is printed, not the handler's function repr.
            print(f"Error on {command_name} execution : {ex}")


if __name__ == "__main__":
    main()
class Signer(type):
    """Metaclass that validates signer classes and registers them by label.

    Every class created with ``metaclass=Signer`` must provide ``__call__``,
    either in its own body or inherited from a base class. A class that does
    not define ``label`` in its own body gets the lower-cased class name as
    its label. The class is then stored in ``Signer.signers`` under that label.
    """

    # Registry shared by all signer classes: label -> signer class.
    signers = {}

    def __new__(cls, classname, parents, attributes):
        """Create, label and register a signer class.

        Raises:
            TypeError: if neither the class body nor any base class defines
                ``__call__``.
        """
        # Look in the class dictionaries along each base's MRO. A plain
        # hasattr() check would always succeed because classes are callable.
        has_call = "__call__" in attributes or any(
            "__call__" in vars(ancestor)
            for parent in parents
            for ancestor in parent.__mro__
        )
        if not has_call:
            raise TypeError(
                f"Signer class must implement {classname}.__call__ function"
            )
        # super().__new__ keeps Signer as the metaclass of the new class, so
        # subclasses are validated and registered as well.
        signer_class = super().__new__(cls, classname, parents, attributes)
        if "label" not in attributes:
            signer_class.label = classname.lower()
        Signer.signers[signer_class.label] = signer_class
        return signer_class

    @staticmethod
    def get_signer(label):
        """Return the signer class registered under *label*.

        Raises:
            KeyError: if no signer class is registered under *label*.
        """
        return Signer.signers[label]
class Md5Signer(metaclass=Signer):
    """Signer with the explicit label ``md5``."""

    label = "md5"

    def __call__(self, data):
        """Return the hexadecimal MD5 digest of the string *data*."""
        encoded = data.encode()
        digest = hashlib.md5(encoded)
        return digest.hexdigest()
class Sha512Signer(metaclass=Signer):
    """SHA-512 signer; it declares no ``label`` of its own."""

    def __call__(self, data):
        """Return the hexadecimal SHA-512 digest of the string *data*."""
        encoded = data.encode()
        digest = hashlib.sha512(encoded)
        return digest.hexdigest()
class Sha256Signer(metaclass=Signer):
    """SHA-256 signer; it declares no ``label`` of its own."""

    def __call__(self, data):
        """Return the hexadecimal SHA-256 digest of the string *data*."""
        # Use sha256 to match the class name; sha512 was used here by mistake,
        # which produced the same signatures as Sha512Signer.
        return hashlib.sha256(data.encode()).hexdigest()
class Sha128Signer(metaclass=Signer):
    """Placeholder signer for a "SHA-128" algorithm that hashlib does not offer."""

    def __call__(self, data):
        """Always fail: there is no ``hashlib.sha128``.

        Raises:
            NotImplementedError: on every call.
        """
        # NOTE(review): the original called hashlib.sha128, which does not
        # exist and raised AttributeError at call time. The intended algorithm
        # is unclear (sha1? sha224? sha256?) - confirm and replace this with
        # the real hashlib constructor, or remove the class.
        raise NotImplementedError(
            "hashlib provides no SHA-128 algorithm; Sha128Signer cannot sign data"
        )
class X(metaclass=Signer):
    """Minimal signer class whose ``__call__`` does nothing."""

    def __call__(self, data):
        """Ignore *data* and return ``None``."""
        return None
#
# print(f"Md5Signer.label = {Md5Signer.label}")
# print(f"Sha256Signer.label = {Sha256Signer.label}")
# print(f"X.label = {X.label}")
# print(f"signers = {Signer.get_signer('md5')}")
#
# file_service.create_signed_file("blabla", Sha512Signer())
def read_signed_file(filename):
    """Read *filename* and verify it against its stored signature.

    For each label registered in ``Signer.signers`` a signature file named
    ``<filename>.<label>`` is looked up. The first one that exists decides
    the outcome: the content is signed with the matching signer and compared
    with the stored signature.

    Returns:
        The file content as returned by ``file_service.read_file``.

    Raises:
        Exception: ``"File broken"`` when the stored signature does not match,
            or a "no signature file" error when none exists for any label.
    """
    data = file_service.read_file(filename)
    for label, signer_class in Signer.signers.items():
        sig_filename = f"{filename}.{label}"
        if not os.path.exists(sig_filename):
            continue
        # The registry holds signer *classes*; an instance is needed so that
        # calling it runs __call__(data) rather than the constructor.
        signer = signer_class()
        with open(sig_filename, "r") as sig_file:
            # Hex digests contain no whitespace, so stripping only removes a
            # trailing newline that would otherwise break the comparison.
            expected_sig = sig_file.read().strip()
        if signer(data) != expected_sig:
            raise Exception("File broken")
        return data
    # NOTE(review): raising here instead of silently returning None, because
    # an unsigned file cannot be verified - confirm this is the intended
    # contract.
    raise Exception(f"No signature file found for {filename}")
# Task
# Create module crypto
# Create file in module crypto : signature.py
# Create SignatureFactory meta class
# SignatureFactory should check if SignatureClass contains "__call__" function
# SignatureFactory should add a label to the class if a label is not already specified
# Create 2 signatures algos : md5, sha512
# Create read_signed_file function in file service
# Create create_signed_file function in file service
# Signature should be stored next to the file, under the same name with the label as an additional extension
from src import file_service
from mock import mock_open
import mock
def test_list_dir_succes_flow(mocker):
    """list_dir() calls os.listdir once and returns its result."""
    # Arrange
    listdir_mock = mocker.patch("os.listdir", return_value=["a"])

    # Act
    listing = file_service.list_dir()

    # Assert
    listdir_mock.assert_called_once()
    assert listing == ["a"]
def test_create_file_success(mocker):
    """create_file() writes the content to a file named by random_string()."""
    # Arrange
    open_mock = mock_open()
    mocker.patch("builtins.open", open_mock, create=True)
    mocker.patch("src.utils.random_string", return_value="test_random_string")

    # Act
    file_service.create_file("blabla")

    # Assert: check the open() call first, because calling open_mock() below
    # records a new call on the mock.
    open_mock.assert_called_with("test_random_string", "w")
    file_handle = open_mock()
    file_handle.write.assert_called_with("blabla")
def test_create_file_dublicate(mocker):
    """create_file() picks a new name when the first generated name exists."""
    # Arrange
    mocked_open = mock_open()
    mocker.patch("builtins.open", mocked_open, create=True)
    # An iterable side_effect yields one value per call: the first candidate
    # name is reported as existing, the second one as free.
    path_exist_mock = mocker.patch("os.path.exists", side_effect=[True, False])
    mocker.patch(
        "src.utils.random_string",
        side_effect=["test_random_string", "another_random_string"],
    )

    # Act
    file_service.create_file("blabla")

    # Assert: both names were probed, in order, and only the free one was used.
    assert path_exist_mock.mock_calls == [
        mock.call("test_random_string"),
        mock.call("another_random_string"),
    ]
    mocked_open.assert_called_with("another_random_string", "w")
    mocked_open().write.assert_called_with("blabla")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment