Skip to content

Commit

Permalink
SAMOA-47: Adding Code Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jayadeepj authored and gdfm committed Nov 30, 2015
1 parent 533f12a commit 76a3736
Show file tree
Hide file tree
Showing 7 changed files with 608 additions and 602 deletions.
251 changes: 124 additions & 127 deletions samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,136 +35,133 @@
import com.github.javacliparser.IntOption;
import com.github.javacliparser.StringOption;


/**
* InstanceStream implementation to handle Apache Avro Files.
* Handles both JSON & Binary encoded streams
*
* InstanceStream implementation to handle Apache Avro Files. Handles both JSON & Binary encoded streams
*
*
*/
public class AvroFileStream extends FileStream {

private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class);

public FileOption avroFileOption = new FileOption("avroFile", 'f',"Avro File(s) to load.", null, null, false);
public IntOption classIndexOption = new IntOption("classIndex", 'c',"Class index of data. 0 for none or -1 for last attribute in file.",-1, -1, Integer.MAX_VALUE);
public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY");

/** Represents the last read Instance **/
protected InstanceExample lastInstanceRead;

/** Represents the binary input stream of avro data **/
protected transient InputStream inputStream = null;

/** The extension to be considered for the files **/
private static final String AVRO_FILE_EXTENSION = "avro";

/* (non-Javadoc)
* @see org.apache.samoa.streams.FileStream#reset()
* Reset the BINARY encoded Avro Stream & Close the file source
*/
@Override
protected void reset() {

try {
if (this.inputStream != null)
this.inputStream.close();

fileSource.reset();
} catch (IOException ioException) {
logger.error(AVRO_STREAM_FAILED_RESTART_ERROR+" : {}",ioException);
throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
}

if (!getNextFileStream()) {
hitEndOfStream = true;
throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
}
}


/**
* Get next File Stream & set the class index read from the command line option
* @return
*/
protected boolean getNextFileStream() {
if (this.inputStream != null)
try {
this.inputStream.close();
} catch (IOException ioException) {
logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException);
throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
}

this.inputStream = this.fileSource.getNextInputStream();

if (this.inputStream == null)
return false;

this.instances = new Instances(this.inputStream, classIndexOption.getValue(),encodingFormatOption.getValue());

if (this.classIndexOption.getValue() < 0) {
this.instances.setClassIndex(this.instances.numAttributes() - 1);
} else if (this.classIndexOption.getValue() > 0) {
this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
}
return true;
}


/* (non-Javadoc)
* @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
* Read next Instance from File. Return false if unable to read next Instance
*/
@Override
protected boolean readNextInstanceFromFile() {
try {
if (this.instances.readInstance()) {
this.lastInstanceRead = new InstanceExample(this.instances.instance(0));
this.instances.delete();
return true;
}
if (this.inputStream != null) {
this.inputStream.close();
this.inputStream = null;
}
return false;
} catch (IOException ioException) {
logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException);
throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
}

}

@Override
public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
super.prepareForUseImpl(monitor, repository);
String filePath = this.avroFileOption.getFile().getAbsolutePath();
this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION);
this.lastInstanceRead = null;
}


/* (non-Javadoc)
* @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
* Return the last read Instance
*/
@Override
protected InstanceExample getLastInstanceRead() {
return this.lastInstanceRead;
}


@Override
public void getDescription(StringBuilder sb, int indent) {
throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
}

/** Error Messages to for all types of Avro File Streams */
protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed.";
protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty.";
protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream.";
protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet.";

private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class);

public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro File(s) to load.", null, null, false);
public IntOption classIndexOption = new IntOption("classIndex", 'c',
"Class index of data. 0 for none or -1 for last attribute in file.", -1, -1, Integer.MAX_VALUE);
public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e',
"Encoding format for Avro Files. Can be JSON/AVRO", "BINARY");

/** Represents the last read Instance **/
protected InstanceExample lastInstanceRead;

/** Represents the binary input stream of avro data **/
protected transient InputStream inputStream = null;

/** The extension to be considered for the files **/
private static final String AVRO_FILE_EXTENSION = "avro";

/* (non-Javadoc)
* @see org.apache.samoa.streams.FileStream#reset()
* Reset the BINARY encoded Avro Stream & Close the file source
*/
@Override
protected void reset() {

try {
if (this.inputStream != null)
this.inputStream.close();

fileSource.reset();
} catch (IOException ioException) {
logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException);
throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
}

if (!getNextFileStream()) {
hitEndOfStream = true;
throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
}
}

/**
* Get next File Stream & set the class index read from the command line option
*
* @return
*/
protected boolean getNextFileStream() {
if (this.inputStream != null)
try {
this.inputStream.close();
} catch (IOException ioException) {
logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException);
throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
}

this.inputStream = this.fileSource.getNextInputStream();

if (this.inputStream == null)
return false;

this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue());

if (this.classIndexOption.getValue() < 0) {
this.instances.setClassIndex(this.instances.numAttributes() - 1);
} else if (this.classIndexOption.getValue() > 0) {
this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
}
return true;
}

/* (non-Javadoc)
* @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
* Read next Instance from File. Return false if unable to read next Instance
*/
@Override
protected boolean readNextInstanceFromFile() {
try {
if (this.instances.readInstance()) {
this.lastInstanceRead = new InstanceExample(this.instances.instance(0));
this.instances.delete();
return true;
}
if (this.inputStream != null) {
this.inputStream.close();
this.inputStream = null;
}
return false;
} catch (IOException ioException) {
logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException);
throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
}

}

@Override
public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
super.prepareForUseImpl(monitor, repository);
String filePath = this.avroFileOption.getFile().getAbsolutePath();
this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION);
this.lastInstanceRead = null;
}

/* (non-Javadoc)
* @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
* Return the last read Instance
*/
@Override
protected InstanceExample getLastInstanceRead() {
return this.lastInstanceRead;
}

@Override
public void getDescription(StringBuilder sb, int indent) {
throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
}

/** Error Messages to for all types of Avro File Streams */
protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed.";
protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty.";
protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream.";
protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public Instance readInstanceDense() {
while (numAttribute == 0 && streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
// For each line
while (streamTokenizer.ttype != StreamTokenizer.TT_EOL
&& streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
&& streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
// For each item
if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) {
// System.out.println(streamTokenizer.nval + "Num ");
Expand All @@ -95,7 +95,7 @@ public Instance readInstanceDense() {

} else if (streamTokenizer.sval != null && (
streamTokenizer.ttype == StreamTokenizer.TT_WORD
|| streamTokenizer.ttype == 34 || streamTokenizer.ttype == 39)) {
|| streamTokenizer.ttype == 34 || streamTokenizer.ttype == 39)) {
// System.out.println(streamTokenizer.sval + "Str");
boolean isNumeric = attributes.get(numAttribute).isNumeric();
double value;
Expand Down Expand Up @@ -158,7 +158,7 @@ private Instance readInstanceSparse() {
streamTokenizer.nextToken(); // Remove the '{' char
// For each line
while (streamTokenizer.ttype != StreamTokenizer.TT_EOL
&& streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
&& streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
while (streamTokenizer.ttype != '}') {
// For each item
// streamTokenizer.nextToken();
Expand All @@ -176,7 +176,7 @@ private Instance readInstanceSparse() {
if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) {
// System.out.print(streamTokenizer.nval + " ");
this.setSparseValue(instance, indexValues, attributeValues, numAttribute,
streamTokenizer.nval, true);
streamTokenizer.nval, true);
// numAttribute++;

} else if (streamTokenizer.sval != null && (
Expand All @@ -185,12 +185,12 @@ private Instance readInstanceSparse() {
// System.out.print(streamTokenizer.sval + "-");
if (attributes.get(numAttribute).isNumeric()) {
this.setSparseValue(instance, indexValues, attributeValues, numAttribute,
Double.valueOf(streamTokenizer.sval).doubleValue(), true);
Double.valueOf(streamTokenizer.sval).doubleValue(), true);
} else {
this.setSparseValue(instance, indexValues, attributeValues, numAttribute,
this.instanceInformation
.attribute(numAttribute).indexOfValue(streamTokenizer.sval),
false);
this.instanceInformation
.attribute(numAttribute).indexOfValue(streamTokenizer.sval),
false);
}
}
streamTokenizer.nextToken();
Expand All @@ -211,14 +211,14 @@ private Instance readInstanceSparse() {
arrayAttributeValues[i] = attributeValues.get(i).doubleValue();
}
instance.addSparseValues(arrayIndexValues, arrayAttributeValues,
this.instanceInformation.numAttributes());
this.instanceInformation.numAttributes());
return instance;

}

private void setSparseValue(Instance instance, List<Integer> indexValues,
List<Double> attributeValues,
int numAttribute, double value, boolean isNumber) {
List<Double> attributeValues,
int numAttribute, double value, boolean isNumber) {
double valueAttribute;
if (isNumber && this.instanceInformation.attribute(numAttribute).isNominal) {
valueAttribute =
Expand Down Expand Up @@ -246,7 +246,7 @@ private Instance readDenseInstanceSparse() {
streamTokenizer.nextToken(); // Remove the '{' char
// For each line
while (streamTokenizer.ttype != StreamTokenizer.TT_EOL
&& streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
&& streamTokenizer.ttype != StreamTokenizer.TT_EOF) {
while (streamTokenizer.ttype != '}') {
// For each item
// streamTokenizer.nextToken();
Expand All @@ -267,11 +267,11 @@ private Instance readDenseInstanceSparse() {
// "/"+this.instanceInformation.attribute(numAttribute).indexOfValue(streamTokenizer.sval)+" ");
if (attributes.get(numAttribute).isNumeric()) {
this.setValue(instance, numAttribute,
Double.valueOf(streamTokenizer.sval).doubleValue(), true);
Double.valueOf(streamTokenizer.sval).doubleValue(), true);
} else {
this.setValue(instance, numAttribute,
this.instanceInformation.attribute(numAttribute)
.indexOfValue(streamTokenizer.sval), false);
this.instanceInformation.attribute(numAttribute)
.indexOfValue(streamTokenizer.sval), false);
// numAttribute++;
}
}
Expand Down Expand Up @@ -395,6 +395,6 @@ private void initStreamTokenizer(Reader reader) {

@Override
public Instance readInstance() {
return readInstance(this.reader);
return readInstance(this.reader);
}
}
Loading

0 comments on commit 76a3736

Please sign in to comment.