Last active
March 15, 2024 14:27
-
-
Save tspycher/cab8525b09ef780ed84735cb2bc2d1f3 to your computer and use it in GitHub Desktop.
Very simple Pipeline for a warmup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import apache_beam as beam | |
from apache_beam.options.pipeline_options import PipelineOptions | |
from typing import Iterable | |
import dataclasses | |
@dataclasses.dataclass
class Person:
    """A person record as it travels through the pipeline.

    Attributes:
        name: Display name.
        email: E-mail address.
        gender: Gender code; ``from_dict`` stores it lower-cased.
    """

    # Left un-annotated on purpose: only annotated names become dataclass
    # fields, so these stay plain class-level constants.
    GENDER_MALE = "m"
    GENDER_FEMALE = "f"
    GENDERS = [GENDER_MALE, GENDER_FEMALE]

    name: str
    email: str
    gender: str

    @classmethod
    def from_dict(cls, data: dict) -> "Person":
        """Build a ``Person`` from a raw mapping.

        Args:
            data: Mapping read through the keys ``"name"``, ``"email"`` and
                ``"gender"``.

        Returns:
            A new instance. ``name`` and ``email`` are taken as returned by
            ``data.get`` (``None`` when the key is absent). ``gender`` is
            lower-cased; a missing, ``None`` or empty gender is stored as
            ``""`` instead of failing with ``AttributeError``.
        """
        # `.get` yields None for an absent key; fall back to "" so that
        # `.lower()` is always called on a string.
        gender = data.get("gender") or ""
        return cls(data.get("name"), data.get("email"), gender.lower())

    def __str__(self) -> str:
        """Return the person formatted as ``name <email>``."""
        return f"{self.name} <{self.email}>"
class ToModel(beam.DoFn):
    """Convert raw dictionaries into ``Person`` instances."""

    def process(self, elem: dict) -> Iterable[Person]:
        """Yield the single ``Person`` built from ``elem`` via ``Person.from_dict``."""
        yield Person.from_dict(elem)
class Cleanup(beam.DoFn):
    """Normalise the casing of a person's e-mail address and name."""

    def process(self, elem: Person) -> Iterable[Person]:
        """Yield a cleaned-up copy of ``elem``.

        Args:
            elem: The person to normalise; it is not modified.

        Yields:
            A new ``Person`` whose ``email`` is lower-cased and whose ``name``
            is title-cased. All other fields are copied unchanged.
        """
        # The Beam programming guide asks user code not to modify the element
        # handed to `process`, so emit a modified copy instead of assigning
        # to the input's attributes.
        yield dataclasses.replace(
            elem,
            email=elem.email.lower(),
            name=elem.name.title(),
        )
class SplitGenders(beam.DoFn):
    """Route each person to the tagged output named after their gender code."""

    def process(self, elem: Person):
        """Yield ``elem`` wrapped in a ``TaggedOutput`` tagged with ``elem.gender``."""
        # NOTE(review): the tag is not checked against Person.GENDERS here;
        # confirm how the runner treats tags that were not declared on the
        # ParDo's outputs.
        tag = elem.gender
        yield beam.pvalue.TaggedOutput(tag, elem)
class ShowMale(beam.DoFn):
    """Print every male person and pass the element on unchanged."""

    def process(self, elem: Person) -> Iterable[Person]:
        """Print ``elem`` and yield it.

        Args:
            elem: A person expected to carry ``Person.GENDER_MALE``.

        Yields:
            The same ``elem`` that was received.

        Raises:
            ValueError: If ``elem.gender`` is not ``Person.GENDER_MALE``.
        """
        # ValueError is more specific than a bare Exception while still being
        # caught by any existing `except Exception` handler.
        if elem.gender != Person.GENDER_MALE:
            raise ValueError("Person is not male")
        print(f"Male Person: {elem}")
        yield elem
class ShowFemale(beam.DoFn):
    """Print every female person and pass the element on unchanged."""

    def process(self, elem: Person) -> Iterable[Person]:
        """Print ``elem`` and yield it.

        Args:
            elem: A person expected to carry ``Person.GENDER_FEMALE``.

        Yields:
            The same ``elem`` that was received.

        Raises:
            ValueError: If ``elem.gender`` is not ``Person.GENDER_FEMALE``.
        """
        # ValueError is more specific than a bare Exception while still being
        # caught by any existing `except Exception` handler.
        if elem.gender != Person.GENDER_FEMALE:
            raise ValueError("Person is not female")
        print(f"Female Person: {elem}")
        yield elem
if __name__ == "__main__":
    # No option overrides are supplied; PipelineOptions gets no keyword arguments.
    pipeline_options = PipelineOptions()

    # Deliberately messy sample input: mixed casing in names and e-mails, plus
    # one gender code ("u") that is not listed in Person.GENDERS.
    sample_rows = [
        {"name": "Tom Spycher", "email": "me@SpYcher.tld", "gender": "m"},
        {"name": "Lara Croft", "email": "tomb@raider.TLD", "gender": "f"},
        {"name": "Harrison Ford", "email": "ford@indianajones.com", "gender": "m"},
        {"name": "Undefined Person", "email": "undefined@person.com", "gender": "u"},
        {"name": "sandra Bullock", "email": "sandra@bullock.TLD", "gender": "f"},
        {"name": "pamela anderson", "email": "boobies@anderson.TLD", "gender": "f"},
    ]

    pipeline = beam.Pipeline(options=pipeline_options)

    # Stage 1: raw dicts -> Person objects -> normalised Person objects.
    rows = pipeline | "Create" >> beam.Create(sample_rows)
    people = rows | "To Model" >> beam.ParDo(ToModel())
    cleaned = people | "Cleanup" >> beam.ParDo(Cleanup())

    # Stage 2: fan out into one tagged output per known gender code.
    split_by_gender = beam.ParDo(SplitGenders()).with_outputs(*Person.GENDERS)
    by_gender = cleaned | "Gender Sorting" >> split_by_gender

    # Stage 3: per-gender branches.
    males = by_gender[Person.GENDER_MALE] | "Show Male Person" >> beam.ParDo(ShowMale())
    females = by_gender[Person.GENDER_FEMALE] | "Show Female Person" >> beam.ParDo(ShowFemale())

    # Stage 4: merge both branches again and print every element.
    merged = (females, males) | "Merge" >> beam.Flatten()
    merged | "output" >> beam.Map(print)

    # Execute the pipeline and block until it has finished.
    result = pipeline.run()
    result.wait_until_finish()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment