Skip to content

Commit

Permalink
Fix issue due to difference in bigtable and hbase response
Browse files Browse the repository at this point in the history
* Use offset and length to get rowCell values because hbase server
  returns slightly different response structure than bigtable
* This is also applied when looking up the avro schema
  • Loading branch information
shydefoo committed Sep 9, 2024
1 parent fd4470c commit e1840ef
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dev.caraml.serving.store.Feature;
import dev.caraml.store.protobuf.serving.ServingServiceProto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
Expand Down Expand Up @@ -57,13 +58,31 @@ public List<List<Feature>> convertRowToFeature(
return featureReferences.stream()
.map(ServingServiceProto.FeatureReference::getFeatureTable)
.distinct()
.map(cf -> row.getColumnCells(cf.getBytes(), null))
.map(cf -> {
List<Cell> rowCells = row.getColumnCells(cf.getBytes(), null);
System.out.println("Column Family: " + cf);
System.out.println("Row Cells: " + rowCells);
return rowCells;
})
// .map(cf -> row.getColumnCells(cf.getBytes(), null))
.filter(ls -> !ls.isEmpty())
.flatMap(
rowCells -> {
Cell rowCell = rowCells.get(0); // Latest cell
String family = Bytes.toString(rowCell.getFamilyArray());
ByteString value = ByteString.copyFrom(rowCell.getValueArray());
// String family = Bytes.toString(rowCell.getFamilyArray());
// System.out.println("rowCell: " + rowCell.toString());
// ByteString value = ByteString.copyFrom(rowCell.getValueArray());
// System.out.println("value: " + value);
ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray())
.position(rowCell.getValueOffset())
.limit(rowCell.getValueOffset() + rowCell.getValueLength())
.slice();
ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray())
.position(rowCell.getFamilyOffset())
.limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
.slice();
String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
ByteString value = ByteString.copyFrom(valueBuffer);

List<Feature> features;
List<ServingServiceProto.FeatureReference> localFeatureReferences =
Expand Down Expand Up @@ -118,6 +137,7 @@ public Map<ByteString, Result> getFeaturesFromSSTable(
.filter(row -> !row.isEmpty())
.forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row));


return result;
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
Expand Down Expand Up @@ -94,7 +95,14 @@ private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference)
Result result = table.get(query);

Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());
Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray()));
if (last == null) {
throw new RuntimeException("Schema not found");
}
ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray())
.position(last.getValueOffset())
.limit(last.getValueOffset() + last.getValueLength())
.slice();
Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8());
return new GenericDatumReader<>(schema);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit e1840ef

Please sign in to comment.