Sunday, July 6, 2014

Custom Writable

I have never tackled a custom Writable before. I am a huge Avro (http://avro.apache.org/) fan, so I usually try to get my data converted to Avro early. A discussion got me interested in tackling one, and since I had my Bouncy Castle ASN.1 Hadoop example open, I extended it into a basic custom Writable example.

This thread by Oded Rosen was invaluable:
http://mail-archives.apache.org/mod_mbox/hadoop-general/201005.mbox/%3CAANLkTinzP8-nnGg8Q5aaJ8gXCCg6Som7e8Xarc_2PGDD@mail.gmail.com%3E
(also at http://osdir.com/ml/general-hadoop-apache/2010-05/msg00073.html if above is down)

I put the code in the package com.awcoleman.BouncyCastleGenericCDRHadoopWithWritable on GitHub.

The basics from the thread above and a bit of other reading are:
If your class will only be used as a value and not a key, implement the Writable interface.
If your class will be used as a key (and possibly a value), implement the WritableComparable interface (which extends Writable).

A Writable must have 3 things:
An empty constructor. There can be other constructors with arguments, but there must be a no-argument one as well.
An overridden write method to serialize the object's fields.
An overridden readFields method to populate an object from the output of a previous write.

Hadoop reuses Writable objects, so clearing all fields before repopulating them in readFields prevents surprises from stale data left over from the previous record.
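
To make that concrete, below is a minimal sketch of a value-only Writable. The class and field names (SimpleCdrWritable, recordNumber, callingNumber) are hypothetical illustrations, not the classes in my GitHub package.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Minimal sketch of a value-only Writable. Names are hypothetical.
public class SimpleCdrWritable implements Writable {
    private long recordNumber;
    private String callingNumber;

    // Required no-argument constructor; Hadoop creates instances via reflection.
    public SimpleCdrWritable() {}

    public SimpleCdrWritable(long recordNumber, String callingNumber) {
        this.recordNumber = recordNumber;
        this.callingNumber = callingNumber;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(recordNumber);
        out.writeUTF(callingNumber);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Reset fields first: Hadoop reuses Writable instances, so stale
        // values from the previous record must not leak through.
        recordNumber = 0;
        callingNumber = null;
        recordNumber = in.readLong();
        callingNumber = in.readUTF();
    }
}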

WritableComparable adds to Writable:
An overridden hashCode method, which the default HashPartitioner uses to partition keys.
An overridden compareTo method to sort keys.

The advice given in the 'How to write a complex Writable' thread adds (see the combined sketch after this list):
Override the equals method.
Implement RawComparator for your type. This post (http://vangjee.wordpress.com/2012/03/30/implementing-rawcomparator-will-speed-up-your-hadoop-mapreduce-mr-jobs-2/) has an example that extends WritableComparator, which implements RawComparator.
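
Putting the key-side pieces together, here is a hedged sketch of a WritableComparable with hashCode, compareTo, equals, and a raw comparator registered by extending WritableComparator, in the same spirit as the linked post. The names (CdrKeyWritable and its single recordNumber field) are hypothetical, not from my GitHub code.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical key type: a single long record number.
public class CdrKeyWritable implements WritableComparable<CdrKeyWritable> {
    private long recordNumber;

    public CdrKeyWritable() {}

    public CdrKeyWritable(long recordNumber) { this.recordNumber = recordNumber; }

    @Override
    public void write(DataOutput out) throws IOException { out.writeLong(recordNumber); }

    @Override
    public void readFields(DataInput in) throws IOException { recordNumber = in.readLong(); }

    @Override
    public int compareTo(CdrKeyWritable other) {
        return Long.compare(recordNumber, other.recordNumber);
    }

    @Override
    public int hashCode() { // used by the default HashPartitioner to choose a reducer
        return (int) (recordNumber ^ (recordNumber >>> 32));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CdrKeyWritable && ((CdrKeyWritable) o).recordNumber == recordNumber;
    }

    // Raw comparator: compares the serialized bytes without deserializing the keys.
    public static class Comparator extends WritableComparator {
        public Comparator() { super(CdrKeyWritable.class); }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return Long.compare(readLong(b1, s1), readLong(b2, s2));
        }
    }

    static {
        // Register the raw comparator for this key type.
        WritableComparator.define(CdrKeyWritable.class, new Comparator());
    }
}

The byte-level compare is what gives the speedup described in the linked post: the framework can sort serialized keys without deserializing each one.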

In my example on GitHub, I only tested Writable, since I pull individual fields and wrap them as Text or LongWritable for the keys.


Wednesday, July 2, 2014

Processing ASN.1 Call Detail Records with Hadoop (using Bouncy Castle) Part 3

Finally we get to the Hadoop Map/Reduce job...

We created the data and a simple decoder to test it, so now we can move the decoding logic into a RecordReader.

The InputFormat we create is very simple: override isSplitable to return false and return our RecordReader, RawFileRecordReader.

package com.awcoleman.BouncyCastleGenericCDRHadoop;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Input Format for "Simple Generic CDR"
 * Reads each ASN.1 file whole (files are not split).
 *
 * @author awcoleman
 * @version 20140522
 * license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
 */
public class RawFileAsBinaryInputFormat extends FileInputFormat<Text, LongWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, LongWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new RawFileRecordReader();
    }
}

The RecordReader does the bulk of the work.

package com.awcoleman.BouncyCastleGenericCDRHadoop;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.bouncycastle.asn1.ASN1InputStream;
import org.bouncycastle.asn1.ASN1Primitive;
import org.bouncycastle.asn1.ASN1Sequence;

/**
 * Record Reader for "Simple Generic CDR"
 * Reads an entire ASN.1 file; the key is the filename, the value is the record count.
 *
 * @author awcoleman
 * @version 20140522
 * license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
 */
public class RawFileRecordReader extends RecordReader<Text, LongWritable> {

    private Path path;
    private InputStream is;
    private FSDataInputStream fsin;
    private ASN1InputStream asnin;
    private ASN1Primitive obj;
    private Text currentKey;
    private LongWritable currentValue;
    private boolean isProcessed = false;

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProcessed) return false;

        currentKey = new Text(path.getName());

        int recordCounter = 0;
        while ((obj = asnin.readObject()) != null) {
            CallDetailRecord thisCdr = new CallDetailRecord((ASN1Sequence) obj);
            recordCounter++;

            System.out.println("CallDetailRecord " + thisCdr.getRecordNumber()
                + " Calling " + thisCdr.getCallingNumber()
                + " Called " + thisCdr.getCalledNumber()
                + " Start Date-Time " + thisCdr.getStartDate() + "-" + thisCdr.getStartTime()
                + " duration " + thisCdr.getDuration());
        }
        isProcessed = true;

        //Return number of records
        currentValue = new LongWritable(recordCounter);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public LongWritable getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return isProcessed ? 1 : 0;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        path = ((FileSplit) split).getPath();
        FileSystem fs = path.getFileSystem(conf);
        fsin = fs.open(path); //assign to the field (not a local) so close() can close it
        is = decompressStream(fsin);
        asnin = new ASN1InputStream(is);
    }

    @Override
    public void close() throws IOException {
        asnin.close();
        is.close();
        if (fsin != null) fsin.close();
    }

    public static InputStream decompressStream(InputStream input) {
        InputStream returnStream = null;
        org.apache.commons.compress.compressors.CompressorInputStream cis = null;
        BufferedInputStream bis = null;
        try {
            bis = new BufferedInputStream(input);
            bis.mark(1024); //Mark stream to reset if uncompressed data
            cis = new org.apache.commons.compress.compressors.CompressorStreamFactory().createCompressorInputStream(bis);
            returnStream = cis;
        } catch (org.apache.commons.compress.compressors.CompressorException ce) { //CompressorStreamFactory throws CompressorException for uncompressed files
            try {
                bis.reset();
            } catch (IOException ioe) {
                String errmessageIOE = "IO Exception ( " + ioe.getClass().getName() + " ) : " + ioe.getMessage();
                System.out.println(errmessageIOE);
            }
            returnStream = bis;
        } catch (Exception e) {
            String errmessage = "Exception ( " + e.getClass().getName() + " ) : " + e.getMessage();
            System.out.println(errmessage);
        }
        return returnStream;
    }
}

RawFileRecordReader simply returns the filename as the key and the count of ASN.1 records in the file as the value. We can change that to something more useful in a later post.

The Driver is also simple.

package com.awcoleman.BouncyCastleGenericCDRHadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Basic Hadoop Driver (with Mapper and Reducer included) for "Simple Generic CDR"
 *
 * @author awcoleman
 * @version 20140522
 * license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
 */
public class BasicDriverMapReduce extends Configured implements Tool {

    public static class BasicMapper extends Mapper<Text, LongWritable, Text, LongWritable> {
        @Override
        public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class BasicReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long total = 0; //per-key total; a local so counts do not carry over between keys
            for (LongWritable val : values) {
                total += val.get();
            }
            context.write(key, new LongWritable(total));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Missing input and output filenames. Exiting.");
            System.exit(1);
        }

        Job job = Job.getInstance(super.getConf());
        job.setJarByClass(BasicDriverMapReduce.class);
        job.setJobName("BasicDriver1");
        job.setMapperClass(BasicMapper.class);
        job.setReducerClass(BasicReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(RawFileAsBinaryInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int ret = ToolRunner.run(conf, new BasicDriverMapReduce(), args);
        System.exit(ret);
    }
}

Get the full code on GitHub. The code here is the simplest way to handle binary data files, and there are plenty of things that could be added for better performance. If the data files are large enough, adding splitting logic may be worthwhile. If the data files are small, it may be worth using a job to group them into sequence files, or converting them to Avro files. A rough sketch of the sequence-file idea follows.
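
For the small-file case, here is one way the packing step could look outside of MapReduce: write each raw file into a single SequenceFile keyed by filename. The class and argument names below are illustrative assumptions, not code from the GitHub repository.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Packs a directory of small raw CDR files into one SequenceFile:
// key = filename, value = the file's bytes. Illustrative sketch only.
public class PackSmallFiles {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inputDir = new Path(args[0]);
        Path outputSeq = new Path(args[1]);

        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(outputSeq),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
        try {
            for (FileStatus status : fs.listStatus(inputDir)) {
                if (status.isDirectory()) continue;

                // Read the whole small file into memory.
                byte[] contents = new byte[(int) status.getLen()];
                FSDataInputStream in = fs.open(status.getPath());
                try {
                    in.readFully(0, contents);
                } finally {
                    in.close();
                }

                writer.append(new Text(status.getPath().getName()),
                        new BytesWritable(contents));
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

A job could then read the packed file with SequenceFileInputFormat and decode each BytesWritable payload with the same Bouncy Castle logic used in the RecordReader above.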

Update: Links to Part 1, Part 2, Part 3.