I have never tackled a custom Writable before. I am a huge Avro (http://avro.apache.org/) fan, so I usually try to get my data converted to Avro early. A discussion got me interested in tackling one, and since I had my Bouncy Castle ASN.1 Hadoop example open, I extended that into a basic custom Writable example.
This thread by Oded Rosen was invaluable:
http://mail-archives.apache.org/mod_mbox/hadoop-general/201005.mbox/%3CAANLkTinzP8-nnGg8Q5aaJ8gXCCg6Som7e8Xarc_2PGDD@mail.gmail.com%3E
(also at http://osdir.com/ml/general-hadoop-apache/2010-05/msg00073.html if above is down)
I put the code in package com.awcoleman.BouncyCastleGenericCDRHadoopWithWritable on github.
The basics from the thread above and a bit of other reading are:
If your class will only be used as a value and not a key, implement the Writable interface.
If your class will be used as a key (and possibly a value), implement the WritableComparable interface (which extends Writable).
A Writable must have 3 things:
An empty constructor. There can be other constructors with arguments, but there must be a no-argument one as well.
An overridden write method to write variables out.
An overridden readFields method to populate an object from the output of a previous write call.
Hadoop reuses Writable objects, so clearing all variables before populating them in readFields will prevent surprises. A minimal sketch is shown below.
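Putting those three pieces together, a value-only Writable looks roughly like this (a minimal sketch; the CdrWritable name and fields are made up for illustration, not taken from the github code):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//Minimal sketch of a value-only Writable (hypothetical fields, not from the github example)
public class CdrWritable implements Writable {

    private String callingNumber;
    private long duration;

    //No-argument constructor, required since Hadoop instantiates Writables by reflection
    public CdrWritable() {
    }

    public CdrWritable(String callingNumber, long duration) {
        this.callingNumber = callingNumber;
        this.duration = duration;
    }

    //Serialize the fields
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(callingNumber);
        out.writeLong(duration);
    }

    //Repopulate the object; clear the fields first since Hadoop reuses the same instance
    @Override
    public void readFields(DataInput in) throws IOException {
        callingNumber = null;
        duration = 0;
        callingNumber = in.readUTF();
        duration = in.readLong();
    }
}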
WritableComparable adds to Writable:
An overridden hashCode method (the default HashPartitioner uses it to partition keys).
An overridden compareTo method.
The advice given in the 'How to write a complex Writable' thread adds:
Override the equals method
Implement RawComparator for your type. This post (http://vangjee.wordpress.com/2012/03/30/implementing-rawcomparator-will-speed-up-your-hadoop-mapreduce-mr-jobs-2/) has an example that extends WritableComparator, which implements RawComparator.
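Putting the key-side advice together, a WritableComparable might look roughly like the sketch below (again with a made-up CdrKeyWritable name and field, not code from the github repo). The inner Comparator extends WritableComparator so the sort can compare serialized bytes directly without deserializing each key:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//Sketch of a key type with a single hypothetical long field
public class CdrKeyWritable implements WritableComparable<CdrKeyWritable> {

    private long recordNumber;

    public CdrKeyWritable() {
    }

    public CdrKeyWritable(long recordNumber) {
        this.recordNumber = recordNumber;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(recordNumber);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        recordNumber = in.readLong();
    }

    //Defines the sort order of keys
    @Override
    public int compareTo(CdrKeyWritable other) {
        return Long.compare(recordNumber, other.recordNumber);
    }

    //Used by the default HashPartitioner to assign keys to reducers
    @Override
    public int hashCode() {
        return (int) (recordNumber ^ (recordNumber >>> 32));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CdrKeyWritable)) return false;
        return recordNumber == ((CdrKeyWritable) o).recordNumber;
    }

    //Raw comparator: compares serialized bytes, avoiding deserialization during the sort
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(CdrKeyWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return Long.compare(readLong(b1, s1), readLong(b2, s2));
        }
    }

    static {
        //Register the raw comparator as the default for this key type
        WritableComparator.define(CdrKeyWritable.class, new Comparator());
    }
}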
In my example on github, I only tested Writable, since I pull individual fields and wrap them as Text or LongWritable for the keys.
Sunday, July 6, 2014
Wednesday, July 2, 2014
Processing ASN.1 Call Detail Records with Hadoop (using Bouncy Castle) Part 3
Finally we get to the Hadoop Map/Reduce job...
We created the data and wrote a simple decoder to test it, so now we can take the decoding logic and put it in a RecordReader.
The InputFormat we create is very simple: it sets isSplitable to false and uses our RecordReader, RawFileRecordReader.
package com.awcoleman.BouncyCastleGenericCDRHadoop;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 *
 * Input Format for "Simple Generic CDR"
 * Reads in entire ASN.1 file as key.
 *
 * @author awcoleman
 * @version 20140522
 * license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
 *
 */
public class RawFileAsBinaryInputFormat extends FileInputFormat<Text, LongWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, LongWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new RawFileRecordReader();
    }
}
The RecordReader does the bulk of the work.
package com.awcoleman.BouncyCastleGenericCDRHadoop;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.bouncycastle.asn1.ASN1InputStream;
import org.bouncycastle.asn1.ASN1Primitive;
import org.bouncycastle.asn1.ASN1Sequence;

/**
 *
 * RecordReader for "Simple Generic CDR"
 * Reads the entire ASN.1 file as a single record, returning the
 * filename as the key and the record count as the value.
 *
 * @author awcoleman
 * @version 20140522
 * license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
 *
 */
public class RawFileRecordReader extends RecordReader<Text, LongWritable> {

    private Path path;
    private InputStream is;
    private FSDataInputStream fsin;
    private ASN1InputStream asnin;
    private ASN1Primitive obj;
    private Text currentKey;
    private LongWritable currentValue;
    private boolean isProcessed = false;

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProcessed) return false;

        currentKey = new Text(path.getName());

        //Decode every ASN.1 record in the file, counting as we go
        int recordCounter = 0;
        while ((obj = asnin.readObject()) != null) {
            CallDetailRecord thisCdr = new CallDetailRecord((ASN1Sequence) obj);
            recordCounter++;

            System.out.println("CallDetailRecord "+thisCdr.getRecordNumber()+" Calling "+thisCdr.getCallingNumber()
                    +" Called "+thisCdr.getCalledNumber()+" Start Date-Time "+thisCdr.getStartDate()+"-"
                    +thisCdr.getStartTime()+" duration "+thisCdr.getDuration()
                    );
        }
        isProcessed = true;

        //Return number of records
        currentValue = new LongWritable(recordCounter);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public LongWritable getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return isProcessed ? 1 : 0;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        path = ((FileSplit) split).getPath();
        FileSystem fs = path.getFileSystem(conf);

        fsin = fs.open(path);
        is = decompressStream(fsin);
        asnin = new ASN1InputStream(is);
    }

    @Override
    public void close() throws IOException {
        asnin.close();
        is.close();
        if (fsin != null) fsin.close();
    }

    //Wraps the input stream in a decompressor if the file is compressed,
    //otherwise returns the (buffered) stream unchanged.
    public static InputStream decompressStream(InputStream input) {
        InputStream returnStream = null;
        org.apache.commons.compress.compressors.CompressorInputStream cis = null;
        BufferedInputStream bis = null;

        try {
            bis = new BufferedInputStream(input);
            bis.mark(1024); //Mark stream to reset if uncompressed data

            cis = new org.apache.commons.compress.compressors.CompressorStreamFactory().createCompressorInputStream(bis);
            returnStream = cis;
        } catch (org.apache.commons.compress.compressors.CompressorException ce) { //CompressorStreamFactory throws CompressorException for uncompressed files
            try {
                bis.reset();
            } catch (IOException ioe) {
                String errmessageIOE = "IO Exception ( "+ioe.getClass().getName()+" ) : "+ioe.getMessage();
                System.out.println(errmessageIOE);
            }
            returnStream = bis;
        } catch (Exception e) {
            String errmessage = "Exception ( "+e.getClass().getName()+" ) : "+e.getMessage();
            System.out.println(errmessage);
        }

        return returnStream;
    }
}
RawFileRecordReader simply returns the filename and the count of the ASN.1 records in the file. We can change that to something more useful in a later post.
The Driver is also simple.
package com.awcoleman.BouncyCastleGenericCDRHadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * Basic Hadoop Driver (with Mapper and Reducer included) for "Simple Generic CDR"
 *
 * @author awcoleman
 * @version 20140522
 * license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
 *
 */
public class BasicDriverMapReduce extends Configured implements Tool {

    public static class BasicMapper extends Mapper<Text, LongWritable, Text, LongWritable> {
        public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class BasicReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private long total = 0;

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            for (LongWritable val : values) {
                total += val.get();
            }
            context.write(key, new LongWritable(total));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Missing input and output filenames. Exiting.");
            System.exit(1);
        }

        Job job = new Job(super.getConf());
        job.setJarByClass(BasicDriverMapReduce.class);
        job.setJobName("BasicDriver1");
        job.setMapperClass(BasicMapper.class);
        job.setReducerClass(BasicReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(RawFileAsBinaryInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int ret = ToolRunner.run(conf, new BasicDriverMapReduce(), args);
        System.exit(ret);
    }
}
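The driver takes an input path and an output path, so running it is the usual hadoop jar invocation; the jar name below is just a placeholder for however you package the classes:

hadoop jar cdr-example.jar com.awcoleman.BouncyCastleGenericCDRHadoop.BasicDriverMapReduce /path/to/cdr/input /path/to/output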
Get the full code on github. The code here is the simplest way to handle binary data files, and there are lots of things that could be added for better performance. If the data files are large enough, adding splitting logic may be worthwhile. If the data files are small, it may be worth using a map job to group them into sequence files or convert them into Avro files; a rough sketch of the sequence file idea is below.
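For example, one way to do the grouping (a sketch only, not code from the github repo, written here as a standalone program rather than a map job, with made-up class and path names) is to pack each small CDR file into a SequenceFile as filename/bytes pairs and then run the job against the SequenceFile:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

//Sketch: pack many small CDR files into one SequenceFile of (filename, raw bytes) pairs
public class PackCdrFiles {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inDir = new Path(args[0]);   //directory of small CDR files
        Path outFile = new Path(args[1]); //output SequenceFile

        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(outFile),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
        try {
            for (FileStatus stat : fs.listStatus(inDir)) {
                if (stat.isDirectory()) continue;

                //Read the whole small file into memory
                byte[] buf = new byte[(int) stat.getLen()];
                FSDataInputStream in = fs.open(stat.getPath());
                try {
                    in.readFully(0, buf);
                } finally {
                    in.close();
                }

                //Append as (filename, bytes)
                writer.append(new Text(stat.getPath().getName()), new BytesWritable(buf));
            }
        } finally {
            writer.close();
        }
    }
}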
Update: Links to Part 1, Part 2, Part 3.