Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save gabihodoroaga/9538a6fb13aa0bcee3d1e1cb1e66b7b9 to your computer and use it in GitHub Desktop.
Save gabihodoroaga/9538a6fb13aa0bcee3d1e1cb1e66b7b9 to your computer and use it in GitHub Desktop.
WordCount GCS with custom output file name
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;
import java.util.regex.Pattern;
/**
 * Word-count pipeline that reads the public Shakespeare sample corpus from GCS, counts word
 * occurrences, and writes the {@code "word: count"} lines through {@code FileIO.writeDynamic}
 * with a custom file name built by {@code FileIO.Write.defaultNaming("temp", ".txt")}.
 */
public class WriteFileWithName {

  /** Input glob: the public Apache Beam Shakespeare sample files. */
  private static final String INPUT_PATTERN = "gs://apache-beam-samples/shakespeare/*";

  /** Output directory; replace this placeholder with a real GCS path before running. */
  private static final String OUTPUT_DIRECTORY = "<GCS path URI>";

  /**
   * Runs of non-letter characters used to split lines into words. Compiled once: calling
   * {@code String.split} with this multi-character regex would recompile it for every line.
   */
  private static final Pattern NON_LETTERS = Pattern.compile("[^\\p{L}]+");

  /**
   * Destination key assigned to every element, so writeDynamic routes all output to a single
   * destination (combined with {@code withNumShards(1)}, a single output file).
   */
  private static final String SINGLE_DESTINATION = "all";

  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from(INPUT_PATTERN))
        // Concept #2: Apply a FlatMapElements transform to the PCollection of text lines.
        // This transform splits each line in the PCollection<String> so that each output
        // element is an individual word in Shakespeare's collected texts.
        .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(NON_LETTERS.split(line))))
        // Splitting can yield empty strings (e.g. a line starting with punctuation); drop them.
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Concept #3: Apply the Count transform to the PCollection of individual words. Count
        // returns a PCollection of key/value pairs where each key is a unique word in the text
        // and the associated value is the number of occurrences of that word.
        .apply(Count.perElement())
        // Format each word count as a printable "word: count" string for the output file.
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (KV<String, Long> wordCount) ->
                        wordCount.getKey() + ": " + wordCount.getValue()))
        // Concept #4: Write the formatted strings with FileIO.writeDynamic at the end of the
        // pipeline. The file name comes from the FileNaming returned by withNaming; here
        // defaultNaming(prefix, suffix) produces names of the form
        // <prefix>-<shard>-of-<numShards><suffix>, e.g. temp-00000-of-00001.txt.
        .apply(
            FileIO.<String, String>writeDynamic()
                // Every element gets the same destination key, so all lines land in one
                // destination.
                .by((SerializableFunction<String, String>) formatted -> SINGLE_DESTINATION)
                .to(OUTPUT_DIRECTORY)
                .withNumShards(1)
                .via(TextIO.sink())
                .withDestinationCoder(StringUtf8Coder.of())
                // defaultNaming takes (prefix, suffix). The prefix could instead be derived
                // from the destination key to give each destination its own file name.
                .withNaming(destination -> FileIO.Write.defaultNaming("temp", ".txt")));

    // Block until the pipeline completes, as the Beam examples do. Some runners return from
    // run() immediately while the job is still executing.
    p.run().waitUntilFinish();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment