Monday, June 16, 2014

Processing ASN.1 Call Detail Records with Hadoop (using Bouncy Castle) Part 2

The Stand-alone Decoder

Now that we have created sample data, we can create a simple decoder with the Bouncy Castle library.


import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.bouncycastle.asn1.ASN1InputStream;
import org.bouncycastle.asn1.ASN1Primitive;
import org.bouncycastle.asn1.ASN1Sequence;
/**
*
* A standalone decoder for the example ASN.1 specification "Simple Generic CDR".
*
* ASN.1 specification "Simple Generic CDR":
* <pre>
* {@code
* GenericCDR-Schema DEFINITIONS IMPLICIT TAGS ::=
* BEGIN
* GenericCallDataRecord ::= SEQUENCE {
* recordNumber [APPLICATION 2] IMPLICIT INTEGER,
* callingNumber [APPLICATION 8] IMPLICIT UTF8String (SIZE(1..20)),
* calledNumber [APPLICATION 9] IMPLICIT UTF8String (SIZE(1..20)),
* startDate [APPLICATION 16] IMPLICIT UTF8String (SIZE(8)),
* startTime [APPLICATION 18] IMPLICIT UTF8String (SIZE(6)),
* duration [APPLICATION 19] IMPLICIT INTEGER
* }
* END
* }
* </pre>
*
* @author awcoleman
* @version 20140525
* license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
*
*/
public class StandaloneDecoder {

    /**
     * Opens the named file, transparently decompresses it if needed, reads every
     * top-level ASN.1 object from it and prints one summary line per record to
     * standard output.
     *
     * @param filename path of the (optionally compressed) ASN.1 file to decode
     * @throws IOException if the file cannot be opened or read, or if no readable
     *         stream could be obtained from {@link #decompressStream(InputStream)}
     */
    public StandaloneDecoder(String filename) throws IOException {
        File fileIn = new File(filename);
        FileInputStream fin = new FileInputStream(fileIn);
        ASN1InputStream asnin = null;
        try {
            InputStream is = decompressStream(fin);
            if (is == null) {
                // decompressStream reports unexpected failures by returning null;
                // fail here with a clear message instead of a later NullPointerException.
                throw new IOException("Unable to open a readable stream for " + filename);
            }
            asnin = new ASN1InputStream(is);
            ASN1Primitive obj = null;
            while ((obj = asnin.readObject()) != null) {
                CallDetailRecord thisCdr = new CallDetailRecord((ASN1Sequence) obj);
                System.out.println("CallDetailRecord "+thisCdr.getRecordNumber()+" Calling "+thisCdr.getCallingNumber()
                        +" Called "+thisCdr.getCalledNumber()+ " Start Date-Time "+thisCdr.getStartDate()+"-"
                        +thisCdr.getStartTime()+" duration "+thisCdr.getDuration()
                        );
            }
        } finally {
            // Release the streams even when decoding fails part-way through the file.
            try {
                if (asnin != null) {
                    asnin.close();
                }
            } finally {
                // Closed explicitly so the file handle is released even if the
                // wrapping streams were never created.
                fin.close();
            }
        }
    }

    /**
     * Wraps {@code input} so that compressed data is decompressed on the fly and
     * uncompressed data is passed through unchanged.
     *
     * <p>The input is buffered and marked, then handed to
     * {@code CompressorStreamFactory} for format detection. If the factory throws
     * a {@code CompressorException} (which it does for uncompressed files), the
     * buffered stream is reset to the mark and returned as-is.
     *
     * @param input the raw stream to inspect
     * @return a decompressing stream, the buffered original stream when no
     *         compression format was detected, or {@code null} if an unexpected
     *         exception occurred (the error is printed to standard output)
     */
    public static InputStream decompressStream(InputStream input) {
        InputStream returnStream=null;
        org.apache.commons.compress.compressors.CompressorInputStream cis = null;
        BufferedInputStream bis=null;
        try {
            bis = new BufferedInputStream(input);
            bis.mark(1024); //Mark stream to reset if uncompressed data
            cis = new org.apache.commons.compress.compressors.CompressorStreamFactory().createCompressorInputStream(bis);
            returnStream = cis;
        } catch (org.apache.commons.compress.compressors.CompressorException ce) { //CompressorStreamFactory throws CompressorException for uncompressed files
            try {
                bis.reset();
            } catch (IOException ioe) {
                // Best effort: the reset failure is reported and the buffered stream is still returned.
                String errmessageIOE="IO Exception ( "+ioe.getClass().getName()+" ) : "+ioe.getMessage();
                System.out.println(errmessageIOE);
            }
            returnStream = bis;
        } catch (Exception e) {
            // Any other failure leaves returnStream null; callers must check for that.
            String errmessage="Exception ( "+e.getClass().getName()+" ) : "+e.getMessage();
            System.out.println(errmessage);
        }
        return returnStream;
    }

    /**
     * Command-line entry point. Expects the file to decode as the first argument
     * and exits with status 1 when the argument is missing or decoding fails with
     * an {@link IOException}.
     *
     * @param args command-line arguments; {@code args[0]} is the input filename
     */
    public static void main(String[] args) {
        if (args.length < 1 ) {
            System.out.println("Missing a filename. Exiting.");
            System.exit(1);
        }
        String filename = args[0];
        try {
            // All the work happens in the constructor; the instance itself is not used.
            @SuppressWarnings("unused")
            StandaloneDecoder mainObj = new StandaloneDecoder(filename);
        } catch (IOException ioe) {
            String errmessage="ERROR. EXITING. Exception ( "+ioe.getClass().getName()+" ) : "+ioe.getMessage();
            System.out.println(errmessage);
            ioe.printStackTrace();
            System.exit(1);
        }
    }
}
The decompressStream method is a little overkill, but it lets the sample data be either compressed or uncompressed and handles both cases transparently. It adds a dependency on commons-compress, but it can easily be removed (just change the method to return input).

To iterate through the ASN.1 file, we keep grabbing objects from ASN1InputStream with readObject. Once we have an object, we use it to create a CallDetailRecord instance.


import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import org.bouncycastle.asn1.ASN1Encodable;
import org.bouncycastle.asn1.ASN1Object;
import org.bouncycastle.asn1.ASN1Primitive;
import org.bouncycastle.asn1.ASN1Sequence;
import org.bouncycastle.asn1.DERApplicationSpecific;
/**
*
* A container class for a "Simple Generic CDR".
*
* @author awcoleman
* @version 20140525
* license: Apache License 2.0; http://www.apache.org/licenses/LICENSE-2.0
*
*/
public class CallDetailRecord extends ASN1Object {
    /** The sequence this record was decoded from; returned by {@link #toASN1Primitive()}. */
    ASN1Sequence callDetailRecord;
    /** Same sequence as {@link #callDetailRecord}; kept as a separate field for existing users. */
    ASN1Sequence cdr;
    int recordNumber; //APPLICATION 2
    String callingNumber; //APPLICATION 8
    String calledNumber; //APPLICATION 9
    String startDate; //APPLICATION 16
    String startTime; //APPLICATION 18
    int duration; //APPLICATION 19

    /**
     * Decodes a single "Simple Generic CDR" from its ASN.1 sequence.
     *
     * <p>Each element of the sequence is cast to {@code DERApplicationSpecific}
     * and dispatched on its application tag. Elements with an unrecognised
     * application tag are ignored.
     *
     * @param inSeq the sequence holding the application-tagged CDR fields
     * @throws UnsupportedEncodingException if the UTF-8 charset is not available
     * @throws IllegalArgumentException if an INTEGER field has no content bytes
     */
    public CallDetailRecord(ASN1Sequence inSeq) throws UnsupportedEncodingException {
        cdr = inSeq;
        // Previously never assigned, which made toASN1Primitive() return null.
        callDetailRecord = inSeq;
        for (Enumeration<ASN1Encodable> en = cdr.getObjects(); en.hasMoreElements();) {
            ASN1Encodable em = en.nextElement();
            ASN1Primitive emp = em.toASN1Primitive();
            DERApplicationSpecific emt = (DERApplicationSpecific)emp;
            switch (emt.getApplicationTag()) {
                case 2: recordNumber = decodeInteger(emt.getContents());
                    break;
                case 8: callingNumber = new String(emt.getContents(), "UTF-8");
                    break;
                case 9: calledNumber = new String(emt.getContents(), "UTF-8");
                    break;
                case 16: startDate = new String(emt.getContents(), "UTF-8");
                    break;
                case 18: startTime = new String(emt.getContents(), "UTF-8");
                    break;
                case 19: duration = decodeInteger(emt.getContents());
                    break;
                default:
                    //Unknown application number. In production would either log or error.
                    break;
            }
        }
    }

    /**
     * Decodes the content bytes of an IMPLICIT INTEGER as a big-endian
     * two's-complement value. Reading only the first byte would corrupt every
     * value that needs more than one byte (anything above 127).
     *
     * @param contents the raw content bytes of the tagged element
     * @return the decoded value; values wider than 32 bits keep only their low 32 bits
     * @throws IllegalArgumentException if {@code contents} is null or empty
     */
    private static int decodeInteger(byte[] contents) {
        if (contents == null || contents.length == 0) {
            throw new IllegalArgumentException("INTEGER field has no content bytes");
        }
        return new java.math.BigInteger(contents).intValue();
    }

    /**
     * Returns the ASN.1 sequence this record was built from.
     */
    @Override
    public ASN1Primitive toASN1Primitive() {
        return callDetailRecord;
    }

    /** @return the record number (APPLICATION 2) */
    public int getRecordNumber() {
        return recordNumber;
    }

    /** @return the calling number (APPLICATION 8), or null if the field was absent */
    public String getCallingNumber() {
        return callingNumber;
    }

    /** @return the called number (APPLICATION 9), or null if the field was absent */
    public String getCalledNumber() {
        return calledNumber;
    }

    /** @return the start date string (APPLICATION 16), or null if the field was absent */
    public String getStartDate() {
        return startDate;
    }

    /** @return the start time string (APPLICATION 18), or null if the field was absent */
    public String getStartTime() {
        return startTime;
    }

    /** @return the call duration (APPLICATION 19) */
    public int getDuration() {
        return duration;
    }
}
Using Bouncy Castle requires some digging into the data format to find the right set of classes. Now that the decoder is complete, we can move on to the Map/Reduce job. We didn't have to create a stand-alone decoder and could have jumped straight into the Map/Reduce job, but writing a simple decoder first whenever I tackle a new binary format has always saved me time.

Update: Links to Part 1, Part 2, Part 3.