From e1840ef9f07f1e967199a7bb69acf60c0b376ea3 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:43:19 +0800 Subject: [PATCH 1/3] Fix issue due to difference in bigtable and hbase response * Use offset and length to get rowCell values because hbase server returns slightly different response structure than bigtable * This is also applied when looking up the avro schema --- .../store/bigtable/HBaseOnlineRetriever.java | 26 ++++++++++++++++--- .../store/bigtable/HBaseSchemaRegistry.java | 10 ++++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a52db32..a9ff0a8 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -6,6 +6,7 @@ import dev.caraml.serving.store.Feature; import dev.caraml.store.protobuf.serving.ServingServiceProto; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; import org.apache.avro.AvroRuntimeException; @@ -57,13 +58,31 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; + }) +// .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - String family = Bytes.toString(rowCell.getFamilyArray()); - ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// String family = Bytes.toString(rowCell.getFamilyArray()); +// System.out.println("rowCell: " + rowCell.toString()); +// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// System.out.println("value: " + value); + ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -118,6 +137,7 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); + return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 7802f19..f9ed029 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -5,6 +5,7 @@ import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -94,7 +95,14 @@ private GenericDatumReader loadReader(SchemaReference reference) Result result = table.get(query); Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); - Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray())); + if (last == null) { + throw new RuntimeException("Schema not found"); + } + ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); + Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { throw new RuntimeException(e); From 0eb1b135f594ab9258b8cde6cc0c5c0a28f7f2f4 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:49:23 +0800 Subject: [PATCH 2/3] Fix linting --- .../store/bigtable/HBaseOnlineRetriever.java | 48 ++++++++++--------- .../store/bigtable/HBaseSchemaRegistry.java | 12 ++--- .../store/bigtable/HBaseStoreConfig.java | 39 +++++++++++++++ 3 files changed, 71 insertions(+), 28 deletions(-) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a9ff0a8..847c42d 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -17,7 +17,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseOnlineRetriever implements SSTableOnlineRetriever { private final Connection client; @@ -58,31 +57,37 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; + .map( + cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; }) -// .map(cf -> row.getColumnCells(cf.getBytes(), null)) + // .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell -// String family = Bytes.toString(rowCell.getFamilyArray()); -// System.out.println("rowCell: " + rowCell.toString()); -// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); -// System.out.println("value: " + value); - ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) - .position(rowCell.getValueOffset()) - .limit(rowCell.getValueOffset() + rowCell.getValueLength()) - .slice(); - ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) - .position(rowCell.getFamilyOffset()) - .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) - .slice(); - String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); - ByteString value = ByteString.copyFrom(valueBuffer); + // String family = + // Bytes.toString(rowCell.getFamilyArray()); + // System.out.println("rowCell: " + + // rowCell.toString()); + // ByteString value = + // ByteString.copyFrom(rowCell.getValueArray()); + // System.out.println("value: " + value); + ByteBuffer valueBuffer = + ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = + ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -137,7 +142,6 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); - return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index f9ed029..3af2511 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -16,7 +16,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseSchemaRegistry { private final Connection hbaseClient; @@ -96,12 +95,13 @@ private GenericDatumReader loadReader(SchemaReference reference) Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); if (last == null) { - throw new RuntimeException("Schema not found"); + throw new RuntimeException("Schema not found"); } - ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) - .position(last.getValueOffset()) - .limit(last.getValueOffset() + last.getValueLength()) - .slice(); + ByteBuffer schemaBuffer = + ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java new file mode 100644 index 0000000..d36203c --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java @@ -0,0 +1,39 @@ +package dev.caraml.serving.store.bigtable; + +import dev.caraml.serving.store.OnlineRetriever; +import java.io.IOException; +import lombok.Getter; +import lombok.Setter; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "caraml.store.hbase") +@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase") +@Getter +@Setter +public class HBaseStoreConfig { + private String zookeeperQuorum; + private String zookeeperClientPort; + + @Bean + public OnlineRetriever getRetriever() { + org.apache.hadoop.conf.Configuration conf; + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", zookeeperQuorum); + conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); + Connection connection; + try { + connection = ConnectionFactory.createConnection(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HBaseOnlineRetriever(connection); + } +} From 350783e2540b69e47a2d7f593a1b89217a628188 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 9 Sep 2024 12:28:52 +0800 Subject: [PATCH 3/3] Remove commented code --- .../store/bigtable/HBaseOnlineRetriever.java | 16 +--------------- .../store/bigtable/HBaseSchemaRegistry.java | 1 + 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 847c42d..145a901 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -57,25 +57,11 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map( - cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; - }) - // .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - // String family = - // Bytes.toString(rowCell.getFamilyArray()); - // System.out.println("rowCell: " + - // rowCell.toString()); - // ByteString value = - // ByteString.copyFrom(rowCell.getValueArray()); - // System.out.println("value: " + value); ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) .position(rowCell.getValueOffset()) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 3af2511..6062fdf 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -95,6 +95,7 @@ private GenericDatumReader loadReader(SchemaReference reference) Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); if (last == null) { + // NOTE: this should never happen throw new RuntimeException("Schema not found"); } ByteBuffer schemaBuffer =