feat: add HBase SDK for serving
bayu-aditya committed Sep 4, 2024
1 parent 9178b66 commit fd4470c
Showing 5 changed files with 284 additions and 1 deletion.
3 changes: 2 additions & 1 deletion caraml-store-serving/build.gradle
@@ -8,7 +8,8 @@ dependencies {
     implementation 'org.apache.commons:commons-lang3:3.10'
     implementation 'org.apache.avro:avro:1.10.2'
     implementation platform('com.google.cloud:libraries-bom:26.43.0')
-    implementation 'com.google.cloud:google-cloud-bigtable:2.40.0'
+    implementation 'com.google.cloud:google-cloud-bigtable:2.39.2'
+    implementation 'com.google.cloud.bigtable:bigtable-hbase-2.x:2.14.3'
     implementation 'commons-codec:commons-codec:1.17.1'
     implementation 'io.lettuce:lettuce-core:6.2.0.RELEASE'
     implementation 'io.netty:netty-transport-native-epoll:4.1.52.Final:linux-x86_64'
15 changes: 15 additions & 0 deletions caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java
@@ -2,10 +2,13 @@

import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import dev.caraml.serving.store.OnlineRetriever;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -23,9 +26,21 @@ public class BigTableStoreConfig {
private String appProfileId;
private Boolean enableClientSideMetrics;
private Long timeoutMs;
private Boolean isUsingHBaseSDK;

@Bean
public OnlineRetriever getRetriever() {
// Using HBase SDK
if (Boolean.TRUE.equals(isUsingHBaseSDK)) { // null-safe unboxing if the property is unset
org.apache.hadoop.conf.Configuration config =
BigtableConfiguration.configure(projectId, instanceId);
config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId);

Connection connection = BigtableConfiguration.connect(config);
return new HBaseOnlineRetriever(connection);
}

// Using BigTable SDK
try {
BigtableDataSettings.Builder builder =
BigtableDataSettings.newBuilder()
163 changes: 163 additions & 0 deletions caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java
@@ -0,0 +1,163 @@
package dev.caraml.serving.store.bigtable;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import dev.caraml.serving.store.AvroFeature;
import dev.caraml.serving.store.Feature;
import dev.caraml.store.protobuf.serving.ServingServiceProto;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseOnlineRetriever implements SSTableOnlineRetriever<ByteString, Result> {
private final Connection client;
private final HBaseSchemaRegistry schemaRegistry;

public HBaseOnlineRetriever(Connection client) {
this.client = client;
this.schemaRegistry = new HBaseSchemaRegistry(client);
}

@Override
public ByteString convertEntityValueToKey(
ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List<String> entityNames) {
return ByteString.copyFrom(
entityNames.stream()
.sorted()
.map(entity -> entityRow.getFieldsMap().get(entity))
.map(this::valueToString)
.collect(Collectors.joining("#"))
.getBytes());
}

@Override
public List<List<Feature>> convertRowToFeature(
String tableName,
List<ByteString> rowKeys,
Map<ByteString, Result> rows,
List<ServingServiceProto.FeatureReference> featureReferences) {
BinaryDecoder reusedDecoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
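// A single BinaryDecoder is reused across all rows to avoid re-allocating
// decoder state for every row key.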

return rowKeys.stream()
.map(
rowKey -> {
if (!rows.containsKey(rowKey)) {
return Collections.<Feature>emptyList();
} else {
Result row = rows.get(rowKey);
return featureReferences.stream()
.map(ServingServiceProto.FeatureReference::getFeatureTable)
.distinct()
.map(cf -> row.getColumnCells(cf.getBytes(), null))
.filter(ls -> !ls.isEmpty())
.flatMap(
rowCells -> {
Cell rowCell = rowCells.get(0); // Latest cell
// Copy only this cell's family/value bytes; the raw backing arrays
// (getFamilyArray/getValueArray) can span the entire row buffer.
String family = Bytes.toString(CellUtil.cloneFamily(rowCell));
ByteString value = ByteString.copyFrom(CellUtil.cloneValue(rowCell));

List<Feature> features;
List<ServingServiceProto.FeatureReference> localFeatureReferences =
featureReferences.stream()
.filter(
featureReference ->
featureReference.getFeatureTable().equals(family))
.collect(Collectors.toList());

try {
features =
decodeFeatures(
tableName,
value,
localFeatureReferences,
reusedDecoder,
rowCell.getTimestamp());
} catch (IOException e) {
throw new RuntimeException("Failed to decode features from BigTable");
}

return features.stream();
})
.collect(Collectors.toList());
}
})
.collect(Collectors.toList());
}

@Override
public Map<ByteString, Result> getFeaturesFromSSTable(
String tableName, List<ByteString> rowKeys, List<String> columnFamilies) {
try (Table table = this.client.getTable(TableName.valueOf(tableName))) {

// construct query get list
List<Get> queryGetList = new ArrayList<>();
rowKeys.forEach(
rowKey -> {
Get get = new Get(rowKey.toByteArray());
columnFamilies.forEach(cf -> get.addFamily(cf.getBytes()));

queryGetList.add(get);
});

// fetch data from table
Result[] rows = table.get(queryGetList);

// construct result
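// rows missing from HBase are simply omitted here, so convertRowToFeature
// emits an empty feature list for those keys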
Map<ByteString, Result> result = new HashMap<>();
Arrays.stream(rows)
.filter(row -> !row.isEmpty())
.forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row));

return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private List<Feature> decodeFeatures(
String tableName,
ByteString value,
List<ServingServiceProto.FeatureReference> featureReferences,
BinaryDecoder reusedDecoder,
long timestamp)
throws IOException {
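// Stored value layout: the first 4 bytes reference the Avro schema in the
// schema registry; the remaining bytes are the Avro-encoded feature record.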
ByteString schemaReferenceBytes = value.substring(0, 4);
byte[] featureValueBytes = value.substring(4).toByteArray();

HBaseSchemaRegistry.SchemaReference schemaReference =
new HBaseSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes);

GenericDatumReader<GenericRecord> reader = this.schemaRegistry.getReader(schemaReference);

reusedDecoder = DecoderFactory.get().binaryDecoder(featureValueBytes, reusedDecoder);
GenericRecord record = reader.read(null, reusedDecoder);

return featureReferences.stream()
.map(
featureReference -> {
Object featureValue;
try {
featureValue = record.get(featureReference.getName());
} catch (AvroRuntimeException e) {
// Feature is not found in schema
return null;
}
return new AvroFeature(
featureReference,
Timestamp.newBuilder().setSeconds(timestamp / 1000).build(),
Objects.requireNonNullElseGet(featureValue, Object::new));
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
103 changes: 103 additions & 0 deletions caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java
@@ -0,0 +1,103 @@
package dev.caraml.serving.store.bigtable;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSchemaRegistry {
private final Connection hbaseClient;
private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

private static final String COLUMN_FAMILY = "metadata";
private static final String QUALIFIER = "avro";
private static final String KEY_PREFIX = "schema#";
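// Schema rows are stored alongside the data: row key "schema#<schemaHash>",
// with the Avro schema JSON under the metadata:avro column.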

public static class SchemaReference {
private final String tableName;
private final ByteString schemaHash;

public SchemaReference(String tableName, ByteString schemaHash) {
this.tableName = tableName;
this.schemaHash = schemaHash;
}

public String getTableName() {
return tableName;
}

public ByteString getSchemaHash() {
return schemaHash;
}

@Override
public int hashCode() {
int result = tableName.hashCode();
result = 31 * result + schemaHash.hashCode();
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SchemaReference that = (SchemaReference) o;

if (!tableName.equals(that.tableName)) return false;
return schemaHash.equals(that.schemaHash);
}
}

public HBaseSchemaRegistry(Connection hbaseClient) {
this.hbaseClient = hbaseClient;

CacheLoader<SchemaReference, GenericDatumReader<GenericRecord>> schemaCacheLoader =
CacheLoader.from(this::loadReader);

cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
}

public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
GenericDatumReader<GenericRecord> reader;
try {
reader = this.cache.get(reference);
} catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
throw new RuntimeException(
    String.format("Unable to find schema for table %s", reference.getTableName()), e);
}
return reader;
}

private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
try (Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName()))) {

byte[] rowKey =
ByteString.copyFrom(KEY_PREFIX.getBytes())
.concat(reference.getSchemaHash())
.toByteArray();
Get query = new Get(rowKey);
query.addColumn(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());

Result result = table.get(query);

Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());
if (last == null) {
  throw new RuntimeException(
      String.format("No schema row found in table %s", reference.getTableName()));
}
Schema schema = new Schema.Parser().parse(Bytes.toString(CellUtil.cloneValue(last)));
return new GenericDatumReader<>(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
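For context, a minimal writer-side sketch of the schema row this registry expects: the Avro schema JSON stored under metadata:avro at row key "schema#" plus the schema hash. Nothing below is part of this commit; the registerSchema helper and the 4-byte MD5 prefix are illustrative assumptions (the commit only reads schema rows and does not fix how the hash is produced).

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class SchemaWriterSketch {
  // Hypothetical helper: publishes avroSchemaJson so HBaseSchemaRegistry.loadReader
  // can resolve it. The 4-byte MD5 prefix is an assumed hashing scheme.
  public static void registerSchema(Connection conn, String tableName, String avroSchemaJson)
      throws IOException, NoSuchAlgorithmException {
    byte[] schemaBytes = avroSchemaJson.getBytes(StandardCharsets.UTF_8);
    // Truncate the digest to the 4 bytes the retriever prepends to each value.
    byte[] hash = Arrays.copyOf(MessageDigest.getInstance("MD5").digest(schemaBytes), 4);
    byte[] rowKey =
        ByteString.copyFromUtf8("schema#").concat(ByteString.copyFrom(hash)).toByteArray();
    Put put = new Put(rowKey);
    put.addColumn("metadata".getBytes(), "avro".getBytes(), schemaBytes);
    try (Table table = conn.getTable(TableName.valueOf(tableName))) {
      table.put(put);
    }
  }
}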
1 change: 1 addition & 0 deletions caraml-store-serving/src/main/resources/application.yaml
@@ -76,6 +76,7 @@ caraml:
enableClientSideMetrics: false
# Timeout configuration for BigTable client. Set 0 to use the default client configuration.
timeoutMs: 0
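# Toggle the retriever implementation: true serves reads through the HBase-compatible
# client (bigtable-hbase-2.x); false uses the native BigTable data client.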
isUsingHBaseSDK: true

grpc:
server: