package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TikaFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Hand every split to the Tika-based record reader.
        return new TikaRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // TikaRecordReader parses each file as a whole, so never split it.
        return false;
    }
}
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TikaMapreduce extends Configured implements Tool {

    // Identity mapper: the record reader already produces
    // (file path, extracted text) pairs, so just pass them through.
    public static class TikaMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), args);
        System.exit(exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TikaMapreduce <input path> <output path>");
            return 2;
        }
        // Use the configuration that ToolRunner already populated
        // with any generic options.
        Job job = new Job(getConf(), "TikaMapreduce");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TikaFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(TikaMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TikaOutPutFormt.class);
        // Suffix the output directory with a timestamp so reruns do not collide.
        FileOutputFormat.setOutputPath(job, new Path(args[1]
                + System.currentTimeMillis()));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
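Assuming the classes are packaged into a jar together with the Tika libraries (the jar name and paths below are hypothetical), the job is submitted along these lines: hadoop jar tikacustom.jar com.srini.tikacustom.TikaMapreduce /user/srini/input /user/srini/out. Because the driver goes through ToolRunner, generic options such as -libjars (for shipping the Tika jars with the job) can be passed before the input and output arguments.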
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TikaOutPutFormt extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Write all records into a single file inside the job output directory.
        Path path = FileOutputFormat.getOutputPath(context);
        Path fullPath = new Path(path, "Srini.txt");
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        FSDataOutputStream output = fs.create(fullPath, context);
        return new TikaRecordWrite(output);
    }
}
package com.srini.tikacustom;

import java.io.IOException;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;

// Reads each file as a single record: the key is the file path,
// the value is the text Tika extracts from it.
public class TikaRecordReader extends RecordReader<Text, Text> {

    private Text key = new Text();
    private Text value = new Text();
    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;

    @Override
    public void close() throws IOException {
        // Nothing to clean up; the input is opened and parsed in nextKeyValue().
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Whole-file reader: progress is all-or-nothing.
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path path = fileSplit.getPath();
            key.set(path.toString());
            @SuppressWarnings("unused")
            FileSystem fs = path.getFileSystem(conf);
            @SuppressWarnings("unused")
            FSDataInputStream fin = null;
            try {
                // NOTE: java.net.URL does not understand the hdfs:// scheme by
                // default; this causes the MalformedURLException discussed in
                // the comments below.
                String con = new Tika().parseToString(new URL(path.toString()));
                // Strip punctuation, collapse whitespace, and lowercase the text.
                String string = con.replaceAll("[$%&+,:;=?#|']", " ");
                String string2 = string.replaceAll("\\s+", " ");
                String lo = string2.toLowerCase();
                value.set(lo);
            } catch (TikaException e) {
                e.printStackTrace();
            }
            processed = true;
            return true;
        } else {
            return false;
        }
    }
}
package com.srini.tikacustom;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class TikaRecordWrite extends RecordWriter<Text, Text> {

    private DataOutputStream out;

    public TikaRecordWrite(DataOutputStream output) {
        out = output;
        try {
            // Write a header line before any records.
            out.writeBytes("result:\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        out.close();
    }

    @Override
    public void write(Text key, Text value) throws IOException,
            InterruptedException {
        // Emit records as "key,value" lines.
        out.writeBytes(key.toString());
        out.writeBytes(",");
        out.writeBytes(value.toString());
        out.writeBytes("\r\n");
    }
}
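With the identity mapper and this writer, Srini.txt in the output directory ends up as the header line followed by one "path,extracted text" line per input file, for example (hypothetical file and content):

result:
hdfs://localhost:9000/user/srini/input/sample.pdf,this is the lowercased text tika extracted from the file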
This is very awesome!!
I don't understand: where is the reducer? I see only the mapper.
Hi sri,
I used your code to parse a PDF file, but it is giving me an error:
Error: java.net.MalformedURLException: unknown protocol: hdfs
at java.net.URL.<init>(URL.java:593)
at java.net.URL.<init>(URL.java:483)
at java.net.URL.<init>(URL.java:432)
at com.serendio.icvs.hbase.test.TikaRecordReader.nextKeyValue(TikaRecordReader.java:68)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:532)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Hey!! I am also getting the same error as mentioned above by sahil28. Did anyone get it to work?
Please reply!!
Not telling!!!! I got it running!!! But now I won't tell you either!!!
I am also facing the same error: java.net.MalformedURLException: unknown protocol: hdfs. Why am I getting this error? Please, anyone, help me.
There are two ways to solve the error.

Either add 'URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());' in TikaRecordReader.java (FsUrlStreamHandlerFactory is in the org.apache.hadoop.fs package):

...
FSDataInputStream fin = null;
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
try {
    String con = new Tika().parseToString(new URL(path.toString()));
...

or change this code in TikaRecordReader.java:

...
FSDataInputStream fin = null;
try {
    String con = new Tika().parseToString(new URL(path.toString()));
...

to

...
FSDataInputStream fin = fs.open(path);
try {
    String con = new Tika().parseToString(fin);
...

Hope it helps.
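For reference, a minimal sketch of nextKeyValue() with the second fix applied, assuming the same fields (fileSplit, conf, key, value, processed) as the TikaRecordReader above:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (processed) {
        return false;
    }
    Path path = fileSplit.getPath();
    key.set(path.toString());
    // FileSystem.open() understands hdfs:// and any other configured scheme,
    // unlike java.net.URL, so no stream handler factory is needed.
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream fin = fs.open(path);
    try {
        String con = new Tika().parseToString(fin);
        // Same cleanup as the original: strip punctuation,
        // collapse whitespace, lowercase.
        value.set(con.replaceAll("[$%&+,:;=?#|']", " ")
                .replaceAll("\\s+", " ")
                .toLowerCase());
    } catch (TikaException e) {
        e.printStackTrace();
    } finally {
        fin.close();
    }
    processed = true;
    return true;
}

Compared with the first fix, this also avoids URL.setURLStreamHandlerFactory(), which may only be called once per JVM.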
Hi, I am getting an error. What jar files do I need to add for Tika? Can you share them, if any?
Hi, I am trying to use your MapReduce program to parse TIFF images. Do I need to change any input or output format?
Hi guys,
please send me feedback.