diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a52db32..a9ff0a8 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -6,6 +6,7 @@ import dev.caraml.serving.store.Feature; import dev.caraml.store.protobuf.serving.ServingServiceProto; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; import org.apache.avro.AvroRuntimeException; @@ -57,13 +58,31 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; + }) +// .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - String family = Bytes.toString(rowCell.getFamilyArray()); - ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// String family = Bytes.toString(rowCell.getFamilyArray()); +// System.out.println("rowCell: " + rowCell.toString()); +// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// System.out.println("value: " + value); + ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -118,6 +137,7 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); + return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 7802f19..f9ed029 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -5,6 +5,7 @@ import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -94,7 +95,14 @@ private GenericDatumReader loadReader(SchemaReference reference) Result result = table.get(query); Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); - Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray())); + if (last == null) { + throw new RuntimeException("Schema not found"); + } + ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); + Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { throw new RuntimeException(e);