Skip to content

Instantly share code, notes, and snippets.

@mpouttuclarke
Created June 10, 2011 03:24
Show Gist options
  • Save mpouttuclarke/1018180 to your computer and use it in GitHub Desktop.
Save mpouttuclarke/1018180 to your computer and use it in GitHub Desktop.
Cascading.Avro_issue_6_Avro 1.4_2
Index: src/com/bixolabs/cascading/avro/AvroScheme.java
===================================================================
--- src/com/bixolabs/cascading/avro/AvroScheme.java (revision 1744)
+++ src/com/bixolabs/cascading/avro/AvroScheme.java (working copy)
@@ -41,14 +41,12 @@
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroKeyComparator;
import org.apache.avro.mapred.AvroOutputFormat;
-import org.apache.avro.mapred.AvroRecordReader;
import org.apache.avro.mapred.AvroSerialization;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.FsInput;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
+import org.apache.commons.collections.iterators.ArrayListIterator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -192,6 +190,7 @@
private HashMap<Class<?>, Schema.Type> _typeMap = createTypeMap();
private transient Schema _schema;
+ private boolean convertingTypes;
public AvroScheme(Fields schemeFields, Class<?>[] schemeTypes) {
super(schemeFields, schemeFields);
@@ -202,10 +201,33 @@
_schemeTypes = schemeTypes;
}
- @Override
+ /**
+ * @return the convertingTypes
+ */
+ public boolean isConvertingTypes()
+ {
+ return convertingTypes;
+ }
+
+ /**
+ * @param convertingTypes the convertingTypes to set
+ */
+ public void setConvertingTypes(boolean convertingTypes)
+ {
+ this.convertingTypes = convertingTypes;
+ }
+
+ @Override
public void sourceInit(Tap tap, JobConf conf) throws IOException {
- // conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString());
- conf.setInputFormat(CascadingAvroInputFormat.class);
+ if (convertingTypes)
+ { // Many schemas supported
+ conf.setInputFormat(CascadingAvroInputFormat.class);
+ }
+ else
+ { // We can only support 1 fixed schema with this option
+ conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString());
+ conf.setInputFormat(AvroInputFormat.class);
+ }
// add AvroSerialization to io.serializations
Collection<String> serializations = conf
@@ -266,20 +288,39 @@
AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
GenericData.Record datum = wrapper.datum();
for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields.size(); fieldIndex++, typeIndex++) {
- Class<?> curType = _schemeTypes[typeIndex];
String fieldName = sourceFields.get(fieldIndex).toString();
- Object inObj = datum.get(fieldName);
- if (curType == ARRAY_CLASS) {
- typeIndex++;
- result.add(convertFromAvroArray(inObj, _schemeTypes[typeIndex]));
- } else if (curType == MAP_CLASS) {
- typeIndex++;
- result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
- } else if (curType.isEnum()) {
- result.add(convertFromAvroPrimitive(inObj, Enum.class));
- } else {
- result.add(convertFromAvroPrimitive(inObj, curType));
+ try
+ {
+ Class<?> curType = _schemeTypes[typeIndex];
+ Object inObj = datum.get(fieldName);
+ if (curType == ARRAY_CLASS)
+ {
+ typeIndex++;
+ result.add(convertFromAvroArray(inObj, _schemeTypes[typeIndex]));
+ }
+ else if (curType == MAP_CLASS)
+ {
+ typeIndex++;
+ result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
+ }
+ else if (curType.isEnum())
+ {
+ result.add(convertFromAvroPrimitive(inObj, Enum.class));
+ }
+ else
+ {
+ result.add(convertFromAvroPrimitive(inObj, curType));
+ }
}
+ catch (CascadingException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new CascadingException(datum.getSchema().getFullName() + " failed to load: "
+ + fieldName + "=" + _schemeTypes[typeIndex], e);
+ }
}
return result;
}
@@ -299,28 +340,37 @@
for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sinkFields.size(); fieldIndex++, typeIndex++) {
String fieldName = sinkFields.get(fieldIndex).toString();
- Class<?> curType = _schemeTypes[typeIndex];
- if (curType == ARRAY_CLASS) {
- typeIndex++;
- datum.put(fieldName, convertToAvroArray(result.get(fieldIndex),
- _schemeTypes[typeIndex]));
- } else if (curType == MAP_CLASS) {
- typeIndex++;
- datum.put(fieldName, convertToAvroMap(result.get(fieldIndex),
- _schemeTypes[typeIndex]));
- } else {
- try {
- datum.put(fieldName, convertToAvroPrimitive(result
- .get(fieldIndex), _schemeTypes[typeIndex]));
- } catch (ClassCastException e) {
- throw new CascadingException("Type for field name: "
- + fieldName + "=" + _schemeTypes[typeIndex]
- + " does not match type of value "
- + result.get(fieldIndex) + "="
- + result.get(fieldIndex).getClass()
- + ", try using the unionOf factory method "
- + "to create the AvroScheme", e);
- }
+ try
+ {
+ Class<?> curType = _schemeTypes[typeIndex];
+ if (curType == ARRAY_CLASS)
+ {
+ typeIndex++;
+ datum.put(fieldName,
+ convertToAvroArray(result.get(fieldIndex), _schemeTypes[typeIndex]));
+ }
+ else if (curType == MAP_CLASS)
+ {
+ typeIndex++;
+ datum.put(fieldName,
+ convertToAvroMap(result.get(fieldIndex), _schemeTypes[typeIndex]));
+ }
+ else
+ {
+ datum.put(fieldName,
+ convertToAvroPrimitive(result.get(fieldIndex),
+ _schemeTypes[typeIndex]));
+ }
+ }
+ catch (CascadingException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new CascadingException(datum.getSchema().getFullName() + " failed to save: "
+ + fieldName + ", " + _schemeTypes[typeIndex] + "="
+ + result.get(fieldIndex), e);
}
}
@@ -473,15 +523,15 @@
//Data type conversion required due to type promotions
if(targetType == Long.class) {
if(sourceType == Integer.class) {
- return new Long(((Integer)inObj).longValue());
+ return ((Integer) inObj).longValue();
} else {
return Long.valueOf(inObj.toString());
}
} else if(targetType == Double.class) {
if(sourceType == Float.class) {
- return new Double(((Float)inObj).doubleValue());
+ return ((Float) inObj).doubleValue();
} else if(sourceType == Integer.class) {
- return new Double(((Integer)inObj).doubleValue());
+ return ((Integer) inObj).doubleValue();
} else {
return Double.valueOf(inObj.toString());
}
@@ -722,55 +772,87 @@
* JobConf.
*
* @param fsType
- * @param path
+ * @param paths
* @throws IOException
*/
- public static AvroScheme unionOf(String path)
+ public static AvroScheme unionOf(String... paths)
throws IOException
{
- return unionOf(path, new JobConf());
+ return unionOf(new JobConf(), null, paths);
}
/**
- * Assembles the scheme using a union of all fields within the input path using the specified
- * JobConf. Extracts all the Avro Schemas from all files to create the scheme.
+ * Assembles the scheme using a union of all schemas within the input path using the default
+ * JobConf and the specified projection.
*
+ * @param projection
+ * @param paths
+ * @return an AvroScheme representing the union of all schemas found at the given paths
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(Fields projection, String... paths)
+ throws IOException
+ {
+ return unionOf(new JobConf(), projection, paths);
+ }
+
+ /**
+ * Assembles the scheme using a union of all fields within the input paths using the specified
+ * JobConf. Extracts all the Avro Schemas from all files to create the scheme. Paths can refer
+ * to any combination of direct paths to Avro files or paths to directories containing Avro
+ * files. This method does not recurse directories.
+ *
* It is possible to have multiple Avro Schemas present within the input directory(s). Usually
* this occurs when the data format changes over time. This method create a unified scheme which
* can read multiple Avro schemas and return fields and data types as a normalized union of the
- * schemas present in the input files one the path, using the following rules:
- * 1. If data types of the fields do not conflict, simply take the union of the fields and data
- * types of all input Schemas.
- * 2. If the data types conflict, but can be promoted, then promote them (int -> long,
- * float -> double, int -> double).
- * 3. Otherwise, convert conflicting data types to String.
+ * schemas present in the input files on the path, using the following rules: 1. If data types
+ * of the fields do not conflict, simply take the union of the fields and data types of all
+ * input Schemas. 2. If the data types conflict, but can be promoted, then promote them (int ->
+ * long, float -> double, int -> double). 3. Otherwise, convert conflicting data types to
+ * String.
*
* Fields are returned in field name order. Does not currently support nested array or map
- * types, these are removed from the union projection.
+ * types, these are removed from the union projection. If a projection is provided, then only
+ * fields in the projection will be returned.
*
- * @param path
* @param conf
+ * @param projection
+ * @param paths
* @return
* @throws IOException
*/
- public static AvroScheme unionOf(String path,
- JobConf conf)
+ public static AvroScheme unionOf(JobConf conf, Fields projection, String... paths)
throws IOException
{
- Path pathObj = new Path(path);
DatumReader<GenericRecord> reader =
- new GenericDatumReader<GenericRecord>();
- FileSystem fs = pathObj.getFileSystem(conf);
+ new GenericDatumReader<GenericRecord>();
Map<String, Class<?>> coerceTo = new TreeMap<String, Class<?>>();
- FileStatus[] stati = fs.listStatus(pathObj);
- for(int x = 0; x < stati.length; x++) {
- if(stati[x].isDir()) {
- continue;
+ for (String path : paths)
+ {
+ Path pathObj = new Path(path);
+ FileSystem fs = pathObj.getFileSystem(conf);
+ if(fs.isFile(pathObj))
+ {
+ Schema schema = getSchema(reader, fs, pathObj);
+ unifyToMap(coerceTo, schema);
}
- Schema schema = getSchema(reader, fs, stati[x]);
- unifyToMap(coerceTo, schema);
+ else
+ {
+ FileStatus[] stati = fs.listStatus(pathObj);
+ for (int x = 0; x < stati.length; x++)
+ {
+ if (stati[x].isDir())
+ {
+ continue;
+ }
+ Schema schema = getSchema(reader, fs, stati[x].getPath());
+ unifyToMap(coerceTo, projection, schema);
+ }
+ }
}
- return fromMap(coerceTo);
+ AvroScheme result = fromMap(coerceTo);
+ result.setConvertingTypes(true);
+ return result;
}
/**
@@ -798,30 +880,59 @@
*
* @param map
* @param schema
+ * Later schemas are merged into the map, promoting conflicting field
+ * types where possible (int -> long, float -> double) or coercing to String.
*/
public static void unifyToMap(Map<String, Class<?>> map, Schema schema)
{
+ unifyToMap(map, null, schema);
+ }
+
+ /**
+ * Creates a unified Schema by adding the field names from the passed schema into the Map and
+ * transforming Avro types to Java types usable to Cascading.Avro. Does not currently support
+ * nested array or map types, these are removed from the union projection.
+ *
+ * Fields not in the projection set are ignored.
+ *
+ * @param map
+ * @param projection
+ * @param schema
+ */
+ public static void unifyToMap(Map<String, Class<?>> map, Fields projection, Schema schema)
+ {
List<Schema.Field> fields = schema.getFields();
- String[] names = new String[fields.size()];
- Class<?>[] types = new Class<?>[fields.size()];
- for(int y = 0; y < names.length; y++)
+ List<String> projectionList = new ArrayList<String>();
+ if (projection != null)
{
+ for (int x = 0; x < projection.size(); x++)
+ {
+ projectionList.add(projection.get(x).toString());
+ }
+ }
+ for (int y = 0; y < fields.size(); y++)
+ {
Schema.Field field = fields.get(y);
- names[y] = field.name();
+ String name = field.name();
+ if (projection != null && !projectionList.contains(name))
+ {
+ continue;
+ }
Schema fieldSchema = field.schema();
- types[y] = getJavaType(fieldSchema);
+ Class<?> type = getJavaType(fieldSchema);
// TODO: currently excluding maps and lists, need a different data
// structure to support them, Map with a single type won't work
- if (!Map.class.isAssignableFrom(types[y])
- && !List.class.isAssignableFrom(types[y]))
+ if (!MAP_CLASS.isAssignableFrom(type) && !ARRAY_CLASS.isAssignableFrom(type))
{
- if(map.containsKey(names[y])) {
- Class<?> otherClass = map.get(names[y]);
- if(!otherClass.equals(types[y])) {
- map.put(names[y], promotion(types[y], otherClass));
+ if (map.containsKey(name))
+ {
+ Class<?> otherClass = map.get(name);
+ if (!otherClass.equals(type))
+ {
+ map.put(name, promotion(type, otherClass));
}
} else {
- map.put(names[y], types[y]);
+ map.put(name, type);
}
}
}
@@ -857,10 +968,10 @@
protected static Schema getSchema(DatumReader<GenericRecord> reader,
FileSystem fs,
- FileStatus status)
+ Path path)
throws IOException
{
- InputStream stream = fs.open(status.getPath());
+ InputStream stream = fs.open(path);
DataFileStream<GenericRecord> in =
new DataFileStream<GenericRecord>(stream, reader);
Schema schema = in.getSchema();
Index: test/com/bixolabs/cascading/avro/AvroSchemeTest.java
===================================================================
--- test/com/bixolabs/cascading/avro/AvroSchemeTest.java (revision 1744)
+++ test/com/bixolabs/cascading/avro/AvroSchemeTest.java (working copy)
@@ -33,7 +33,6 @@
public class AvroSchemeTest {
- private static final String UNION_STR = "UNION";
private static final String OUTPUT_DIR = "build/test/AvroSchmeTest/";
private static enum TestEnum {
@@ -131,7 +130,8 @@
"bytesField",
"arrayOfLongsField",
"mapOfStringsField",
- "enumField");
+ "enumField",
+ "dummyField");
final Class<?>[] schemeTypes2 =
{ Double.class,
Integer.class,
@@ -144,7 +144,8 @@
Long.class,
Map.class,
String.class,
- TestEnum.class };
+ TestEnum.class,
+ String.class };
final String in = OUTPUT_DIR+ "testRoundTrip/in";
final String out = OUTPUT_DIR + "testRoundTrip/out";
@@ -163,12 +164,14 @@
// Now read from the results, and write to an Avro file.
Pipe writePipe = new Pipe("tuples to avro");
- Tap avroSink = new Lfs(new AvroScheme(testFields, schemeTypes), out);
+ AvroScheme avroScheme = new AvroScheme(testFields, schemeTypes);
+ assertFalse("Default converting types?", avroScheme.isConvertingTypes());
+ Tap avroSink = new Lfs(avroScheme, out);
Flow flow = new FlowConnector().connect(lfsSource, avroSink, writePipe);
flow.complete();
// Now read it back in, and verify that the data/types match up.
- Tap avroSource = new Lfs(new AvroScheme(testFields, schemeTypes), out);
+ Tap avroSource = new Lfs(avroScheme, out);
Tap verifySink = avroToTuples(testFields, verifyout, avroSource);
TupleEntryIterator sinkTuples = verifySink.openForRead(new JobConf());
verifyOutput(numRecords, sinkTuples, false);
@@ -185,10 +188,23 @@
// Now read union back in, and verify that the data/types match up.
AvroScheme unionOf = AvroScheme.unionOf(out);
+ assertTrue("Union converting types?", unionOf.isConvertingTypes());
assertEquals("Check union scheme",
+ "{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'booleanField','type':['null','boolean'],'doc':''},{'name':'bytesField','type':['null','bytes'],'doc':''},{'name':'doubleField','type':['null','double'],'doc':''},{'name':'dummyField','type':['null','string'],'doc':''},{'name':'enumField','type':['null','string'],'doc':''},{'name':'floatField','type':['null','string'],'doc':''},{'name':'integerField','type':['null','string'],'doc':''},{'name':'longField','type':['null','long'],'doc':''},{'name':'stringField','type':['null','string'],'doc':''}]}",
+ unionOf.getJsonSchema().replace('"', '\''));
+ AvroScheme unionOfFiles =
+ AvroScheme.unionOf(new String[] {
+ out + File.separatorChar + "part-00000.avro",
+ out + File.separatorChar + "part-00001.avro"});
+ assertEquals("Check union of files scheme identical to union of path scheme",
+ unionOf.getJsonSchema(), unionOfFiles.getJsonSchema());
+ AvroScheme unionProjection = AvroScheme.unionOf(testFields, out);
+ assertEquals("Check union projection scheme",
"{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'booleanField','type':['null','boolean'],'doc':''},{'name':'bytesField','type':['null','bytes'],'doc':''},{'name':'doubleField','type':['null','double'],'doc':''},{'name':'enumField','type':['null','string'],'doc':''},{'name':'floatField','type':['null','string'],'doc':''},{'name':'integerField','type':['null','string'],'doc':''},{'name':'longField','type':['null','long'],'doc':''},{'name':'stringField','type':['null','string'],'doc':''}]}",
- unionOf.getJsonSchema().replace('"', '\''));
+ unionProjection.getJsonSchema().replace('"', '\''));
+
avroSource = new Lfs(unionOf, out);
+
TupleEntryIterator sinkTuples2 = avroSource.openForRead(new JobConf());
verifyOutput(numRecords * 2, sinkTuples2, true);
sinkTuples2.close();
@@ -363,6 +379,12 @@
AvroScheme.addToTuple(t, mapOfStrings);
AvroScheme.addToTuple(t, TestEnum.ONE);
+
+ if (unionOf)
+ {
+ t.add(null);
+ }
+
write.add(t);
t = new Tuple();
@@ -404,6 +426,10 @@
t.add(new Tuple(0L, 1L));
t.add(new Tuple("key-0", "value-0", "key-1", "value-1"));
AvroScheme.addToTuple(t, TestEnum.TWO);
+ if (unionOf)
+ {
+ t.add(null);
+ }
write.add(t);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment