Created
June 7, 2017 16:09
-
-
Save cobookman/e30268b4cfa8d0cebbd1e4ae8ef848f0 to your computer and use it in GitHub Desktop.
Working ShadowJar Build
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
group 'com.google.cloud.dataflow.teleport' | |
version '1.0-Alpha' | |
buildscript { | |
repositories { | |
jcenter() | |
} | |
dependencies { | |
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.0' | |
} | |
} | |
apply plugin: 'java' | |
apply plugin: 'application' | |
apply plugin: 'com.github.johnrengelman.shadow' | |
sourceCompatibility = 1.8 | |
mainClassName = "com.google.cloud.dataflow.teleport.Main" | |
repositories { | |
mavenCentral() | |
} | |
dependencies { | |
compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.0.0' | |
compile group: 'org.apache.beam', name: 'beam-sdks-java-extensions-protobuf', version: '2.0.0' | |
compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.1.0' | |
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21' | |
compile group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.21' | |
testCompile group: 'junit', name: 'junit', version: '4.12' | |
} | |
shadowJar { | |
baseName = 'shadow' | |
classifier = null | |
version = version | |
mergeServiceFiles() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.google.cloud.dataflow.teleport; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.PipelineResult; | |
import org.apache.beam.sdk.io.TextIO; | |
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; | |
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1; | |
import org.apache.beam.sdk.options.Default; | |
import org.apache.beam.sdk.options.Description; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.options.Validation; | |
import org.apache.beam.sdk.options.ValueProvider; | |
import org.apache.beam.sdk.transforms.DoFn; | |
import org.apache.beam.sdk.transforms.ParDo; | |
import com.google.protobuf.util.JsonFormat; | |
import com.google.protobuf.util.JsonFormat.TypeRegistry; | |
import com.google.datastore.v1.Entity; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.io.IOException; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.values.TypeDescriptor; | |
/** | |
* Exports Datastore Entities to GCS as newline deliminted Protobuf v3 Json. | |
*/ | |
public class DatastoreToGcs { | |
/** | |
* Runs the DatastoreToGcs dataflow pipeline | |
* @param args | |
*/ | |
public static void main(String[] args) throws IOException { | |
Options options = PipelineOptionsFactory.fromArgs(args) | |
.withValidation() | |
.as(Options.class); | |
// System.out.println("Pipeline runner of: " + options.getRunner().toString()); | |
// Build DatastoreToGCS pipeline | |
Pipeline pipeline = Pipeline.create(options); | |
pipeline | |
.apply("IngestEntities", | |
DatastoreIO.v1().read() | |
.withProjectId(options.getProjectId()) | |
.withLiteralGqlQuery(options.getGqlQuery()) | |
.withNamespace(options.getNamespace())) | |
.apply("EntityToJson", ParDo.of(new EntityToJson())) | |
.apply("JsonToGcs", TextIO.write().to(options.getGcsSavePath()) | |
.withSuffix(".json")); | |
// Start the job | |
PipelineResult pipelineResult = pipeline.run(); | |
if (options.getKeepJobsRunning()) { | |
System.out.println("Blocking until done"); | |
try { | |
System.out.println(pipelineResult.waitUntilFinish()); | |
} catch (Exception exc) { | |
System.err.println(exc); | |
pipelineResult.cancel(); | |
} | |
} | |
} | |
interface Options extends PipelineOptions { | |
@Validation.Required | |
@Description("GCS Path E.g: gs://mybucket/somepath/") | |
ValueProvider<String> getGcsSavePath(); | |
void setGcsSavePath(ValueProvider<String> gcsSavePath); | |
@Validation.Required | |
@Description("GQL Query to get the datastore Entities") | |
ValueProvider<String> getGqlQuery(); | |
void setGqlQuery(ValueProvider<String> gqlQuery); | |
@Validation.Required | |
@Description("Namespace of Entities, use `\"\"` for default") | |
ValueProvider<String> getNamespace(); | |
void setNamespace(ValueProvider<String> namespace); | |
@Validation.Required | |
@Description("ProjectId") | |
ValueProvider<String> getProjectId(); | |
void setProjectId(ValueProvider<String> projectId); | |
@Description("Block until job finishes") | |
@Default.Boolean(true) | |
Boolean getKeepJobsRunning(); | |
void setKeepJobsRunning(Boolean keepJobsRunning); | |
} | |
static class EntityToJson extends DoFn<Entity, String> { | |
protected static JsonFormat.Printer mJsonPrinter = null; | |
public static JsonFormat.Printer getPrinter() { | |
if (mJsonPrinter == null) { | |
TypeRegistry typeRegistry = TypeRegistry.newBuilder() | |
.add(Entity.getDescriptor()) | |
.build(); | |
mJsonPrinter = JsonFormat.printer() | |
.usingTypeRegistry(typeRegistry) | |
.includingDefaultValueFields() | |
.omittingInsignificantWhitespace(); | |
} | |
return mJsonPrinter; | |
} | |
@ProcessElement | |
public void processElement(ProcessContext c) throws Exception { | |
Entity entity = c.element(); | |
c.output(getPrinter().print(entity)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment