Cascading.Avro_issue_6_Avro 1.4
Index: src/com/bixolabs/cascading/avro/AvroScheme.java
===================================================================
--- src/com/bixolabs/cascading/avro/AvroScheme.java (revision 1735)
+++ src/com/bixolabs/cascading/avro/AvroScheme.java (working copy)
@@ -17,6 +17,7 @@
package com.bixolabs.cascading.avro;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,25 +26,45 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroKeyComparator;
import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.avro.mapred.AvroRecordReader;
import org.apache.avro.mapred.AvroSerialization;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import cascading.CascadingException;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
@@ -54,30 +75,115 @@
* The AvroScheme class is a {@link Scheme} subclass. It supports reading and
* writing of data that has been serialized using Apache Avro.
*/
-@SuppressWarnings("serial")
+@SuppressWarnings({ "serial", "deprecation" })
public class AvroScheme extends Scheme {
- public static final Class<?> ARRAY_CLASS = List.class;
- public static final Class<?> MAP_CLASS = Map.class;
+
+ public static final String DATA_CONVERSION_MESG =
+ "Cannot find data conversion for Avro: ";
- /**
- * Helper class used to save an Enum name in a type that Avro requires for
- * serialization.
- *
- */
- private static class CascadingEnumSymbol implements GenericEnumSymbol {
+ public static final String CLASS_FOR_DATA_TYPE_MESG =
+ "Failed to load type class for Avro data type: ";
+
+ public static final Class<?> ARRAY_CLASS = List.class;
+ public static final Class<?> MAP_CLASS = Map.class;
+
+ private static final Map<String, String> REVERSE_TYPE_MAP =
+ new HashMap<String, String>();
- private String _name;
+ public static final class CascadingAvroInputFormat<T>
+ extends AvroInputFormat<T>
+ {
+ @Override
+ public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException
+ {
+ reporter.setStatus(split.toString());
+ return new CascadingAvroRecordReader<T>(job, (FileSplit) split);
+ }
+ };
- public CascadingEnumSymbol(String name) {
- _name = name;
- }
+ public static final class CascadingAvroRecordReader<T>
+ implements RecordReader<AvroWrapper<T>, NullWritable>
+ {
- @Override
- public String toString() {
- return _name;
- }
- }
+ private FsInput in;
+ private DataFileReader<T> reader;
+ private long start;
+ private long end;
+ public CascadingAvroRecordReader(JobConf job, FileSplit split)
+ throws IOException
+ {
+ this.in = new FsInput(split.getPath(), job);
+ this.reader = new DataFileReader<T>(in, new GenericDatumReader<T>());
+ reader.sync(split.getStart()); // sync to start
+ this.start = in.tell();
+ this.end = split.getStart() + split.getLength();
+ }
+
+ public AvroWrapper<T> createKey() {
+ return new AvroWrapper<T>(null);
+ }
+ public NullWritable createValue() { return NullWritable.get(); }
+ public boolean next(AvroWrapper<T> wrapper, NullWritable ignore)
+ throws IOException {
+ if (!reader.hasNext() || reader.pastSync(end))
+ return false;
+ wrapper.datum(reader.next(wrapper.datum()));
+ return true;
+ }
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+ }
+ }
+ public long getPos() throws IOException {
+ return in.tell();
+ }
+ public void close() throws IOException { reader.close(); }
+
+ };
+
+ static {
+ REVERSE_TYPE_MAP.put(Schema.Type.STRING.toString().toLowerCase(), "java.lang.String");
+ REVERSE_TYPE_MAP.put(Schema.Type.BOOLEAN.toString().toLowerCase(), "java.lang.Boolean");
+ REVERSE_TYPE_MAP.put(Schema.Type.LONG.toString().toLowerCase(), "java.lang.Long");
+ REVERSE_TYPE_MAP.put(Schema.Type.INT.toString().toLowerCase(), "java.lang.Integer");
+ REVERSE_TYPE_MAP.put(Schema.Type.FLOAT.toString().toLowerCase(), "java.lang.Float");
+ REVERSE_TYPE_MAP.put(Schema.Type.DOUBLE.toString().toLowerCase(), "java.lang.Double");
+ REVERSE_TYPE_MAP.put(Schema.Type.BYTES.toString().toLowerCase(),
+ "org.apache.hadoop.io.BytesWritable");
+        REVERSE_TYPE_MAP.put(Schema.Type.ENUM.toString().toLowerCase(), "java.lang.Enum");
+ REVERSE_TYPE_MAP.put(Schema.Type.ARRAY.toString().toLowerCase(), ARRAY_CLASS.getName());
+ REVERSE_TYPE_MAP.put(Schema.Type.MAP.toString().toLowerCase(), MAP_CLASS.getName());
+ }
+
+ /**
+ * Helper class used to save an Enum name in a type that Avro requires for serialization.
+ *
+ */
+ private static class CascadingEnumSymbol
+ implements GenericEnumSymbol
+ {
+
+ private String _name;
+
+ public CascadingEnumSymbol(String name)
+ {
+ _name = name;
+ }
+
+ @Override
+ public String toString()
+ {
+ return _name;
+ }
+ }
+
private static final Logger LOGGER = Logger.getLogger(AvroScheme.class);
private static final String RECORD_NAME = "CascadingAvroSchema";
private String _recordName = RECORD_NAME;
@@ -96,11 +202,10 @@
_schemeTypes = schemeTypes;
}
- @SuppressWarnings( { "deprecation" })
@Override
public void sourceInit(Tap tap, JobConf conf) throws IOException {
- conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString());
- conf.setInputFormat(AvroInputFormat.class);
+ // conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString());
+ conf.setInputFormat(CascadingAvroInputFormat.class);
// add AvroSerialization to io.serializations
Collection<String> serializations = conf
@@ -116,10 +221,9 @@
_schemeFields));
}
- @SuppressWarnings( { "deprecation" })
@Override
public void sinkInit(Tap tap, JobConf conf) throws IOException {
- conf.set(AvroJob.OUTPUT_SCHEMA, getSchema().toString());
+ conf.set(AvroJob.OUTPUT_SCHEMA, getSchema().toString());
conf.setOutputFormat(AvroOutputFormat.class);
// Since we're outputting to Avro, we need to set up output values.
@@ -158,32 +262,31 @@
Fields sourceFields = getSourceFields();
- // Unpack this datum into source tuple fields
- AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
- GenericData.Record datum = wrapper.datum();
- for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields
- .size(); fieldIndex++, typeIndex++) {
- Class<?> curType = _schemeTypes[typeIndex];
- String fieldName = sourceFields.get(fieldIndex).toString();
- Object inObj = datum.get(fieldName);
- if (curType == ARRAY_CLASS) {
- typeIndex++;
- result
- .add(convertFromAvroArray(inObj,
- _schemeTypes[typeIndex]));
- } else if (curType == MAP_CLASS) {
- typeIndex++;
- result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
- } else {
- result.add(convertFromAvroPrimitive(inObj, curType));
- }
- }
- return result;
- }
+ // Unpack this datum into source tuple fields
+ AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
+ GenericData.Record datum = wrapper.datum();
+ for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields.size(); fieldIndex++, typeIndex++) {
+ Class<?> curType = _schemeTypes[typeIndex];
+ String fieldName = sourceFields.get(fieldIndex).toString();
+ Object inObj = datum.get(fieldName);
+ if (curType == ARRAY_CLASS) {
+ typeIndex++;
+ result.add(convertFromAvroArray(inObj, _schemeTypes[typeIndex]));
+ } else if (curType == MAP_CLASS) {
+ typeIndex++;
+ result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
+ } else if (curType.isEnum()) {
+ result.add(convertFromAvroPrimitive(inObj, Enum.class));
+ } else {
+ result.add(convertFromAvroPrimitive(inObj, curType));
+ }
+ }
+ return result;
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
+ public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
throws IOException {
// Create the appropriate AvroWrapper<T> from the result, and pass that
// as the key for the collect
@@ -210,17 +313,19 @@
datum.put(fieldName, convertToAvroPrimitive(result
.get(fieldIndex), _schemeTypes[typeIndex]));
} catch (ClassCastException e) {
- throw new IllegalArgumentException("Type for field name: "
+ throw new CascadingException("Type for field name: "
+ fieldName + "=" + _schemeTypes[typeIndex]
+ " does not match type of value "
+ result.get(fieldIndex) + "="
- + result.get(fieldIndex).getClass());
+ + result.get(fieldIndex).getClass()
+ + ", try using the unionOf factory method "
+ + "to create the AvroScheme", e);
}
}
}
- AvroWrapper<GenericData.Record> wrapper = new AvroWrapper<GenericData.Record>(
- datum);
+ AvroWrapper<GenericData.Record> wrapper =
+ new AvroWrapper<GenericData.Record>(datum);
outputCollector.collect(wrapper, NullWritable.get());
}
@@ -303,65 +408,92 @@
return schema;
}
- @SuppressWarnings( { "static-access", "unchecked" })
- private Schema createAvroSchema(Fields schemeFields, Class<?>[] fieldTypes,
- int depth) {
- Schema.Type avroType = toAvroSchemaType(fieldTypes[0]);
+ private Schema createAvroSchema(Fields schemeFields, Class<?>[] fieldTypes, int depth) {
+ Schema.Type avroType = toAvroSchemaType(fieldTypes[0]);
if (avroType == Schema.Type.ARRAY) {
Class<?> arrayTypes[] = { fieldTypes[1] };
- Schema schema = Schema.createArray(createAvroSchema(schemeFields
+ Schema schema =
+ Schema
+ .createArray(createAvroSchema(Fields
.offsetSelector(schemeFields.size() - 1, 1), arrayTypes,
depth + 1));
return schema;
} else if (avroType == Schema.Type.MAP) {
Class<?> mapTypes[] = { fieldTypes[1] };
- return Schema.createMap(createAvroSchema(schemeFields
+ return Schema
+ .createMap(createAvroSchema(Fields
.offsetSelector(schemeFields.size() - 1, 1), mapTypes,
depth + 1));
} else if (avroType == Schema.Type.RECORD) {
- return generateSchema(schemeFields.offsetSelector(schemeFields
+ return generateSchema(Fields.offsetSelector(schemeFields
.size() - 1, 1), fieldTypes, depth + 1);
} else if (avroType == Schema.Type.ENUM) {
- Class clazz = fieldTypes[0];
+ Class<?> clazz = fieldTypes[0];
Object[] names = clazz.getEnumConstants();
List<String> enumNames = new ArrayList<String>(names.length);
for (Object name : names) {
enumNames.add(name.toString());
}
-
- return Schema.createEnum(fieldTypes[0].getName(), null, null,
+ return Schema.createEnum(fieldTypes[0].getCanonicalName(), null, null,
enumNames);
} else {
return Schema.create(avroType);
}
}
- private Object convertFromAvroPrimitive(Object inObj, Class<?> inType) {
- if (inObj == null) {
- return null;
- } else if (inType == String.class) {
- String convertedObj = ((Utf8) inObj).toString();
- return convertedObj;
- } else if (inType == BytesWritable.class) {
- // TODO KKr - this is very inefficient, since we make a copy of
- // the ByteBuffer array in the call to BytesWritable.set(buffer,
- // pos, length).
- // A potentially small win is to check if buffer.position() == 0,
- // and if
- // so then do result = new BytesWritable(buffer.array()), followed
- // by
- // result.setSize(buffer.limit())
- ByteBuffer buffer = (ByteBuffer) inObj;
- BytesWritable result = new BytesWritable();
- result.set(buffer.array(), buffer.position(), buffer.limit());
- return result;
- } else if (inType.isEnum()) {
- return inObj.toString();
- } else {
- return inObj;
- }
- }
+ private Object convertFromAvroPrimitive(Object inObj, Class<?> targetType)
+ {
+ if (inObj == null) {
+ return null;
+ }
+ Class<?> sourceType = inObj.getClass();
+ if (targetType == String.class)
+ {
+ return inObj.toString();
+ }
+ else if (sourceType.isAssignableFrom(GenericData.EnumSymbol.class))
+ {
+ return inObj.toString();
+ }
+ else if (targetType == BytesWritable.class)
+ {
+ // TODO KKr - this is very inefficient, since we make a copy of
+ // the ByteBuffer array in the call to BytesWritable.set(buffer, pos, length).
+ // A potentially small win is to check if buffer.position() == 0, and if
+ // so then do result = new BytesWritable(buffer.array()), followed by
+ // result.setSize(buffer.limit())
+ ByteBuffer buffer = (ByteBuffer) inObj;
+ BytesWritable result = new BytesWritable();
+ result.set(buffer.array(), buffer.position(), buffer.limit());
+ return result;
+ }
+ else if (targetType != sourceType)
+ {
+ //Data type conversion required due to type promotions
+ if(targetType == Long.class) {
+ if(sourceType == Integer.class) {
+ return new Long(((Integer)inObj).longValue());
+ } else {
+ return Long.valueOf(inObj.toString());
+ }
+ } else if(targetType == Double.class) {
+ if(sourceType == Float.class) {
+ return new Double(((Float)inObj).doubleValue());
+ } else if(sourceType == Integer.class) {
+ return new Double(((Integer)inObj).doubleValue());
+ } else {
+ return Double.valueOf(inObj.toString());
+ }
+ } else {
+ throw new CascadingException("Cannot convert " +
+ sourceType.getName() + " to " +
+ targetType.getName());
+ }
+ } else {
+ return inObj;
+ }
+ }
private Object convertFromAvroArray(Object inObj, Class<?> arrayType) {
// Since Cascading doesn't have support for arrays - we are using a
@@ -372,14 +504,14 @@
return null;
}
- GenericData.Array<?> arr = (GenericData.Array<?>) inObj;
- Tuple arrayTuple = new Tuple();
- Iterator<?> iter = arr.iterator();
- while (iter.hasNext()) {
- arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType));
- }
- return arrayTuple;
- }
+ GenericData.Array<?> arr = (GenericData.Array<?>) inObj;
+ Tuple arrayTuple = new Tuple();
+ Iterator<?> iter = arr.iterator();
+ while (iter.hasNext()) {
+ arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType));
+ }
+ return arrayTuple;
+ }
@SuppressWarnings("unchecked")
private Object convertFromAvroMap(Object inObj, Class<?> mapValueClass) {
@@ -389,14 +521,13 @@
Map<Utf8, Object> inMap = (Map<Utf8, Object>) inObj;
- Tuple convertedMapTuple = new Tuple();
- for (Map.Entry<Utf8, Object> e : inMap.entrySet()) {
- convertedMapTuple.add(e.getKey().toString());
- convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(),
- mapValueClass));
- }
- return convertedMapTuple;
- }
+ Tuple convertedMapTuple = new Tuple();
+ for (Map.Entry<Utf8, Object> e : inMap.entrySet()) {
+ convertedMapTuple.add(e.getKey().toString());
+ convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(), mapValueClass));
+ }
+ return convertedMapTuple;
+ }
private Object convertToAvroPrimitive(Object inObj, Class<?> curType) {
if (inObj == null) {
@@ -410,14 +541,13 @@
.getLength());
return convertedObj;
} else if (curType.isEnum()) {
- Object result = new CascadingEnumSymbol((String) inObj);
+ Object result = new CascadingEnumSymbol((String) inObj);
return result;
} else {
return inObj;
}
}
- @SuppressWarnings("unchecked")
private Object convertToAvroArray(Object inObj, Class<?> arrayClass) {
if (inObj == null) {
return null;
@@ -425,13 +555,14 @@
Tuple tuple = (Tuple) inObj;
- GenericData.Array arr = new GenericData.Array(tuple.size(), Schema
+ GenericData.Array<Object> arr =
+ new GenericData.Array<Object>(tuple.size(), Schema
.createArray(Schema.create(toAvroSchemaType(arrayClass))));
for (int i = 0; i < tuple.size(); i++) {
if (tuple.getObject(i).getClass() == arrayClass) {
arr.add(convertToAvroPrimitive(tuple.getObject(i), arrayClass));
} else {
- throw new RuntimeException(
+ throw new CascadingException(
String
.format(
"Found array tuple with class %s instead of expected %",
@@ -453,19 +584,19 @@
int tupleSize = tuple.size();
boolean multipleOfTwo = (tupleSize >= 0) && ((tupleSize % 2) == 0);
if (!multipleOfTwo) {
- throw new RuntimeException(
+ throw new CascadingException(
"Invalid map definition - maps need to be Tuples made up of key,value pairs");
}
for (int i = 0; i < tupleSize; i += 2) {
// the tuple entries are key followed by value
if (tuple.getObject(i).getClass() != String.class) {
- throw new RuntimeException(
+ throw new CascadingException(
"Invalid map definition - the key should be a String - instead of "
+ tuple.getObject(i).getClass());
- }
+ }
if (tuple.getObject(i + 1).getClass() != valClass) {
- throw new RuntimeException(String.format(
+ throw new CascadingException(String.format(
"Found map value with %s instead of expected %s", tuple
.getObject(i + 1).getClass(), valClass));
}
@@ -515,7 +646,8 @@
"There must be at least one field");
}
- if (getSchemeTypesSize(schemeTypes) != schemeFields.size()) {
+ if (getSchemeTypesSize(schemeTypes) != schemeFields.size())
+ {
throw new IllegalArgumentException(
"You must have a schemeType for every field");
}
@@ -561,8 +693,8 @@
t.add(new BytesWritable(bytes));
}
- @SuppressWarnings("unchecked")
- public static void addToTuple(Tuple t, Enum e) {
+ public static void addToTuple(Tuple t, Enum<?> e)
+ {
t.add(e.toString());
}
@@ -584,4 +716,240 @@
t.add(mapTuple);
}
+
+ /**
+     * Assembles the scheme from a union of all schemas within the input path, using the
+     * default JobConf.
+     *
+     * @param path
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(String path)
+ throws IOException
+ {
+ return unionOf(path, new JobConf());
+ }
+
+ /**
+     * Assembles the scheme from a union of all fields within the input path, using the specified
+     * JobConf. Extracts the Avro Schemas from all files to create the scheme.
+     *
+     * It is possible to have multiple Avro Schemas present within the input directories. Usually
+     * this occurs when the data format changes over time. This method creates a unified scheme
+     * which can read multiple Avro schemas and return fields and data types as a normalized union
+     * of the schemas present in the input files on the path, using the following rules:
+ * 1. If data types of the fields do not conflict, simply take the union of the fields and data
+ * types of all input Schemas.
+ * 2. If the data types conflict, but can be promoted, then promote them (int -> long,
+ * float -> double, int -> double).
+ * 3. Otherwise, convert conflicting data types to String.
+ *
+     * Fields are returned in field name order. Nested array and map types are not currently
+     * supported; they are removed from the union projection.
+ *
+ * @param path
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(String path,
+ JobConf conf)
+ throws IOException
+ {
+ Path pathObj = new Path(path);
+ DatumReader<GenericRecord> reader =
+ new GenericDatumReader<GenericRecord>();
+ FileSystem fs = pathObj.getFileSystem(conf);
+ Map<String, Class<?>> coerceTo = new TreeMap<String, Class<?>>();
+ FileStatus[] stati = fs.listStatus(pathObj);
+ for(int x = 0; x < stati.length; x++) {
+ if(stati[x].isDir()) {
+ continue;
+ }
+ Schema schema = getSchema(reader, fs, stati[x]);
+ unifyToMap(coerceTo, schema);
+ }
+ return fromMap(coerceTo);
+ }
+
+ /**
+ * Gets an AvroScheme from a Map<String, Class<?>>
+ * @param map
+ * @return
+ */
+ public static AvroScheme fromMap(Map<String, Class<?>> map)
+ {
+ String[] allNames = new String[map.size()];
+ Class<?>[] allTypes = new Class<?>[allNames.length];
+ Iterator<Map.Entry<String, Class<?>>> entries = map.entrySet().iterator();
+ for(int x = 0; x < allNames.length; x++) {
+ Map.Entry<String, Class<?>> entry = entries.next();
+ allNames[x] = entry.getKey();
+ allTypes[x] = entry.getValue();
+ }
+ return new AvroScheme(new Fields(allNames), allTypes);
+ }
+
+ /**
+     * Creates a unified Schema by adding the field names from the passed schema into the Map and
+     * transforming Avro types to Java types usable by Cascading.Avro. Nested array and map types
+     * are not currently supported; they are removed from the union projection.
+ *
+ * @param map
+ * @param schema
+ */
+ public static void unifyToMap(Map<String, Class<?>> map, Schema schema)
+ {
+ List<Schema.Field> fields = schema.getFields();
+ String[] names = new String[fields.size()];
+ Class<?>[] types = new Class<?>[fields.size()];
+ for(int y = 0; y < names.length; y++)
+ {
+ Schema.Field field = fields.get(y);
+ names[y] = field.name();
+ Schema fieldSchema = field.schema();
+ types[y] = getJavaType(fieldSchema);
+ // TODO: currently excluding maps and lists, need a different data
+ // structure to support them, Map with a single type won't work
+ if (!Map.class.isAssignableFrom(types[y])
+ && !List.class.isAssignableFrom(types[y]))
+ {
+ if(map.containsKey(names[y])) {
+ Class<?> otherClass = map.get(names[y]);
+ if(!otherClass.equals(types[y])) {
+ map.put(names[y], promotion(types[y], otherClass));
+ }
+ } else {
+ map.put(names[y], types[y]);
+ }
+ }
+ }
+ }
+
+ /**
+     * From a field's Schema, gets the Java type associated with it. Note this is NOT a record
+     * schema, which contains a list of all fields. Returns null for Schema.Type.RECORD. If a
+     * 'null' type is present, it is assumed to be first in the type list.
+ *
+ * @param fieldSchema
+ */
+ public static Class<?> getJavaType(Schema fieldSchema)
+ {
+ if(Schema.Type.RECORD.equals(fieldSchema.getType())) {
+ return null;
+ }
+ Class<?> result = null;
+ if (fieldSchema != null) {
+ List<Schema> typesList = fieldSchema.getTypes();
+ if (typesList != null && typesList.size() > 0)
+ {
+ //Skip 'null' type, which is assumed first in list
+ String dataTypeName = String.valueOf(typesList
+ .get(typesList.size() - 1));
+ if (dataTypeName != null) {
+ result = getClassForAvroType(dataTypeName);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected static Schema getSchema(DatumReader<GenericRecord> reader,
+ FileSystem fs,
+ FileStatus status)
+ throws IOException
+ {
+ InputStream stream = fs.open(status.getPath());
+ DataFileStream<GenericRecord> in =
+ new DataFileStream<GenericRecord>(stream, reader);
+ Schema schema = in.getSchema();
+ in.close();
+ return schema;
+ }
+
+ protected static Class<?> promotion(Class<?> newType, Class<?> oldType)
+ {
+ if(newType == Long.class
+ && oldType == Integer.class
+ || newType == Double.class
+ && oldType == Float.class
+ || newType == Double.class
+ && oldType == Integer.class)
+ {
+ return newType;
+ } else {
+ return String.class;
+ }
+ }
+
+ protected static Class<?> getClassForAvroType(String dataType)
+ {
+ Class<?> type = null;
+ ObjectMapper m = new ObjectMapper();
+ JsonNode rootNode;
+ try
+ {
+ rootNode = m.readValue(dataType, JsonNode.class);
+ }
+ catch (Exception e)
+ {
+ throw new CascadingException("Unparsable Schema: " + dataType, e);
+ }
+ if (rootNode.isTextual())
+ {
+ type = avroPrimitiveToJavaClass(rootNode.getTextValue());
+ }
+ else if (rootNode.isArray())
+ {
+ Iterator<JsonNode> elements = rootNode.getElements();
+ while (elements.hasNext())
+ {
+ String typeString = elements.next().getTextValue();
+ type = avroPrimitiveToJavaClass(typeString);
+ if (type != null)
+ {
+ break;
+ }
+ }
+ }
+ else if (rootNode.isContainerNode() && rootNode.get("type") != null)
+ {
+ String containerTypeString = rootNode.get("type").getTextValue();
+ if (Schema.Type.ENUM.toString().equalsIgnoreCase(containerTypeString))
+ {
+ type = String.class;
+ }
+ else
+ {
+ type = avroPrimitiveToJavaClass(containerTypeString);
+ }
+ }
+ if (type == null)
+ {
+ throw new CascadingException("Unsupported type: " + dataType);
+ }
+ return type;
+ }
+
+ /**
+     * @param typeString
+ * @return
+ */
+ protected static Class<?> avroPrimitiveToJavaClass(String typeString)
+ {
+ if (REVERSE_TYPE_MAP.containsKey(typeString))
+ {
+ try
+ {
+ return Class.forName(REVERSE_TYPE_MAP.get(typeString));
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new CascadingException(CLASS_FOR_DATA_TYPE_MESG + typeString, e);
+ }
+ }
+ return null;
+ }
}
\ No newline at end of file
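
For orientation, here is a minimal sketch (not part of the patch) of how the unionOf factory added above might be wired into a Cascading 1.x flow to read a directory whose files were written with different Avro schemas. The paths are hypothetical, and the SequenceFile sink simply mirrors the pattern used in the test below; unionOf scans every Avro file under the path and returns a scheme over the normalized union of their fields.

import org.apache.hadoop.mapred.JobConf;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.SequenceFile;
import cascading.tap.Lfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;

import com.bixolabs.cascading.avro.AvroScheme;

public class UnionOfExample {
    public static void main(String[] args) throws Exception {
        String inputPath = "data/avro-in";      // hypothetical input directory
        String outputPath = "data/tuples-out";  // hypothetical output directory

        // Build one scheme spanning every schema found under inputPath;
        // conflicting field types are normalized per the promotion rules.
        AvroScheme unified = AvroScheme.unionOf(inputPath, new JobConf());

        // Read Avro records through the unified scheme and write them back
        // out as plain Cascading tuples.
        Tap source = new Lfs(unified, inputPath);
        Tap sink = new Lfs(new SequenceFile(unified.getSourceFields()),
                outputPath, SinkMode.REPLACE);
        Flow flow = new FlowConnector().connect(source, sink,
                new Pipe("avro to tuples"));
        flow.complete();
    }
}
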
Index: test/com/bixolabs/cascading/avro/AvroSchemeTest.java
===================================================================
--- test/com/bixolabs/cascading/avro/AvroSchemeTest.java (revision 1735)
+++ test/com/bixolabs/cascading/avro/AvroSchemeTest.java (working copy)
@@ -1,8 +1,6 @@
package com.bixolabs.cascading.avro;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
@@ -35,6 +33,7 @@
public class AvroSchemeTest {
+ private static final String UNION_STR = "UNION";
private static final String OUTPUT_DIR = "build/test/AvroSchmeTest/";
private static enum TestEnum {
@@ -91,17 +90,62 @@
}
-
- @SuppressWarnings("serial")
@Test
public void testRoundTrip() throws Exception {
// Create a scheme that tests each of the supported types
- final Fields testFields = new Fields("integerField", "longField", "booleanField", "doubleField", "floatField",
- "stringField", "bytesField", "arrayOfLongsField", "mapOfStringsField", "enumField");
- final Class<?>[] schemeTypes = {Integer.class, Long.class, Boolean.class, Double.class, Float.class,
- String.class, BytesWritable.class, List.class, Long.class, Map.class, String.class, TestEnum.class};
+ final Fields testFields =
+ new Fields("integerField",
+ "longField",
+ "booleanField",
+ "doubleField",
+ "floatField",
+ "stringField",
+ "bytesField",
+ "arrayOfLongsField",
+ "mapOfStringsField",
+ "enumField");
+ final Class<?>[] schemeTypes =
+ { Integer.class,
+ Long.class,
+ Boolean.class,
+ Double.class,
+ Float.class,
+ String.class,
+ BytesWritable.class,
+ List.class,
+ Long.class,
+ Map.class,
+ String.class,
+ TestEnum.class };
+
+ // Create scheme that tests type conversions for normalized unions
+ final Fields testFields2 =
+ new Fields("integerField",
+ "longField",
+ "booleanField",
+ "doubleField",
+ "floatField",
+ "stringField",
+ "bytesField",
+ "arrayOfLongsField",
+ "mapOfStringsField",
+ "enumField");
+ final Class<?>[] schemeTypes2 =
+ { Double.class,
+ Integer.class,
+ Boolean.class,
+ Float.class,
+ String.class,
+ String.class,
+ BytesWritable.class,
+ List.class,
+ Long.class,
+ Map.class,
+ String.class,
+ TestEnum.class };
+
final String in = OUTPUT_DIR+ "testRoundTrip/in";
final String out = OUTPUT_DIR + "testRoundTrip/out";
final String verifyout = OUTPUT_DIR + "testRoundTrip/verifyout";
@@ -111,41 +155,9 @@
// Create a sequence file with the appropriate tuples
Lfs lfsSource = new Lfs(new SequenceFile(testFields), in, SinkMode.REPLACE);
TupleEntryCollector write = lfsSource.openForWrite(new JobConf());
- Tuple t = new Tuple();
- t.add(0);
- t.add(0L);
- t.add(false);
- t.add(0.0d);
- t.add(0.0f);
- t.add("0");
- AvroScheme.addToTuple(t, new byte[] {0});
-
- List<Long> arrayOfLongs = new ArrayList<Long>() {{
- add(0L);
- }};
- AvroScheme.addToTuple(t, arrayOfLongs);
- Map<String, String> mapOfStrings = new HashMap<String, String>() {{
- put("key-0", "value-0");
- }};
- AvroScheme.addToTuple(t, mapOfStrings);
-
- AvroScheme.addToTuple(t, TestEnum.ONE);
- write.add(t);
+ writeTestData(write, false);
- t = new Tuple();
- t.add(1);
- t.add(1L);
- t.add(true);
- t.add(1.0d);
- t.add(1.0f);
- t.add("1");
- AvroScheme.addToTuple(t, new byte[] {0, 1});
- t.add(new Tuple(0L, 1L));
- t.add(new Tuple("key-0", "value-0", "key-1", "value-1"));
- AvroScheme.addToTuple(t, TestEnum.TWO);
- write.add(t);
-
write.close();
// Now read from the results, and write to an Avro file.
@@ -157,31 +169,98 @@
// Now read it back in, and verify that the data/types match up.
Tap avroSource = new Lfs(new AvroScheme(testFields, schemeTypes), out);
+ Tap verifySink = avroToTuples(testFields, verifyout, avroSource);
+ TupleEntryIterator sinkTuples = verifySink.openForRead(new JobConf());
+ verifyOutput(numRecords, sinkTuples, false);
+
+ // Write the test data for union scheme
+ FileUtils.moveFile(new File(out + File.separatorChar + "part-00000.avro"),
+ new File(verifyout + File.separatorChar + "part-00001.avro"));
+ Tap avroSink2 = new Lfs(new AvroScheme(testFields2, schemeTypes2), out, SinkMode.REPLACE);
+ TupleEntryCollector write2 = avroSink2.openForWrite(new JobConf());
+ writeTestData(write2, true);
+ write2.close();
+ FileUtils.moveFile(new File(verifyout + File.separatorChar + "part-00001.avro"),
+ new File(out + File.separatorChar + "part-00001.avro"));
+
+ // Now read union back in, and verify that the data/types match up.
+ AvroScheme unionOf = AvroScheme.unionOf(out);
+ assertEquals("Check union scheme",
+ "{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'booleanField','type':['null','boolean'],'doc':''},{'name':'bytesField','type':['null','bytes'],'doc':''},{'name':'doubleField','type':['null','double'],'doc':''},{'name':'enumField','type':['null','string'],'doc':''},{'name':'floatField','type':['null','string'],'doc':''},{'name':'integerField','type':['null','string'],'doc':''},{'name':'longField','type':['null','long'],'doc':''},{'name':'stringField','type':['null','string'],'doc':''}]}",
+ unionOf.getJsonSchema().replace('"', '\''));
+ avroSource = new Lfs(unionOf, out);
+ TupleEntryIterator sinkTuples2 = avroSource.openForRead(new JobConf());
+ verifyOutput(numRecords * 2, sinkTuples2, true);
+ sinkTuples2.close();
+
+ // Ensure that the Avro file we write out is readable via the standard Avro API
+ File avroFile = new File(out + "/part-00000.avro");
+ DataFileReader<Object> reader =
+ new DataFileReader<Object>(avroFile, new GenericDatumReader<Object>());
+ int i = 0;
+ while (reader.hasNext())
+ {
+ reader.next();
+ i++;
+ }
+ assertEquals(numRecords, i);
+ }
+
+ /**
+ * @param testFields
+ * @param outPath
+ * @param avroSource
+ * @return
+ */
+ protected Tap avroToTuples(final Fields testFields, final String outPath, Tap avroSource)
+ {
Pipe readPipe = new Pipe("avro to tuples");
- Tap verifySink = new Hfs(new SequenceFile(testFields), verifyout, SinkMode.REPLACE);
+ Tap verifySink = new Hfs(new SequenceFile(testFields), outPath, SinkMode.REPLACE);
Flow readFlow = new FlowConnector().connect(avroSource, verifySink, readPipe);
readFlow.complete();
+ return verifySink;
+ }
- TupleEntryIterator sinkTuples = verifySink.openForRead(new JobConf());
+ /**
+ * @param numRecords
+ * @param sinkTuples
+ */
+ protected void verifyOutput(final int numRecords, TupleEntryIterator sinkTuples, boolean unionOf)
+ {
assertTrue(sinkTuples.hasNext());
-
- int i = 0;
+ int count = 0, i = 0;
while (sinkTuples.hasNext()) {
- TupleEntry te = sinkTuples.next();
-
- assertTrue(te.get("integerField") instanceof Integer);
+ TupleEntry te = sinkTuples.next();
+ if (!unionOf)
+ {
+ assertTrue(te.get("integerField") instanceof Integer);
+ }
+ else
+ {
+ assertTrue(te.get("integerField") instanceof String);
+ }
assertTrue(te.get("longField") instanceof Long);
assertTrue(te.get("booleanField") instanceof Boolean);
assertTrue(te.get("doubleField") instanceof Double);
- assertTrue(te.get("floatField") instanceof Float);
+ if (!unionOf)
+ {
+ assertTrue(te.get("floatField") instanceof Float);
+ }
+ else
+ {
+ assertTrue(te.get("floatField") instanceof String);
+ }
assertTrue(te.get("stringField") instanceof String);
assertTrue(te.get("bytesField") instanceof BytesWritable);
- assertTrue(te.get("arrayOfLongsField") instanceof Tuple);
- assertTrue(te.get("mapOfStringsField") instanceof Tuple);
+ if (!unionOf)
+ {
+ assertTrue(te.get("arrayOfLongsField") instanceof Tuple);
+ assertTrue(te.get("mapOfStringsField") instanceof Tuple);
+ }
assertTrue(te.get("enumField") instanceof String);
- assertEquals(i, te.getInteger("integerField"));
+ i = Float.valueOf(te.getString("integerField")).intValue();
assertEquals(i, te.getLong("longField"));
assertEquals(i > 0, te.getBoolean("booleanField"));
assertEquals((double)i, te.getDouble("doubleField"), 0.0001);
@@ -196,48 +275,136 @@
assertEquals(j, bytes[j]);
}
- Tuple longArray = (Tuple)te.get("arrayOfLongsField");
- assertEquals(i + 1, longArray.size());
- for (int j = 0; j < longArray.size(); j++) {
- assertTrue(longArray.get(j) instanceof Long);
- assertEquals(j, longArray.getLong(j));
+ if (!unionOf)
+ {
+ Tuple longArray = (Tuple) te.get("arrayOfLongsField");
+ assertEquals(i + 1, longArray.size());
+ for (int j = 0; j < longArray.size(); j++)
+ {
+ assertTrue(longArray.get(j) instanceof Long);
+ assertEquals(j, longArray.getLong(j));
+ }
+ Tuple stringMap = (Tuple) te.get("mapOfStringsField");
+ int numMapEntries = i + 1;
+ assertEquals(2 * numMapEntries, stringMap.size());
+ // Build a map from the data
+ Map<String, String> testMap = new HashMap<String, String>();
+ for (int j = 0; j < numMapEntries; j++)
+ {
+ assertTrue(stringMap.get(j * 2) instanceof String);
+ String key = stringMap.getString(j * 2);
+ assertTrue(stringMap.get((j * 2) + 1) instanceof String);
+ String value = stringMap.getString((j * 2) + 1);
+ testMap.put(key, value);
+ }
+ // Now make sure it has everything we're expecting.
+ for (int j = 0; j < numMapEntries; j++)
+ {
+ assertEquals("value-" + j, testMap.get("key-" + j));
+ }
}
-
- Tuple stringMap = (Tuple)te.get("mapOfStringsField");
- int numMapEntries = i + 1;
- assertEquals(2 * numMapEntries, stringMap.size());
-
- // Build a map from the data
- Map<String, String> testMap = new HashMap<String, String>();
- for (int j = 0; j < numMapEntries; j++) {
- assertTrue(stringMap.get(j * 2) instanceof String);
- String key = stringMap.getString(j * 2);
- assertTrue(stringMap.get((j * 2) + 1) instanceof String);
- String value = stringMap.getString((j * 2) + 1);
- testMap.put(key, value);
- }
-
- // Now make sure it has everything we're expecting.
- for (int j = 0; j < numMapEntries; j++) {
- assertEquals("value-" + j, testMap.get("key-" + j));
- }
- i++;
+ count++;
}
- assertEquals(numRecords, i);
+ assertEquals(numRecords, count);
+ }
+
+ /**
+ * @param write
+ */
+ protected void writeTestData(TupleEntryCollector write, boolean unionOf)
+ {
+ Tuple t = new Tuple();
+ if (unionOf)
+ {
+ t.add(0d);
+ }
+ else
+ {
+ t.add(0);
+ }
+ if (unionOf)
+ {
+ t.add(0);
+ }
+ else
+ {
+ t.add(0L);
+ }
+ t.add(false);
+ if (unionOf)
+ {
+ t.add(0.0f);
+ }
+ else
+ {
+ t.add(0.0d);
+ }
+ if (unionOf)
+ {
+ t.add("0.0");
+ }
+ else
+ {
+ t.add(0.0f);
+ }
+ t.add("0");
+ AvroScheme.addToTuple(t, new byte[] {0});
- // Ensure that the Avro file we write out is readable via the standard Avro API
- File avroFile = new File(out + "/part-00000.avro");
- DataFileReader<Object> reader =
- new DataFileReader<Object>(avroFile, new GenericDatumReader<Object>());
- i = 0;
- while (reader.hasNext()) {
- reader.next();
- i++;
+ List<Long> arrayOfLongs = new ArrayList<Long>() {{
+ add(0L);
+ }};
+ AvroScheme.addToTuple(t, arrayOfLongs);
+
+ Map<String, String> mapOfStrings = new HashMap<String, String>() {{
+ put("key-0", "value-0");
+ }};
+ AvroScheme.addToTuple(t, mapOfStrings);
+
+ AvroScheme.addToTuple(t, TestEnum.ONE);
+ write.add(t);
+
+ t = new Tuple();
+ if (unionOf)
+ {
+ t.add(1d);
}
- assertEquals(numRecords, i);
-
+ else
+ {
+ t.add(1);
+ }
+ if (unionOf)
+ {
+ t.add(1);
+ }
+ else
+ {
+ t.add(1L);
+ }
+ t.add(true);
+ if (unionOf)
+ {
+ t.add(1.0f);
+ }
+ else
+ {
+ t.add(1.0d);
+ }
+ if (unionOf)
+ {
+ t.add("1.0");
+ }
+ else
+ {
+ t.add(1.0f);
+ }
+ t.add("1");
+ AvroScheme.addToTuple(t, new byte[] {0, 1});
+ t.add(new Tuple(0L, 1L));
+ t.add(new Tuple("key-0", "value-0", "key-1", "value-1"));
+ AvroScheme.addToTuple(t, TestEnum.TWO);
+ write.add(t);
}
@Test
@@ -362,24 +529,22 @@
@Test
public void testSetRecordName() {
AvroScheme avroScheme = new AvroScheme(new Fields("a"), new Class[] { Long.class });
- String expected = "{\"type\":\"record\",\"name\":\"CascadingAvroSchema\",\"namespace\":\"\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"long\"],\"doc\":\"\"}]}";
+ String expected = "{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'a','type':['null','long'],'doc':''}]}";
String jsonSchema = avroScheme.getJsonSchema();
- assertEquals(expected, jsonSchema);
+ assertEquals(expected, jsonSchema.replace('"', '\''));
avroScheme.setRecordName("FooBar");
String jsonSchemaWithRecordName = avroScheme.getJsonSchema();
- String expectedWithName = "{\"type\":\"record\",\"name\":\"FooBar\",\"namespace\":\"\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"long\"],\"doc\":\"\"}]}";
- assertEquals(expectedWithName, jsonSchemaWithRecordName);
+ String expectedWithName = "{'type':'record','name':'FooBar','namespace':'','fields':[{'name':'a','type':['null','long'],'doc':''}]}";
+ assertEquals(expectedWithName, jsonSchemaWithRecordName.replace('"', '\''));
}
@Test
public void testEnumInSchema() throws Exception {
AvroScheme avroScheme = new AvroScheme(new Fields("a"), new Class[] { TestEnum.class });
String jsonSchema = avroScheme.getJsonSchema();
- String enumField = String.format("{\"type\":\"enum\",\"name\":\"%s\",\"namespace\":\"%s\",\"symbols\":[\"ONE\",\"TWO\"]}",
- "AvroSchemeTest$TestEnum", TestEnum.class.getPackage().getName());
- String expected = String.format("{\"type\":\"record\",\"name\":\"CascadingAvroSchema\",\"namespace\":\"\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",%s],\"doc\":\"\"}]}",
- enumField);
- assertEquals(expected, jsonSchema);
+ String expected = String.format("{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'a','type':['null',{'type':'enum','name':'%s','namespace':'%s','symbols':['ONE','TWO']}],'doc':''}]}",
+ "TestEnum", this.getClass().getCanonicalName());
+ assertEquals(expected, jsonSchema.replace('"', '\''));
}
}
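
For reference, a small sketch (again outside the patch) of the type-promotion rules in isolation, driving the public unifyToMap/fromMap helpers with two hand-written schema versions; the record and field names are hypothetical. Note that promotion() compares each newly seen type against the one already recorded, so the outcome of a conflict depends on scan order: a long seen after an int promotes to Long, while an int seen after a long falls back to String.

import java.util.Map;
import java.util.TreeMap;

import org.apache.avro.Schema;

import com.bixolabs.cascading.avro.AvroScheme;

public class PromotionSketch {
    public static void main(String[] args) {
        // Two versions of the same hypothetical record: "count" is an int in
        // v1 and a long in v2. Field types are ["null", type] unions, matching
        // the schemas AvroScheme itself generates.
        Schema v1 = Schema.parse("{\"type\":\"record\",\"name\":\"R\",\"fields\":"
                + "[{\"name\":\"count\",\"type\":[\"null\",\"int\"]}]}");
        Schema v2 = Schema.parse("{\"type\":\"record\",\"name\":\"R\",\"fields\":"
                + "[{\"name\":\"count\",\"type\":[\"null\",\"long\"]}]}");

        Map<String, Class<?>> unified = new TreeMap<String, Class<?>>();
        AvroScheme.unifyToMap(unified, v1);
        AvroScheme.unifyToMap(unified, v2);  // int -> long promotion

        System.out.println(unified);         // {count=class java.lang.Long}
        // The resulting scheme reads both file versions with "count" as Long.
        AvroScheme scheme = AvroScheme.fromMap(unified);
    }
}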