Skip to content

Instantly share code, notes, and snippets.

@mpouttuclarke
Created May 25, 2011 20:52
Show Gist options
  • Save mpouttuclarke/991941 to your computer and use it in GitHub Desktop.
Save mpouttuclarke/991941 to your computer and use it in GitHub Desktop.
Cascading.Avro issue 6 diff
Index: AvroScheme.java
===================================================================
--- AvroScheme.java (revision 1644)
+++ AvroScheme.java (working copy)
@@ -17,6 +17,7 @@
package com.bixolabs.cascading.avro;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,10 +26,15 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
@@ -38,12 +44,18 @@
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.log4j.Logger;
+import com.bixolabs.cascading.avro.UnionAssembly.FSType;
+
+import cascading.CascadingException;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
@@ -56,14 +68,42 @@
*/
@SuppressWarnings("serial")
public class AvroScheme extends Scheme {
+ private static final String NESTED_DIRECTORY_MESG = "Can only process Avro files, not directory ";
+
+ public static final String DATA_CONVERSION_MESG =
+ "Cannot find data conversion for Avro: ";
+
+ public static final String CLASS_FOR_DATA_TYPE_MESG =
+ "Failed to load type class for Avro data type: ";
+
public static final Class<?> ARRAY_CLASS = List.class;
public static final Class<?> MAP_CLASS = Map.class;
+ public static enum FSType {
+ HFS, LFS
+ }
+
+ private static final Map<String, String> REVERSE_TYPE_MAP =
+ new HashMap<String, String>();
+
+ static {
+ REVERSE_TYPE_MAP.put(Schema.Type.STRING.toString(), "java.lang.String");
+ REVERSE_TYPE_MAP.put(Schema.Type.BOOLEAN.toString(), "java.lang.Boolean");
+ REVERSE_TYPE_MAP.put(Schema.Type.LONG.toString(), "java.lang.Long");
+ REVERSE_TYPE_MAP.put(Schema.Type.INT.toString(), "java.lang.Integer");
+ REVERSE_TYPE_MAP.put(Schema.Type.FLOAT.toString(), "java.lang.Float");
+ REVERSE_TYPE_MAP.put(Schema.Type.DOUBLE.toString(), "java.lang.Double");
+ REVERSE_TYPE_MAP.put(Schema.Type.BYTES.toString(), "org.apache.hadoop.io.BytesWritable");
+ REVERSE_TYPE_MAP.put(Schema.Type.ENUM.toString(), "java.lang.Enum");
+ REVERSE_TYPE_MAP.put(Schema.Type.ARRAY.toString(), ARRAY_CLASS.getName());
+ REVERSE_TYPE_MAP.put(Schema.Type.MAP.toString(), MAP_CLASS.getName());
+ }
+
/**
-* Helper class used to save an Enum name in a type that Avro requires for
-* serialization.
-*
-*/
+ * Helper class used to save an Enum name in a type that Avro requires for
+ * serialization.
+ *
+ */
private static class CascadingEnumSymbol implements GenericEnumSymbol {
private String _name;
@@ -153,6 +193,7 @@
// Unpack this datum into source tuple fields
AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
GenericData.Record datum = wrapper.datum();
+ Schema schema = datum.getSchema();
for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields.size(); fieldIndex++, typeIndex++) {
Class<?> curType = _schemeTypes[typeIndex];
String fieldName = sourceFields.get(fieldIndex).toString();
@@ -163,8 +204,14 @@
} else if (curType == MAP_CLASS) {
typeIndex++;
result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
+ } else if (curType.isEnum()) {
+ result.add(convertFromAvroPrimitive(inObj,
+ curType,
+ Enum.class));
} else {
- result.add(convertFromAvroPrimitive(inObj, curType));
+ result.add(convertFromAvroPrimitive(inObj,
+ curType,
+ getJavaType(schema.getField(fieldName).schema())));
}
}
return result;
@@ -304,20 +351,42 @@
for (Object name : names) {
enumNames.add(name.toString());
}
-
return Schema.createEnum(fieldTypes[0].getName(), null, null, enumNames);
} else {
return Schema.create(avroType);
}
}
- private Object convertFromAvroPrimitive(Object inObj, Class<?> inType) {
+ private Object convertFromAvroPrimitive(Object inObj,
+ Class<?> targetType,
+ Class<?> sourceType) {
if (inObj == null) {
return null;
- } else if (inType == String.class) {
- String convertedObj = ((Utf8) inObj).toString();
- return convertedObj;
- } else if (inType == BytesWritable.class) {
+ } else if (targetType == String.class
+ || targetType.isEnum()) {
+ return inObj.toString();
+ } else if (sourceType != null && targetType != sourceType) {
+ //Data type conversion required due to type promotions
+ if(targetType == Long.class) {
+ if(sourceType == Integer.class) {
+ return new Long(((Integer)inObj).longValue());
+ } else {
+ return Long.valueOf(inObj.toString());
+ }
+ } else if(targetType == Double.class) {
+ if(sourceType == Float.class) {
+ return new Double(((Float)inObj).doubleValue());
+ } else if(sourceType == Integer.class) {
+ return new Double(((Integer)inObj).doubleValue());
+ } else {
+ return Double.valueOf(inObj.toString());
+ }
+ } else {
+ throw new CascadingException("Cannot convert " +
+ sourceType.getName() + " to " +
+ targetType.getName());
+ }
+ } else if (targetType == BytesWritable.class) {
// TODO KKr - this is very inefficient, since we make a copy of
// the ByteBuffer array in the call to BytesWritable.set(buffer, pos, length).
// A potentially small win is to check if buffer.position() == 0, and if
@@ -327,8 +396,6 @@
BytesWritable result = new BytesWritable();
result.set(buffer.array(), buffer.position(), buffer.limit());
return result;
- } else if (inType.isEnum()) {
- return inObj.toString();
} else {
return inObj;
}
@@ -347,7 +414,7 @@
Tuple arrayTuple = new Tuple();
Iterator<?> iter = arr.iterator();
while (iter.hasNext()) {
- arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType));
+ arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType, null));
}
return arrayTuple;
}
@@ -363,7 +430,7 @@
Tuple convertedMapTuple = new Tuple();
for (Map.Entry<Utf8, Object> e : inMap.entrySet()) {
convertedMapTuple.add(e.getKey().toString());
- convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(), mapValueClass));
+ convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(), mapValueClass, null));
}
return convertedMapTuple;
}
@@ -535,4 +602,195 @@
t.add(mapTuple);
}
+
+ /**
+ * Assembles the scheme assuming Hfs input path and default configuration.
+ * @param path
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(String path)
+ throws IOException
+ {
+ return unionOf(path, FSType.HFS);
+ }
+
+ /**
+ * Assembles the scheme using a union of all fields within the input path
+ * using the default JobConf.
+ * @param path
+ * @param fsType
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ public static AvroScheme unionOf(String path, FSType fsType)
+ throws IOException
+ {
+ return unionOf(path, fsType, new JobConf());
+ }
+
+ /**
+ * Assembles the scheme using a union of all fields within the input path
+ * using the specified JobConf.
+ *
+ * It is possible to have multiple Avro Schemas present within the input
+ * directory(s). Usually this occurs when the data format changes over time.
+ * This method create a unified scheme which can read multiple Avro schemas
+ * and return fields and data types as a normalized union of the schemas
+ * present in the input files one the path, using the following rules:
+ * 1. If data types of the fields do not conflict, simply take the union of
+ * the fields and data types of all input Schemas.
+ * 2. If the data types conflict, but can be promoted, then promote them
+ * (int -> long, float -> double, int -> double).
+ * 3. Otherwise, convert conflicting data types to String.
+ * Fields are returned in field name order.
+ * @param fsType
+ * @param conf
+ * @param path
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ public static AvroScheme unionOf(String path,
+ FSType fsType,
+ JobConf conf)
+ throws IOException
+ {
+ DatumReader<GenericRecord> reader =
+ new GenericDatumReader<GenericRecord>();
+ FileSystem fs = (fsType == FSType.HFS ?
+ FileSystem.get(conf) :
+ FileSystem.getLocal(conf));
+ Map<String, Class<?>> coerceTo = new HashMap<String, Class<?>>();
+ FileStatus[] stati = fs.listStatus(new Path(path));
+ for(int x = 0; x < stati.length; x++) {
+ if(stati[x].isDir()) {
+ throw new IOException(NESTED_DIRECTORY_MESG +
+ stati[x].getPath().toString());
+ }
+ Schema schema = getSchema(reader, fs, stati[x]);
+ unifyToMap(coerceTo, schema);
+ }
+ return fromMap(coerceTo);
+ }
+
+ /**
+ * Gets an AvroScheme from a Map<String, Class<?>>
+ * @param map
+ * @return
+ */
+ public static AvroScheme fromMap(Map<String, Class<?>> map)
+ {
+ String[] allNames = new String[map.size()];
+ Class<?>[] allTypes = new Class<?>[allNames.length];
+ Iterator<Map.Entry<String, Class<?>>> entries = map.entrySet().iterator();
+ for(int x = 0; x < allNames.length; x++) {
+ Map.Entry<String, Class<?>> entry = entries.next();
+ allNames[x] = entry.getKey();
+ allTypes[x] = entry.getValue();
+ }
+ return new AvroScheme(new Fields(allNames), allTypes);
+ }
+
+ /**
+ * Creates a unified Schema by adding the field names from the passed
+ * schema into the Map and transforming Avro types to Java types usable
+ * to Cascading.Avro
+ * @param map
+ * @param schema
+ */
+ public static void unifyToMap(Map<String, Class<?>> map, Schema schema)
+ {
+ List<Schema.Field> fields = schema.getFields();
+ String[] names = new String[fields.size()];
+ Class<?>[] types = new Class<?>[fields.size()];
+ for(int y = 0; y < names.length; y++)
+ {
+ Schema.Field field = fields.get(y);
+ names[y] = field.name();
+ Schema fieldSchema = field.schema();
+ types[y] = getJavaType(fieldSchema);
+ if(map.containsKey(names[y])) {
+ Class<?> otherClass = map.get(names[y]);
+ if(!otherClass.equals(types[y])) {
+ map.put(names[y], promotion(types[y], otherClass));
+ }
+ } else {
+ map.put(names[y], types[y]);
+ }
+ }
+ }
+
+ /**
+ * From a field's Schema, get the Java type associated to it. Note this is
+ * NOT a record schema, which contains a list of all fields.
+ * @param fieldSchema
+ */
+ public static Class<?> getJavaType(Schema fieldSchema)
+ {
+ if(Schema.Type.RECORD.equals(fieldSchema.getType())) {
+ return null;
+ }
+ Class<?> result = null;
+ if (fieldSchema != null) {
+ List<Schema> typesList = fieldSchema.getTypes();
+ if (typesList != null) {
+ //Skip 'null' type, which is assumed first in list
+ String dataTypeName = String.valueOf(typesList
+ .get(typesList.size() - 1));
+ if (dataTypeName != null) {
+ result = getClassForAvroType(dataTypeName);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected static Schema getSchema(DatumReader<GenericRecord> reader,
+ FileSystem fs,
+ FileStatus status)
+ throws IOException
+ {
+ // Close the raw stream in a finally block so it is released even if the
+ // DataFileStream constructor throws (e.g. on a non-Avro file).
+ InputStream stream = fs.open(status.getPath());
+ try {
+ return new DataFileStream<GenericRecord>(stream, reader).getSchema();
+ } finally {
+ stream.close();
+ }
+ }
+
+ protected static Class<?> promotion(Class<?> newType, Class<?> oldType)
+ {
+ if(newType == Long.class
+ && oldType == Integer.class
+ || newType == Double.class
+ && oldType == Float.class
+ || newType == Double.class
+ && oldType == Integer.class)
+ {
+ return newType;
+ } else {
+ return String.class;
+ }
+ }
+
+ protected static Class<?> getClassForAvroType(String dataType)
+ {
+ Class<?> type = null;
+ /*
+ * TODO: this next line is due to an Avro bug where the data types are
+ * surrounded by quote characters when 'null' is present in types list
+ * need to log Avro bug. This is a work around...
+ */
+ String dataTypeStripped = dataType.replaceAll("\\W+", "").toUpperCase();
+ if (REVERSE_TYPE_MAP.containsKey(dataTypeStripped)) {
+ try {
+ type = Class.forName(REVERSE_TYPE_MAP.get(dataTypeStripped));
+ } catch (ClassNotFoundException e) {
+ throw new CascadingException(CLASS_FOR_DATA_TYPE_MESG
+ + dataTypeStripped, e);
+ }
+ } else {
+ throw new CascadingException(DATA_CONVERSION_MESG + dataTypeStripped);
+ }
+ return type;
+ }
}
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment