Created
November 14, 2014 09:45
/**
 * Workaround for the CRUNCH-102 bug (https://issues.apache.org/jira/browse/CRUNCH-102).
 *
 * If groupByKey() was executed on one of two PCollections but not on the other before
 * union() is called on them, union() only includes data from the grouped collection.
 *
 * This bug was fixed in Crunch 0.4.0, but it still occurs for those using CDH3.
 *
 * To avoid the bug, call this method on the ungrouped collection before passing it to union().
 *
 * This workaround slows the pipeline down because it adds three more operations
 * (two parallelDo() calls and one groupByKey()).
 * Remove this code and its invocations after migrating to Crunch >= 0.4.0.
 *
 * @param collection collection built without any groupByKey() operations applied
 * @param clazz class of the collection type parameter
 * @param <T> collection type parameter
 * @return collection with a groupByKey() operation applied to it
 */
public static <T> PCollection<T> avoidCrunch102Bug(PCollection<T> collection, Class<T> clazz) {
    // Step 1: key every element by itself so that the collection can be grouped.
    PGroupedTable<T, T> groupedTable = collection.parallelDo(new TraceDoFn<T, Pair<T, T>>() {
        private static final long serialVersionUID = 2153680704401519277L;

        @Override
        public void process(T input, Emitter<Pair<T, T>> emitter, Tracer tracer) {
            emitter.emit(new Pair<T, T>(input, input));
        }
    }, Avros.tableOf(Avros.records(clazz), Avros.records(clazz))).groupByKey();

    // Step 2: drop the grouping again by emitting the key of each group.
    // Note: each group is emitted once, so equal elements collapse into one.
    return groupedTable.parallelDo(new TraceDoFn<Pair<T, Iterable<T>>, T>() {
        private static final long serialVersionUID = -6661753843447650949L;

        @Override
        public void process(Pair<T, Iterable<T>> input, Emitter<T> emitter, Tracer tracer) {
            emitter.emit(input.first());
        }
    }, Avros.records(clazz));
}
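
The method above keys each element by itself, groups by that key, and then emits one element per group. The following is a minimal plain-Java sketch of that round trip, using only java.util and no Crunch classes, to show what it does to the data. The class and method names (RoundTripSketch, roundTrip, roundTripKeepingDuplicates) are hypothetical, and the LinkedHashMap is only there to make the output order predictable; a real pipeline gives no such ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundTripSketch {

    // Stand-in for "key by self, then groupByKey()": element -> all equal elements.
    static <T> Map<T, List<T>> groupBySelf(List<T> input) {
        Map<T, List<T>> grouped = new LinkedHashMap<>();
        for (T element : input) {
            grouped.computeIfAbsent(element, k -> new ArrayList<>()).add(element);
        }
        return grouped;
    }

    // Mirrors the workaround: emit the key of each group exactly once.
    static <T> List<T> roundTrip(List<T> input) {
        return new ArrayList<>(groupBySelf(input).keySet());
    }

    // Hypothetical variant: emit every grouped value, so duplicates survive.
    static <T> List<T> roundTripKeepingDuplicates(List<T> input) {
        List<T> out = new ArrayList<>();
        for (List<T> values : groupBySelf(input).values()) {
            out.addAll(values);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "a");
        System.out.println(roundTrip(input));                  // prints [a, b]
        System.out.println(roundTripKeepingDuplicates(input)); // prints [a, a, b]
    }
}
```

If the ungrouped collection may contain equal elements that must all reach union(), the second variant suggests one possible adjustment: iterate over input.second() in the final DoFn instead of emitting input.first() once.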