/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples;
import com.typesafe.config.Config;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.jobserver.api.JobEnvironment;
import spark.jobserver.japi.JSparkJob;
/**
 * An example that counts words and includes Beam best practices, adapted to run on spark-jobserver
 * through the {@link JSparkJob} interface.
 *
 * <p>This class is based on {@code WordCount}, the second in a series of four successively more
 * detailed 'word count' examples. You may first want to take a look at {@code MinimalWordCount}.
 * After you've looked at this example, see the {@code DebuggingWordCount} pipeline for an
 * introduction of additional concepts.
 *
 * <p>For a detailed walkthrough of the original example, see
 * <a href="https://beam.apache.org/get-started/wordcount-example/">
 * https://beam.apache.org/get-started/wordcount-example/
 * </a>
 *
 * <p>Basic concepts, also in the MinimalWordCount example: creating a PCollection, counting its
 * elements, and formatting the results.
 *
 * <p>New concepts:
 *
 * <pre>
 * 1. Executing a Pipeline on a Spark context provided by spark-jobserver
 * 2. Using ParDo with static DoFns defined out-of-line
 * 3. Building a composite transform
 * 4. Defining your own pipeline options
 * </pre>
 *
 * <p>Concept #1: Instead of selecting a runner on the command line, this variant sets the
 * {@link SparkRunner} programmatically and tells it to reuse the {@link JavaSparkContext} that
 * spark-jobserver passes to {@link #run}.
 *
 * <p>The input defaults to a small static list of lines defined in this class, and one additional
 * line can be passed in through the jobserver job config. The word counts are written to the log
 * rather than to an output file.
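 *
 * <p>For example, the extra input line is read from the {@code input.string} key of the job config
 * submitted to spark-jobserver; the value shown here is only an illustration:
 *
 * <pre>{@code
 * input.string = "one more line to count"
 * }</pre>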
*/
public class WordCountJS implements JSparkJob<String>, Serializable {
  private static final Logger LOG = LoggerFactory.getLogger(WordCountJS.class);

  static List<String> lines =
      new ArrayList<>(
          Arrays.asList(
              "It does not matter",
              "what temperature the room is",
              "is always room temperature"));
  public String run(JavaSparkContext context, JobEnvironment runtime, Config config) {
    // When launching from a command line you would parse args instead:
    // WordCountOptions options =
    //     PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    WordCountOptions options = PipelineOptionsFactory.create().as(WordCountOptions.class);
    String text = null;
    try {
      // Read an additional input line from the jobserver job config.
      text = config.getString("input.string");
      LOG.info("config input.string: " + text);
    } catch (com.typesafe.config.ConfigException.Missing ex) {
      LOG.info("no input.string found in job config");
    }
    if (text != null) {
      try {
        lines.add(text);
      } catch (Exception ex) {
        LOG.info("could not add text from config: " + ex.getMessage());
      }
    }
    // Add the sample text to the options.
    options.setText(lines);
    // Pass in the Spark context created by spark-jobserver ...
    options.setProvidedSparkContext(context);
    // ... and tell the SparkRunner to use that provided context.
    options.setUsesProvidedSparkContext(true);
    runWordCount(options);
    return null;
  }

  public Config verify(JavaSparkContext context, JobEnvironment runtime, Config config) {
    // The config is optional, so there is nothing to validate.
    return config;
  }
  /**
   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
   * to a ParDo in the pipeline.
   */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }
      // Split the line into words.
      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }
  /** A SimpleFunction that converts a word and count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /** A SimpleFunction that logs the string value without any other transformation. */
  public static class LogCounts extends SimpleFunction<String, String> {
    @Override
    public String apply(String input) {
      // Log the count result for this word.
      LOG.info("Count result: " + input);
      return input;
    }
  }
  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
      return wordCounts;
    }
  }
  /**
   * Options supported by {@link WordCountJS}.
   *
   * <p>Concept #4: Defining your own configuration options. Normally you would add your own
   * arguments to be processed by the command-line parser and specify default values for them.
   * In this jobserver variant, the {@code text} option is instead set programmatically in
   * {@link #run} before the pipeline is created.
   *
   * <p>Inherits standard configuration options from {@link SparkContextOptions}.
   */
  public interface WordCountOptions extends SparkContextOptions {
    /** The lines of text to count words in. */
    @Description("Sample text")
    @Required
    List<String> getText();

    void setText(List<String> value);
  }
  static void runWordCount(WordCountOptions options) {
    options.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(options);

    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes
    // the static FormatAsTextFn() to a MapElements transform.
    p.apply("ReadLines", Create.of(options.getText())).setCoder(StringUtf8Coder.of())
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply(MapElements.via(new LogCounts()));

    p.run().waitUntilFinish();
  }
}
kohlerm commented Aug 31, 2018:
Running (Java) Beam jobs with spark-jobserver.

Run it in a context created like this:

curl -d "" 'localhost:8090/contexts/jcontext?context-factory=spark.jobserver.context.JavaSparkContextFactory'
