Created
July 25, 2024 08:18
-
-
Save lktslionel/f153d32f274c3038f42e6ffc05f57bcc to your computer and use it in GitHub Desktop.
Railway-oriented programming with the Python `returns` library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from functools import partial | |
from typing import Any, Dict, List, Optional | |
from returns.pipeline import flow | |
from returns.pointfree import alt, bind, lash, map | |
from returns.result import Failure, Result, Success | |
from dataclasses import dataclass | |
# A metadata record: a mapping from attribute name to an arbitrary value.
Metadata = Dict[str, Any]
@dataclass(kw_only=True) | |
class NoEntriesFoundError(Exception): | |
count: int | |
def __init__(self, count: int) -> None: | |
self.count = count | |
super().__init__("No entries found") | |
@dataclass(kw_only=True) | |
class NoAttributeIdFoundError(Exception): | |
attrs: Optional[Dict] = None | |
def __init__(self, *, attrs=None) -> None: | |
self.attrs = attrs | |
super().__init__("No attribute 'id' found") | |
@dataclass(kw_only=True) | |
class MetadataEntryNotFound(Exception): | |
attrs: Optional[Dict] = None | |
id: Optional[str] = None | |
def __init__( | |
self, | |
*, | |
id: Optional[str] = None, | |
attrs=None, | |
) -> None: | |
self.id = id | |
self.attrs = attrs | |
super().__init__(f"No metadata entry found with id[{id}]") | |
# Union of the domain errors defined above; used as the failure type in
# ``Result`` annotations.
Error = NoEntriesFoundError | NoAttributeIdFoundError | MetadataEntryNotFound
def find_metadata_entries(
        count: int) -> Result[List[Metadata], NoEntriesFoundError]:
    """Build ``count`` metadata entries with ids ``"0"`` .. ``str(count - 1)``.

    Args:
        count: Number of entries to produce.

    Returns:
        ``Success`` wrapping the list of entries, or
        ``Failure(NoEntriesFoundError(count))`` when no entry is produced.
    """
    metas = [{"id": str(index)} for index in range(count)]
    # Test the result rather than ``count == 0``: a negative count also yields
    # an empty list and must not be reported as a success.
    if not metas:
        return Failure(NoEntriesFoundError(count))
    return Success(metas)
def filter_metadata_by_id(
    metas: List[Metadata],
    attrs: Optional[Dict] = None,
) -> Result[Metadata, NoAttributeIdFoundError | MetadataEntryNotFound]:
    """Pick the first entry of ``metas`` whose ``"id"`` equals ``attrs["id"]``.

    Args:
        metas: Candidate metadata entries.
        attrs: Search attributes; only the ``"id"`` key is read.

    Returns:
        ``Success`` with the first matching entry;
        ``Failure(NoAttributeIdFoundError)`` when ``attrs`` is empty/``None``
        or holds no non-``None`` ``"id"``;
        ``Failure(MetadataEntryNotFound)`` when no entry matches.
    """
    wanted_id = None if not attrs else attrs.get("id")
    if wanted_id is None:
        return Failure(NoAttributeIdFoundError(attrs=attrs))

    matches = (entry for entry in metas if entry.get("id") == wanted_id)
    first_match = next(matches, None)
    if first_match:
        return Success(first_match)
    return Failure(MetadataEntryNotFound(id=wanted_id, attrs=attrs))
def fallback_with_a_single_entry( | |
err: NoEntriesFoundError) -> Result[Any, Error]: | |
match err: | |
case NoEntriesFoundError(): | |
print("[FALLBACK]: ValueError: {}".format(err)) | |
print("[FALLBACK]: Trying with one entry") | |
return find_metadata_entries(1) | |
case _: | |
print("[DEBUG]: Exception: {}".format(err)) | |
return Failure(err) | |
def debug_on_failure(err: Failure) -> Result[Any, Exception]: | |
match err: | |
case ValueError(): | |
print("[DEBUG]: err: {}".format(err)) | |
case Exception(): | |
print("[DEBUG]: Exception: {}".format(err)) | |
return err | |
def find_by_attrs(
    count: int,
    *,
    attrs: Optional[Dict[str, Any]] = None,
) -> Result[Metadata, Exception]:
    """Run the lookup pipeline: find entries, recover, debug, filter by id.

    Args:
        count: Number of entries passed to ``find_metadata_entries``.
        attrs: Search attributes forwarded to ``filter_metadata_by_id``;
            ``None`` is treated as an empty dict.

    Returns:
        The ``Result`` produced by the last pipeline step.
    """
    criteria = {} if attrs is None else attrs
    pick_by_id = partial(filter_metadata_by_id, attrs=criteria)

    return flow(
        count,
        find_metadata_entries,
        # On failure, try to recover (see fallback_with_a_single_entry).
        lash(fallback_with_a_single_entry),
        # Still failing: print the error and keep it as the failure value.
        alt(debug_on_failure),
        # On success, narrow the entries down to the requested id.
        bind(pick_by_id),
    )
if __name__ == '__main__': | |
# import pdb | |
# pdb.set_trace() | |
# breakpoint() | |
calls = [ | |
# find_by_attrs(5, attrs={"id": "2"}), | |
find_by_attrs(0), | |
# find_by_attrs( | |
# 1, | |
# attrs={"no": False}, | |
# ), | |
# find_by_attrs( | |
# 2, | |
# attrs={"id": "3"}, | |
# ), | |
] | |
match random.choice(calls): | |
case Failure(err): | |
print(f"Failure: Error: {err}") | |
case Success(meta): | |
print(f"Success: Meta: {meta}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
asttokens==2.4.1 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:051ed49c3dcae8913ea7cd08e46a606dba30b79993209636c4875bc1d637bc24 \ | |
--hash=sha256:b03869718ba9a6eb027e134bfdf69f38a236d681c83c160d510768af11254ba0 | |
colorama==0.4.6 ; python_version >= "3.10" and python_version < "3.12" and sys_platform == "win32" \ | |
--hash=sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44 \ | |
--hash=sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6 | |
decorator==5.1.1 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330 \ | |
--hash=sha256:b8c3f85900b9dc423225913c5aace94729fe1fa9763b38939a95226f02d37186 | |
exceptiongroup==1.2.2 ; python_version >= "3.10" and python_version < "3.11" \ | |
--hash=sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b \ | |
--hash=sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc | |
executing==2.0.1 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:35afe2ce3affba8ee97f2d69927fa823b08b472b7b994e36a52a964b93d16147 \ | |
--hash=sha256:eac49ca94516ccc753f9fb5ce82603156e590b27525a8bc32cce8ae302eb61bc | |
ipython==8.26.0 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:1cec0fbba8404af13facebe83d04436a7434c7400e59f47acf467c64abd0956c \ | |
--hash=sha256:e6b347c27bdf9c32ee9d31ae85defc525755a1869f14057e900675b9e8d6e6ff | |
jedi==0.19.1 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:cf0496f3651bc65d7174ac1b7d043eff454892c708a87d1b683e57b569927ffd \ | |
--hash=sha256:e983c654fe5c02867aef4cdfce5a2fbb4a50adc0af145f70504238f18ef5e7e0 | |
matplotlib-inline==0.1.7 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:8423b23ec666be3d16e16b60bdd8ac4e86e840ebd1dd11a30b9f117f2fa0ab90 \ | |
--hash=sha256:df192d39a4ff8f21b1895d72e6a13f5fcc5099f00fa84384e0ea28c2cc0653ca | |
parso==0.8.4 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:a418670a20291dacd2dddc80c377c5c3791378ee1e8d12bffc35420643d43f18 \ | |
--hash=sha256:eb3a7b58240fb99099a345571deecc0f9540ea5f4dd2fe14c2a99d6b281ab92d | |
pexpect==4.9.0 ; python_version >= "3.10" and python_version < "3.12" and (sys_platform != "win32" and sys_platform != "emscripten") \ | |
--hash=sha256:7236d1e080e4936be2dc3e326cec0af72acf9212a7e1d060210e70a47e253523 \ | |
--hash=sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f | |
prompt-toolkit==3.0.47 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:0d7bfa67001d5e39d02c224b663abc33687405033a8c422d0d675a5a13361d10 \ | |
--hash=sha256:1e1b29cb58080b1e69f207c893a1a7bf16d127a5c30c9d17a25a5d77792e5360 | |
ptyprocess==0.7.0 ; python_version >= "3.10" and python_version < "3.12" and (sys_platform != "win32" and sys_platform != "emscripten") \ | |
--hash=sha256:4b41f3967fce3af57cc7e94b888626c18bf37a083e3651ca8feeb66d492fef35 \ | |
--hash=sha256:5c5d0a3b48ceee0b48485e0c26037c0acd7d29765ca3fbb5cb3831d347423220 | |
pure-eval==0.2.3 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:1db8e35b67b3d218d818ae653e27f06c3aa420901fa7b081ca98cbedc874e0d0 \ | |
--hash=sha256:5f4e983f40564c576c7c8635ae88db5956bb2229d7e9237d03b3c0b0190eaf42 | |
pygments==2.18.0 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:786ff802f32e91311bff3889f6e9a86e81505fe99f2735bb6d60ae0c5004f199 \ | |
--hash=sha256:b8e6aca0523f3ab76fee51799c488e38782ac06eafcf95e7ba832985c8e7b13a | |
returns==0.23.0 ; python_full_version >= "3.10.0" and python_version < "3.12" \ | |
--hash=sha256:27594c28e5fc338e052d27ddf77fe1da82db4472f6d59901e7e9165be35a5256 \ | |
--hash=sha256:278aa6168072b24574ad14be32f7123d1b835928473dd40bc506f47c8b25859a | |
six==1.16.0 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926 \ | |
--hash=sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254 | |
stack-data==0.6.3 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:836a778de4fec4dcd1dcd89ed8abff8a221f58308462e1c4aa2a3cf30148f0b9 \ | |
--hash=sha256:d5558e0c25a4cb0853cddad3d77da9891a08cb85dd9f9f91b9f8cd66e511e695 | |
traitlets==5.14.3 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:9ed0579d3502c94b4b3732ac120375cda96f923114522847de4b3bb98b96b6b7 \ | |
--hash=sha256:b74e89e397b1ed28cc831db7aea759ba6640cb3de13090ca145426688ff1ac4f | |
typing-extensions==4.12.2 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d \ | |
--hash=sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8 | |
wcwidth==0.2.13 ; python_version >= "3.10" and python_version < "3.12" \ | |
--hash=sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859 \ | |
--hash=sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment