Hadoop MapReduce CSV example

How it works

Usage

Usage:
--input | -i: indicates the absolute path of the INPUT file
--output | -o: indicates the absolute path of the OUTPUT file
--delimiter | -d (optional): indicates the CSV delimiter, default is semicolon (;)
--col-key | -ck: indicates which CSV column is the key
--col-value | -cv: indicates which CSV column is the value
Example:
$ hadoop jar CsvMapReduce.jar CsvMapReduce \
--input=/path/to/input-folder --output=/path/to/output-folder -ck=2 -cv=0 --delimiter=','
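
Before running, the class has to be compiled against the Hadoop client libraries and packaged into a jar. A minimal sketch, assuming a standard Hadoop installation that provides the hadoop command and that CsvMapReduce.java sits in the current directory:

$ javac -classpath "$(hadoop classpath)" CsvMapReduce.java
$ jar cf CsvMapReduce.jar CsvMapReduce*.class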

This class uses Hadoop MapReduce to calculate the average of float values grouped by an integer key, for example avg(height) by age, with a file like the following as the dataset (a worked example of the computation follows the excerpt):

"height";"weight";"age";"male"
151.765;47.8256065;63;1
139.7;36.4858065;63;0
136.525;31.864838;65;0
156.845;53.0419145;41;1

[...]
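
For instance, with -ck=2 and -cv=0 the two rows with age 63 in the excerpt above are mapped to the pairs (63, 151.765) and (63, 139.7), and the reducer averages them: (151.765 + 139.7) / 2 = 145.7325. Over the full dataset, every row sharing the same age contributes to that group's average.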

We have tested it with the following datasets:

Execution with Hadoop

The execution, computing avg(height) by age:

$ hadoop jar CsvMapReduce.jar CsvMapReduce \
-i=/age-in -o=/age-out --col-key=2 --col-value=0

The results

$ hdfs dfs -cat /age-out/*

0	63.32681818181818
1	72.54817272727273
2	90.59333333333332
3	88.65164444444446
4	95.67333333333335
5	99.86644999999999
6	104.22466666666669
7	112.875
8	114.38792307692307
9	116.74928571428572
10	120.015

[...]
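
Note that FileOutputFormat fails the job if the output directory already exists, so repeating a run with the same --output path requires removing it first, for example:

$ hdfs dfs -rm -r /age-out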

The execution, computing avg(deaths) by hurricane category:

$ hadoop jar CsvMapReduce.jar CsvMapReduce \
-i=/hurr -o=/hurr-o --col-key=3 --col-value=2

The results

$ hdfs dfs -cat /hurr-o/*

0	0.0
1	12.722222222222221
2	23.80952380952381
3	17.035714285714285
4	29.4
5	159.0
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CsvMapReduce {

    public static class AppMapper
            extends Mapper<Object, Text, IntWritable, DoubleWritable> {

        private final static DoubleWritable height = new DoubleWritable();
        private IntWritable age = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Read the delimiter and column indexes passed in through the job configuration
            Configuration conf = context.getConfiguration();
            String CSV_DELIMITER = conf.get("CSV_DELIMITER");
            int COL_KEY = Integer.valueOf(conf.get("COL_KEY"));
            int COL_VALUE = Integer.valueOf(conf.get("COL_VALUE"));
            String[] tokens = value.toString().split(CSV_DELIMITER);
            // Both column indexes must exist in this row (indexes are zero-based)
            if (tokens.length > COL_KEY && tokens.length > COL_VALUE) {
                try {
                    age.set(Integer.valueOf(tokens[COL_KEY]));
                    height.set(Double.valueOf(tokens[COL_VALUE]));
                } catch (NumberFormatException e) {
                    // Non-numeric rows (e.g. the quoted header line) are grouped under key -1
                    System.err.println(e);
                    age.set(-1);
                    height.set(0.0);
                }
            } else {
                // Rows with too few columns are also grouped under key -1
                age.set(-1);
                height.set(0.0);
            }
            context.write(age, height);
        }
    }

    public static class AppReducer
            extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {

        private DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // Average all the values emitted for this key
            double sum = 0.0;
            double total = 0.0;
            for (DoubleWritable value : values) {
                sum += value.get();
                total += 1.0;
            }
            result.set(sum / total);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String INPUT = "";
        String OUTPUT = "";
        String CSV_DELIMITER = ";";
        int COL_KEY = 0;
        int COL_VALUE = 1;
        for (String arg : args) {
            String[] pair = arg.split("=");
            switch (pair[0]) {
                case "--input":
                case "-i":
                    INPUT = pair[1];
                    break;
                case "--output":
                case "-o":
                    OUTPUT = pair[1];
                    break;
                case "--delimiter":
                case "-d":
                    CSV_DELIMITER = pair[1];
                    break;
                case "--col-key":
                case "-ck":
                    COL_KEY = Integer.valueOf(pair[1]);
                    break;
                case "--col-value":
                case "-cv":
                    COL_VALUE = Integer.valueOf(pair[1]);
                    break;
                default:
                    System.out.println("[ " + pair[0] + " ] is not a valid option");
                    System.out.println("Usage:");
                    System.out.println("--input | -i: indicates the absolute path of the INPUT file");
                    System.out.println("--output | -o: indicates the absolute path of the OUTPUT file");
                    System.out.println("--delimiter | -d (optional): indicates the CSV delimiter, default is semicolon (;)");
                    System.out.println("--col-key | -ck: indicates which CSV column is the key");
                    System.out.println("--col-value | -cv: indicates which CSV column is the value");
                    System.out.println("Example:");
                    System.out.println("--input=/path/to/file --output=/path/to/another/file -ck=2 -cv=0");
                    return;
            }
        }
System.out.println("[ INPUT ] " + INPUT);
System.out.println("[ OUTPUT ] " + OUTPUT);
System.out.println("[ CSV_DELIMITER ] " + CSV_DELIMITER);
System.out.println("[ COL KEY ] " + COL_KEY);
System.out.println("[ COL VALUE ] " + COL_VALUE);
Configuration conf = new Configuration();
conf.set("CSV_DELIMITER", CSV_DELIMITER);
conf.set("COL_KEY", String.valueOf(COL_KEY));
conf.set("COL_VALUE", String.valueOf(COL_VALUE));
Job job = Job.getInstance(conf, "AgeHeight Application");
job.setJarByClass(CsvMapReduce.class);
job.setMapperClass(AppMapper.class);
job.setCombinerClass(AppReducer.class);
job.setReducerClass(AppReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(INPUT));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}