Skip to content

Instantly share code, notes, and snippets.

@abshingate
Created November 26, 2013 17:10
Show Gist options
  • Save abshingate/7662128 to your computer and use it in GitHub Desktop.
package com.abhijit.mr.sequence.number;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class MRSeqNumCompositeValue implements Writable,
        Comparable<MRSeqNumCompositeValue> {

    /** Id of the map task that produced this record. */
    private int taskId;

    /** Record count emitted by that map task; meaningful only on marker records. */
    private long taskOutputRecordCount;

    /**
     * Reduce partition this record is routed to by MRSeqNumPartitioner.
     * Intentionally absent from the serialized form: it is only consulted
     * map-side, before the record is written out.
     */
    private int targetParitionNumber = -1;

    /** Payload: the original input line carried through to the reducer. */
    private Text value;

    /** Serializes task id, record count and payload (partition number excluded). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(taskId);
        out.writeLong(taskOutputRecordCount);
        value.write(out);
    }

    /** Restores exactly the fields written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        taskId = in.readInt();
        taskOutputRecordCount = in.readLong();
        value = new Text();
        value.readFields(in);
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }

    public long getTaskOutputRecordCount() {
        return taskOutputRecordCount;
    }

    public void setTaskOutputRecordCount(long taskOutputRecordCount) {
        this.taskOutputRecordCount = taskOutputRecordCount;
    }

    public Text getValue() {
        return value;
    }

    public void setValue(Text value) {
        this.value = value;
    }

    public int getTargetParitionNumber() {
        return targetParitionNumber;
    }

    public void setTargetParitionNumber(int targetParitionNumber) {
        this.targetParitionNumber = targetParitionNumber;
    }

    /**
     * Orders records by the id of the producing map task.
     * NOTE(review): equals()/hashCode() are not overridden, so this ordering
     * is not consistent with equals — acceptable here since instances are
     * never stored in sorted sets or maps.
     */
    @Override
    public int compareTo(MRSeqNumCompositeValue other) {
        if (taskId < other.taskId) {
            return -1;
        }
        return taskId == other.taskId ? 0 : 1;
    }
}
package com.abhijit.mr.sequence.number;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Emits every input record wrapped in a {@link MRSeqNumCompositeValue} that is
 * routed to the reducer whose partition number equals this map task's id, so
 * each reducer sees exactly one split's records in their original order. In
 * cleanup a "marker" record (key -1) carrying this task's total record count
 * is broadcast to every reducer, letting each reducer compute its starting
 * sequence number.
 */
public class MRSeqNumMapper extends
        Mapper<LongWritable, Text, LongWritable, MRSeqNumCompositeValue> {

    private static final Logger LOG = LoggerFactory
            .getLogger(MRSeqNumMapper.class);

    // Number of records this map task has emitted so far.
    private long counter = 0;

    // Number of reduce tasks in the job; every reducer must receive one
    // marker record from this task.
    private int reduceTaskCount = -1;

    // Reused output value object; the framework serializes it on each write,
    // so mutating it between writes is safe.
    private MRSeqNumCompositeValue seqNumCompositeValue = new MRSeqNumCompositeValue();

    /**
     * Caches the reduce task count and stamps this task's id (and default
     * target partition) on the reusable output value.
     */
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        int thisTaskId = context.getTaskAttemptID().getTaskID().getId();
        LOG.info("task id = {}", thisTaskId);
        // Use the API call rather than the deprecated "mapred.reduce.tasks"
        // key (renamed to mapreduce.job.reduces in Hadoop 2). A missed config
        // lookup returned -1 here, which silently skipped the marker records
        // in cleanup() and broke the job downstream.
        this.reduceTaskCount = context.getNumReduceTasks();
        LOG.info("reduce task count = {}", reduceTaskCount);
        seqNumCompositeValue.setTaskId(thisTaskId);
        // Regular records go to the reducer with the same index as this map
        // task, preserving per-split input order.
        seqNumCompositeValue.setTargetParitionNumber(thisTaskId);
    }

    /** Wraps the input line and forwards it under its original byte-offset key. */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        counter++;
        seqNumCompositeValue.setValue(value);
        context.write(key, seqNumCompositeValue);
    }

    /**
     * Broadcasts a marker record (key -1, empty payload) with this task's
     * final record count to every reduce partition. Key -1 sorts before all
     * real byte-offset keys, so reducers process the markers first.
     */
    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        seqNumCompositeValue.setValue(new Text(""));
        LOG.info("map output records count = {}", counter);
        seqNumCompositeValue.setTaskOutputRecordCount(counter);
        for (int i = 0; i < reduceTaskCount; i++) {
            seqNumCompositeValue.setTargetParitionNumber(i);
            context.write(new LongWritable(-1), seqNumCompositeValue);
        }
    }
}
/**
*
*/
package com.abhijit.mr.sequence.number;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author ashing0
*
*/
public class MRSeqNumPartitioner extends
Partitioner<LongWritable, MRSeqNumCompositeValue> {
// Routes each record to the partition the mapper stamped on it: regular
// records carry the producing map task's id, marker records are fanned out
// to every partition by MRSeqNumMapper.cleanup(). Works because the
// partitioner runs map-side, before the value is serialized (the target
// partition number is not part of the wire format).
// NOTE(review): if setTargetParitionNumber() was never called this returns
// the field's -1 default, an invalid partition — relies on the mapper always
// setting it. Confirm no other mapper feeds this partitioner.
@Override
public int getPartition(LongWritable key, MRSeqNumCompositeValue value,
int numPartitions) {
return value.getTargetParitionNumber();
}
}
package com.abhijit.mr.sequence.number;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Assigns globally unique, ordered sequence numbers. Each reducer first
 * consumes the marker group (key -1, which sorts before every real
 * byte-offset key) to learn how many records each map task emitted, sums the
 * counts of all map tasks with a lower id than its own to get its starting
 * sequence number, then numbers its records consecutively from there.
 */
public class MRSeqNumReducer extends
        Reducer<LongWritable, MRSeqNumCompositeValue, LongWritable, Text> {

    // Was LoggerFactory.getLogger(MRSeqNumMapper.class): all reducer log
    // lines were misattributed to the mapper.
    private static final Logger LOG = LoggerFactory
            .getLogger(MRSeqNumReducer.class);

    // First sequence number for this reducer; -1 until the marker group
    // (key -1) has been processed.
    private long startSeqNumber = -1;

    private int thisTaskId = -1;

    // Reused output key; the framework serializes it on each write.
    private LongWritable longWritable = new LongWritable();

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        thisTaskId = context.getTaskAttemptID().getTaskID().getId();
        LOG.info("task id = {}", thisTaskId);
    }

    @Override
    protected void reduce(LongWritable key,
            Iterable<MRSeqNumCompositeValue> values, Context context)
            throws IOException, InterruptedException {
        LOG.info("got key = {}", key.get());
        if (key.get() == -1) {
            // Marker group: one record per map task carrying its total
            // output count. Key -1 sorts first, so this runs before any
            // real data group.
            Iterator<MRSeqNumCompositeValue> iterator = values.iterator();
            // API call instead of the deprecated "mapred.reduce.tasks" key
            // (renamed mapreduce.job.reduces in Hadoop 2).
            int reduceTaskCount = context.getNumReduceTasks();
            // NOTE(review): indexed by map task id but sized by the reduce
            // task count — safe only because the driver sets the number of
            // reducers equal to the number of input splits; confirm if the
            // driver ever changes.
            long[] mapCounts = new long[reduceTaskCount];
            while (iterator.hasNext()) {
                MRSeqNumCompositeValue value = iterator.next();
                LOG.info("current task id = {} , current map counts = {}",
                        value.getTaskId(), value.getTaskOutputRecordCount());
                mapCounts[value.getTaskId()] = value.getTaskOutputRecordCount();
            }
            LOG.info("total map counts = {}", mapCounts.length);
            // Starting number = total records of all map tasks with a lower id.
            startSeqNumber = 0;
            for (int i = 0; i < mapCounts.length; i++) {
                if (i >= thisTaskId) {
                    LOG.info("exiting with start seq number = {}", startSeqNumber);
                    break;
                }
                startSeqNumber = startSeqNumber + mapCounts[i];
            }
            return;
        }
        LOG.info("start sequence number = {}", startSeqNumber);
        if (startSeqNumber == -1) {
            throw new RuntimeException("seq number must have been found by now");
        }
        // Data group: emit each payload under the next sequence number.
        Iterator<MRSeqNumCompositeValue> iterator = values.iterator();
        while (iterator.hasNext()) {
            MRSeqNumCompositeValue value = iterator.next();
            longWritable.set(startSeqNumber++);
            context.write(longWritable, value.getValue());
        }
    }
}
package com.abhijit.mr.sequence.number;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Driver: assigns globally ordered sequence numbers to the lines of the
 * input. Sets the number of reducers equal to the number of input splits —
 * the mapper, partitioner and reducer all rely on this 1:1 mapping.
 */
public class MRSequenceNumberAssigner extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory
            .getLogger(MRSequenceNumberAssigner.class);

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path
     * @return 0 on success, -1 on bad arguments, -2 on job failure
     */
    public int run(String[] args) throws Exception {
        LOG.info("Starting job");
        if (args.length < 2) {
            LOG.error("Incorrect inputs");
            printHelp();
            return -1;
        }
        String inputPathString = args[0];
        String outputPathString = args[1];
        Job job = new Job(getConf());
        job.setJarByClass(MRSeqNumMapper.class);
        job.setMapperClass(MRSeqNumMapper.class);
        job.setReducerClass(MRSeqNumReducer.class);
        // Declare the intermediate (map output) types explicitly; the final
        // output value class must be Text — the reducer emits Text, not
        // MRSeqNumCompositeValue as previously declared (that only worked
        // because TextOutputFormat does not type-check).
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(MRSeqNumCompositeValue.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(MRSeqNumPartitioner.class);
        LOG.info("Adding {} in the list of input paths", inputPathString);
        FileInputFormat.setInputPaths(job, new Path(inputPathString));
        FileOutputFormat.setOutputPath(job, new Path(outputPathString));
        // One reducer per split: required so that each map task's records can
        // be routed to "its" reducer and the global ordering holds.
        int numberOfSplits = new TextInputFormat().getSplits(job).size();
        LOG.info("Number of splits = {}", numberOfSplits);
        job.setNumReduceTasks(numberOfSplits);
        boolean result = job.waitForCompletion(true);
        if (!result) {
            return -2;
        }
        return 0;
    }

    /** Prints command-line usage. */
    private void printHelp() {
        LOG.info("Usage: hadoop job mrseqnum.jar <input_path> <output_path>");
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRSequenceNumberAssigner(), args);
        System.exit(exitCode);
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.abhijit</groupId>
<artifactId>mr-sequence-number</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>mr-sequence-number</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- NOTE(review): the sources use org.apache.hadoop.* classes but no Hadoop
     artifact is declared here — presumably hadoop-core/hadoop-client arrives
     transitively via hive-jdbc. Verify, or declare hadoop-client explicitly. -->
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>0.10.0-cdh4.2.1</version>
</dependency>
<!-- NOTE(review): mockito is test-only in typical setups but has no
     <scope>test</scope> here — confirm it is intentionally on the compile
     classpath. -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.2</version>
</dependency>
<!-- NOTE(review): pig 0.8.1-cdh3u3 (CDH3) mixed with hive-jdbc cdh4.2.1
     (CDH4) — confirm the version skew is intended. -->
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.8.1-cdh3u3</version>
</dependency>
<!-- SLF4J API plus the log4j binding used by the MR classes' loggers. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Cloudera repository: required for the cdh-suffixed artifact versions above. -->
<repositories>
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<!-- Compile for Java 6: keep source compatible with the CDH runtime. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment