Last active
June 6, 2017 17:24
-
-
Save cobookman/e4d2f2b89b4c3cadae9cd83892162758 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build script for the Dataflow "teleport" templates (Java 8, Beam/Dataflow 2.0.0).
group 'com.google.cloud.dataflow.teleport'
version '1.0-Alpha'

apply plugin: 'application'
apply plugin: 'java'

sourceCompatibility = 1.8
mainClassName = "com.google.cloud.dataflow.teleport.Main"

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.0.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-extensions-protobuf', version: '2.0.0'
    // protobuf-java is pulled in transitively by protobuf-java-util.
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.1.0'
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21'
    compile group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.21'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

// Ensures the main resources output directory exists.
// The mkdirs() call must live in doLast so it runs only when this task
// executes; in the configuration closure it would run on every Gradle
// invocation, whatever task was requested.
task resources {
    doLast {
        def resourcesDir = new File('build/resources/main')
        resourcesDir.mkdirs()
    }
}

// Builds a self-contained ("uber") jar: compiled classes plus every compile
// dependency, with jar signature files stripped (a merged fat jar would fail
// signature verification otherwise).
task uberjar(type: Jar) {
    from files(sourceSets.main.output.classesDir)
    from {configurations.compile.collect {zipTree(it)}} {
        exclude "META-INF/*.SF"
        exclude "META-INF/*.DSA"
        exclude "META-INF/*.RSA"
    }
    manifest {
        attributes 'Main-Class': mainClassName
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.google.cloud.dataflow.teleport; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.PipelineResult; | |
import org.apache.beam.sdk.io.TextIO; | |
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; | |
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1; | |
import org.apache.beam.sdk.options.Description; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.options.Validation; | |
import org.apache.beam.sdk.options.ValueProvider; | |
import org.apache.beam.sdk.options.Default; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.transforms.DoFn; | |
import org.apache.beam.sdk.transforms.ParDo; | |
import com.google.protobuf.util.JsonFormat; | |
import com.google.protobuf.util.JsonFormat.TypeRegistry; | |
import com.google.datastore.v1.Entity; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.io.IOException; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.values.TypeDescriptor; | |
/** | |
* Exports Datastore Entities to GCS as newline deliminted Protobuf v3 Json. | |
*/ | |
public class DatastoreToGcs { | |
/** | |
* Runs the DatastoreToGcs dataflow pipeline | |
* @param args | |
*/ | |
public static void main(String[] args) throws IOException { | |
Options options = PipelineOptionsFactory.fromArgs(args) | |
.withValidation() | |
.as(Options.class); | |
// System.out.println("Pipeline runner of: " + options.getRunner().toString()); | |
// Build DatastoreToGCS pipeline | |
Pipeline pipeline = Pipeline.create(options); | |
PCollection<Entity> entities = pipeline.apply("IngestEntities", | |
DatastoreIO.v1().read() | |
.withProjectId(options.getProjectId()) | |
.withLiteralGqlQuery(options.getGqlQuery())); | |
PCollection<String> jsons = entities.apply("EntityToJson", ParDo.of(new EntityToJson())); | |
jsons.apply("JsonToGcs", TextIO.write().to(options.getGcsSavePath()) | |
.withSuffix(".json")); | |
// Start the job | |
PipelineResult pipelineResult = pipeline.run(); | |
if (options.getKeepJobsRunning()) { | |
System.out.println("Blocking until done"); | |
try { | |
System.out.println(pipelineResult.waitUntilFinish()); | |
} catch (Exception exc) { | |
System.err.println(exc); | |
pipelineResult.cancel(); | |
} | |
} | |
} | |
interface Options extends PipelineOptions { | |
@Validation.Required | |
@Description("GCS Path E.g: gs://mybucket/somepath/") | |
ValueProvider<String> getGcsSavePath(); | |
void setGcsSavePath(ValueProvider<String> gcsSavePath); | |
@Validation.Required | |
@Description("GQL Query to get the datastore Entities") | |
ValueProvider<String> getGqlQuery(); | |
void setGqlQuery(ValueProvider<String> gqlQuery); | |
@Validation.Required | |
@Description("ProjectId") | |
ValueProvider<String> getProjectId(); | |
void setProjectId(ValueProvider<String> projectId); | |
@Description("Block until job finishes") | |
@Default.Boolean(true) | |
Boolean getKeepJobsRunning(); | |
void setKeepJobsRunning(Boolean keepJobsRunning); | |
} | |
static class EntityToJson extends DoFn<Entity, String> { | |
protected static JsonFormat.Printer mJsonPrinter = null; | |
public static JsonFormat.Printer getPrinter() { | |
if (mJsonPrinter == null) { | |
TypeRegistry typeRegistry = TypeRegistry.newBuilder() | |
.add(Entity.getDescriptor()) | |
.build(); | |
mJsonPrinter = JsonFormat.printer() | |
.usingTypeRegistry(typeRegistry) | |
.includingDefaultValueFields() | |
.omittingInsignificantWhitespace(); | |
} | |
return mJsonPrinter; | |
} | |
@ProcessElement | |
public void processElement(ProcessContext c) throws Exception { | |
Entity entity = c.element(); | |
c.output(getPrinter().print(entity)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Saving template to: /Datastore..to.GCS | |
Pipeline Args | |
--runner=DataflowRunner | |
--project=strong-moose | |
--stagingLocation=gs://strong-moose.appspot.com/staging/ | |
--tempLocation=gs://strong-moose.appspot.com/temp/ | |
--templateLocation=gs://strong-moose.appspot.com/templates3/ | |
Jun 06, 2017 10:16:53 AM org.apache.beam.runners.dataflow.DataflowRunner fromOptions | |
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 1 files. Enable logging at DEBUG level to see which files will be staged. | |
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for IngestEntities/ParDo(GqlQueryTranslate)/ParMultiDo(GqlQueryTranslate).out0 [PCollection]. Correct one of the following root causes: | |
No Coder has been manually specified; you may do so using .setCoder(). | |
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.google.datastore.v1.Query. | |
Building a Coder using a registered CoderProvider failed. | |
See suppressed exceptions for detailed failures. | |
Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for com.google.datastore.v1.Query. | |
Building a Coder using a registered CoderProvider failed. | |
See suppressed exceptions for detailed failures. | |
at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444) | |
at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:250) | |
at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:104) | |
at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:147) | |
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:481) | |
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422) | |
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:277) | |
at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.expand(DatastoreV1.java:581) | |
at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.expand(DatastoreV1.java:226) | |
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482) | |
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:441) | |
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56) | |
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:179) | |
at com.google.cloud.dataflow.teleport.DatastoreToGcs.main(DatastoreToGcs.java:47) | |
at com.google.cloud.dataflow.teleport.Main.main(Main.java:50) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment