Created
September 16, 2014 07:23
-
-
Save rishav-rohit/e3556aa203a3961cdf34 to your computer and use it in GitHub Desktop.
HBase multi table input union example mapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.rishav.hbase.union; | |
import java.util.Arrays; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
import org.apache.hadoop.hbase.mapreduce.TableMapper; | |
import org.apache.hadoop.hbase.mapreduce.TableSplit; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.io.Text; | |
public class UnionMapper extends TableMapper<Text, IntWritable> { | |
private static byte[] storeSalesTable = Bytes.toBytes("storeSales"); | |
private static byte[] onlineSalesTable = Bytes.toBytes("onlineSales"); | |
byte[] sales; | |
String storeSales; | |
Integer sSales; | |
String onlineSales; | |
Integer oSales; | |
Text mapperKey; | |
IntWritable mapperValue; | |
@Override | |
public void map(ImmutableBytesWritable rowKey, Result columns, Context context) { | |
// get table name | |
TableSplit currentSplit = (TableSplit)context.getInputSplit(); | |
byte[] tableName = currentSplit.getTableName(); | |
try { | |
if (Arrays.equals(tableName, storeSalesTable)) { | |
String date = new String(rowKey.get()).split("#")[0]; | |
sales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sSales")); | |
storeSales = new String(sales); | |
sSales = new Integer(storeSales); | |
mapperKey = new Text("s#" + date); | |
mapperValue = new IntWritable(sSales); | |
context.write(mapperKey, mapperValue); | |
} else if (Arrays.equals(tableName, onlineSalesTable)) { | |
String date = new String(rowKey.get()); | |
sales = columns.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("oSales")); | |
onlineSales = new String(sales); | |
Integer oSales = new Integer(onlineSales); | |
mapperKey = new Text("o#"+date); | |
mapperValue = new IntWritable(oSales); | |
context.write(mapperKey, mapperValue); | |
} | |
} catch (Exception e) { | |
// TODO : exception handling logic | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment