Skip to content

Instantly share code, notes, and snippets.

@egalpin
Created September 8, 2021 17:10
Show Gist options
  • Save egalpin/2d6ad2210cf9f66108ff48a9c7566ebc to your computer and use it in GitHub Desktop.
Save egalpin/2d6ad2210cf9f66108ff48a9c7566ebc to your computer and use it in GitHub Desktop.
import argparse
from typing import Iterable
from typing import List
from typing import Tuple
from typing import Union
import apache_beam as beam
from apache_beam.transforms.combiners import ToDictCombineFn
from apache_beam.transforms.combiners import ToSetCombineFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.pvalue import TaggedOutput
class Foobar(beam.DoFn):
    """Route each input string to one of two keyed outputs based on its length.

    Odd-length strings are emitted untagged as ``(element, 'foo')``.
    Even-length strings are emitted under the ``KKV`` tag as
    ``(element, (element + 'bar', ['baz']))``.
    """

    # Label the pipeline uses when selecting the untagged (main) output.
    KV = 'foo'
    # Tag attached to the nested key/value output.
    KKV = 'bar'

    def process(self, element: str) -> Iterable[Union[
        Tuple[str, str],
        Tuple[str, Tuple[str, List[str]]],
    ]]:
        """Yield exactly one keyed record for ``element``.

        Args:
            element: The string to classify by length parity.

        Yields:
            A plain ``(element, 'foo')`` pair when the length is odd;
            otherwise a ``TaggedOutput`` labelled ``Foobar.KKV`` wrapping
            ``(element, (element + 'bar', ['baz']))``.
        """
        has_odd_length = len(element) % 2 != 0
        if has_odd_length:
            # Untagged yields go to the main output.
            yield element, 'foo'
            return

        nested_value = (element + 'bar', ['baz'])
        yield TaggedOutput(Foobar.KKV, (element, nested_value))
def run(argv=None, save_main_session=True):
    """Build and run a pipeline that partitions strings and combines per key.

    Four test strings are split by ``Foobar`` into a main output and a
    tagged output; the main output is combined per key into sets and the
    tagged output is combined per key into dicts.

    Args:
        argv: Command-line arguments. Anything ``argparse`` does not
            recognise is forwarded to ``PipelineOptions``. ``None`` makes
            ``argparse`` fall back to ``sys.argv``.
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session``.
    """
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)

    # Forward the leftover arguments: building PipelineOptions() with no
    # flags would silently ignore an explicitly supplied ``argv``.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = (
        save_main_session)

    with beam.Pipeline(options=pipeline_options) as p:
        partitioned_elements = (
            p
            | 'Create some test values' >> beam.Create([
                'test',
                'tester',
                'testing',
                'tests',
            ])
            # with_outputs() declares the tagged output and names the main
            # one, which is what makes the result indexable by tag below.
            | 'Partition the test values by length' >> beam.ParDo(
                Foobar()).with_outputs(Foobar.KKV, main=Foobar.KV)
        )

        # Original author's note: this seems to run fine on PortableRunner +
        # DirectRunner, but on Dataflow v2 runner, this fails with:
        #   line: "shuffle_dax_writer.cc:59"
        #   message: "Check failed: kv_coder : expecting a KV coder, but had
        #   Strings"
        # NOTE(review): not verified whether this still reproduces once the
        # outputs are declared via with_outputs() -- confirm on Dataflow.
        (
            partitioned_elements[Foobar.KV]
            | beam.CombinePerKey(ToSetCombineFn())
        )
        (
            partitioned_elements[Foobar.KKV]
            | beam.CombinePerKey(ToDictCombineFn())
        )
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment