[feat] Add support to use hbase online store #126

Merged · 14 commits · Sep 20, 2024
74 changes: 39 additions & 35 deletions caraml-store-serving/src/main/resources/application.yaml
@@ -33,41 +33,41 @@ caraml:
maxExpectedCount: 150

store:
-    # Active store. Possible values: [redisCluster, redis, bigtable]
+    # Active store. Possible values: [redisCluster, redis, bigtable, hbase]
active: redis

-    redis:
-      host: localhost
-      port: 6379
-      password: ""
-      ssl: false
-
-    redisCluster:
-      # Connection string specifies the host:port of Redis instances in the redis cluster.
-      connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
-      # Password authentication. Empty string if password is not set.
-      password: ""
-      readFrom: MASTER
-      # Redis operation timeout in ISO-8601 format
-      timeout: PT0.5S
-      # # Uncomment to customize netty behaviour
-      # tcp:
-      # # Epoll Channel Option: TCP_KEEPIDLE
-      # keepIdle: 15
-      # # Epoll Channel Option: TCP_KEEPINTVL
-      # keepInterval: 5
-      # # Epoll Channel Option: TCP_KEEPCNT
-      # keepConnection: 3
-      # # Epoll Channel Option: TCP_USER_TIMEOUT
-      # userConnection: 60000
-      # # Uncomment to customize redis cluster topology refresh config
-      # topologyRefresh:
-      # # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger)
-      # enableAllAdaptiveTriggerRefresh: true
-      # # enable periodic refresh
-      # enablePeriodicRefresh: false
-      # # topology refresh period in seconds
-      # refreshPeriodSecond: 30
+    #
+    # redis:
> Collaborator: Is it expected that redis and redisCluster are commented out?

+    # host: localhost
+    # port: 6379
+    # password: ""
+    # ssl: false
+    #
+    # redisCluster:
+    # # Connection string specifies the host:port of Redis instances in the redis cluster.
+    # connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
+    # # Password authentication. Empty string if password is not set.
+    # password: ""
+    # readFrom: MASTER
+    # # Redis operation timeout in ISO-8601 format
+    # timeout: PT0.5S
+    # # Uncomment to customize netty behaviour
+    # tcp:
+    # # Epoll Channel Option: TCP_KEEPIDLE
+    # keepIdle: 15
+    # # Epoll Channel Option: TCP_KEEPINTVL
+    # keepInterval: 5
+    # # Epoll Channel Option: TCP_KEEPCNT
+    # keepConnection: 3
+    # # Epoll Channel Option: TCP_USER_TIMEOUT
+    # userConnection: 60000
+    # # Uncomment to customize redis cluster topology refresh config
+    # topologyRefresh:
+    # # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger)
+    # enableAllAdaptiveTriggerRefresh: true
+    # # enable periodic refresh
+    # enablePeriodicRefresh: false
+    # # topology refresh period in seconds
+    # refreshPeriodSecond: 30

bigtable:
projectId: gcp-project-name
@@ -78,6 +78,10 @@ caraml:
timeoutMs: 0
isUsingHBaseSDK: true

+    hbase:
+      zookeeperQuorum: 127.0.0.1
+      zookeeperClientPort: 2181

grpc:
server:
port: 6566
@@ -96,4 +100,4 @@ spring:

logging:
level:
root: "info"
root: "info"
2 changes: 1 addition & 1 deletion caraml-store-spark/docker/Dockerfile
@@ -1,4 +1,4 @@
-FROM apache/spark-py:v3.1.3
+FROM --platform=linux/amd64 apache/spark-py:v3.1.3

ARG GCS_CONNECTOR_VERSION=2.2.5
ARG BQ_CONNECTOR_VERSION=0.27.1
@@ -33,6 +33,10 @@ object BasePipeline {
conf
.set("spark.bigtable.projectId", projectId)
.set("spark.bigtable.instanceId", instanceId)
+      case HBaseConfig(zookeeperQuorum, zookeeperPort) =>
+        conf
+          .set("spark.hbase.zookeeper.quorum", zookeeperQuorum)
+          .set("spark.hbase.zookeeper.port", zookeeperPort.toString)
}
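For context, a brief sketch (not part of the diff) of what the new HBaseConfig branch produces. The case class mirrors the one added later in this PR; the quorum and port values are hypothetical:

```scala
import org.apache.spark.SparkConf

object HBaseConfSketch {
  // Mirrors the HBaseConfig case class added in this PR; values below are hypothetical.
  final case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int)

  def main(args: Array[String]): Unit = {
    val store = HBaseConfig("zk-1.internal,zk-2.internal", 2181)
    val conf = new SparkConf()
      .set("spark.hbase.zookeeper.quorum", store.zookeeperQuorum)
      .set("spark.hbase.zookeeper.port", store.zookeeperPort.toString)
    // DefaultSource later reads these keys back through sqlContext.getConf.
    println(conf.get("spark.hbase.zookeeper.quorum"))
  }
}
```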

jobConfig.metrics match {
@@ -66,11 +66,19 @@ object BatchPipeline extends BasePipeline {
.map(metrics.incrementRead)
.filter(rowValidator.allChecks)

+    val onlineStore = config.store match {
+      case _: RedisConfig    => "redis"
+      case _: BigTableConfig => "bigtable"
+      case _: HBaseConfig    => "hbase"
+    }
+
     validRows.write
       .format(config.store match {
         case _: RedisConfig    => "dev.caraml.spark.stores.redis"
         case _: BigTableConfig => "dev.caraml.spark.stores.bigtable"
+        case _: HBaseConfig    => "dev.caraml.spark.stores.bigtable"
       })
+      .option("online_store", onlineStore)
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
@@ -87,6 +87,9 @@ object IngestionJob {
opt[String](name = "bigtable")
.action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig]))

opt[String](name = "hbase")
.action((x, c) => c.copy(store = parseJSON(x).extract[HBaseConfig]))

opt[String](name = "statsd")
.action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig])))
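One detail worth noting: the bigtable flag camelizes JSON keys before extraction, while the new hbase flag extracts directly, so its JSON must already use camelCase keys. A minimal sketch of the expected shape (quorum value hypothetical):

```scala
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods.parse

// Hedged sketch: the JSON the new --hbase flag is expected to accept, e.g.
//   --hbase '{"zookeeperQuorum":"zk-1.internal","zookeeperPort":2181}'
object HBaseFlagSketch extends App {
  implicit val formats: Formats = DefaultFormats
  case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int)

  val cfg = parse("""{"zookeeperQuorum":"zk-1.internal","zookeeperPort":2181}""")
    .extract[HBaseConfig]
  println(cfg) // HBaseConfig(zk-1.internal,2181)
}
```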

@@ -26,7 +26,8 @@ case class RedisWriteProperties(
enableRateLimit: Boolean = false,
ratePerSecondLimit: Int = 50000
)
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
+case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig

sealed trait MetricConfig

@@ -4,7 +4,13 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration
import dev.caraml.spark.serialization.Serializer
import dev.caraml.spark.utils.StringUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.{
+  Admin,
+  ColumnFamilyDescriptorBuilder,
+  Connection,
+  Put,
+  TableDescriptorBuilder
+}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.mapred.JobConf
@@ -30,42 +30,36 @@ class BigTableSinkRelation(

override def schema: StructType = ???

+  def getConnection(hadoopConfig: Configuration): Connection = {
+    BigtableConfiguration.connect(hadoopConfig)
+  }
+
   def createTable(): Unit = {
-    val btConn = BigtableConfiguration.connect(hadoopConfig)
+    val btConn = getConnection(hadoopConfig)
     try {
       val admin = btConn.getAdmin

       val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) {
-        val t = new HTableDescriptor(TableName.valueOf(tableName))
-        val metadataCF = new HColumnDescriptor(metadataColumnFamily)
-        t.addFamily(metadataCF)
-        t
+        val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
+        val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily)
+        tableBuilder.setColumnFamily(cf)
+        val table = tableBuilder.build()
+        table
       } else {
-        admin.getTableDescriptor(TableName.valueOf(tableName))
+        val t = btConn.getTable(TableName.valueOf(tableName))
+        t.getDescriptor()
       }

-      val featuresCF = new HColumnDescriptor(config.namespace)
+      val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes)
       if (config.maxAge > 0) {
-        featuresCF.setTimeToLive(config.maxAge.toInt)
+        featuresCFBuilder.setTimeToLive(config.maxAge.toInt)
       }
-
-      featuresCF.setMaxVersions(1)
+      featuresCFBuilder.setMaxVersions(1)
+      val featuresCF = featuresCFBuilder.build()

// TODO: Set compression type for column family
> Collaborator: Seems this comment can be removed, or is it intended as a note for future work?

+      val tdb = TableDescriptorBuilder.newBuilder(table)
       if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
-        table.addFamily(featuresCF)
-
+        tdb.setColumnFamily(featuresCF)
+        val t = tdb.build()
         if (!admin.isTableAvailable(table.getTableName)) {
-          admin.createTable(table)
+          admin.createTable(t)
         } else {
-          admin.modifyTable(table)
+          admin.modifyTable(t)
         }
       } else if (
         config.maxAge > 0 && table
           .getColumnFamily(config.namespace.getBytes)
           .getTimeToLive != featuresCF.getTimeToLive
       ) {
-        table.modifyFamily(featuresCF)
-        admin.modifyTable(table)
+        tdb.modifyColumnFamily(featuresCF)
+        admin.modifyTable(tdb.build())
       }
} finally {
btConn.close()
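The hunk above migrates from the deprecated HTableDescriptor/HColumnDescriptor API to the HBase 2.x builders. A self-contained sketch of that builder API (table name, column family name, and TTL value are hypothetical):

```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

object DescriptorSketch {
  def build() = {
    // Build an immutable column family descriptor; a one-day TTL stands in for maxAge.
    val features = ColumnFamilyDescriptorBuilder
      .newBuilder("features".getBytes)
      .setTimeToLive(86400)
      .setMaxVersions(1)
      .build()
    // Attach it to a table descriptor, as createTable does above.
    TableDescriptorBuilder
      .newBuilder(TableName.valueOf("demo_table"))
      .setColumnFamily(features)
      .build()
  }
}
```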
@@ -115,7 +128,7 @@ class BigTableSinkRelation(
val qualifier = "avro".getBytes
put.addColumn(metadataColumnFamily.getBytes, qualifier, schema.asInstanceOf[String].getBytes)

-    val btConn = BigtableConfiguration.connect(hadoopConfig)
+    val btConn = getConnection(hadoopConfig)
try {
val table = btConn.getTable(TableName.valueOf(tableName))
table.checkAndPut(
@@ -23,27 +23,43 @@ class DefaultSource extends CreatableRelationProvider {
parameters: Map[String, String],
data: DataFrame
): BaseRelation = {
-    val bigtableConf = BigtableConfiguration.configure(
-      sqlContext.getConf(PROJECT_KEY),
-      sqlContext.getConf(INSTANCE_KEY)
-    )
-
-    if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
-      bigtableConf.set(
-        BIGTABLE_EMULATOR_HOST_KEY,
-        sqlContext.getConf("spark.bigtable.emulatorHost")
-      )
-    }
-
-    configureBigTableClient(bigtableConf, sqlContext)
-
-    val rel =
-      new BigTableSinkRelation(
-        sqlContext,
-        new AvroSerializer,
-        SparkBigtableConfig.parse(parameters),
-        bigtableConf
-      )
+    val onlineStore = parameters.getOrElse("online_store", "bigtable")
+    val rel: BigTableSinkRelation = if (onlineStore == "bigtable") {
+      val bigtableConf = BigtableConfiguration.configure(
+        sqlContext.getConf(PROJECT_KEY),
+        sqlContext.getConf(INSTANCE_KEY)
+      )
+
+      if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
+        bigtableConf.set(
+          BIGTABLE_EMULATOR_HOST_KEY,
+          sqlContext.getConf("spark.bigtable.emulatorHost")
+        )
+      }
+
+      configureBigTableClient(bigtableConf, sqlContext)
+
+      new BigTableSinkRelation(
+        sqlContext,
+        new AvroSerializer,
+        SparkBigtableConfig.parse(parameters),
+        bigtableConf
+      )
+    } else if (onlineStore == "hbase") {
+      val hbaseConf = new Configuration()
+      hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY))
+      hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY))
+      new HbaseSinkRelation(
+        sqlContext,
+        new AvroSerializer,
+        SparkBigtableConfig.parse(parameters),
+        hbaseConf
+      )
+    } else {
+      throw new UnsupportedOperationException(s"Unsupported online store: $onlineStore")
+    }
rel.createTable()
rel.saveWriteSchema(data)
rel.insert(data, overwrite = false)
@@ -79,4 +95,7 @@ object DefaultSource {
private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs"
private val MAX_ROW_COUNT_KEY = "spark.bigtable.maxRowCount"
private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs"

+  private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum"
+  private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port"
}
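A hedged sketch of a Spark session carrying the configuration these keys point at (app name and values are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object SessionSketch {
  def main(args: Array[String]): Unit = {
    // Values are hypothetical; these are the conf keys DefaultSource reads
    // via ZOOKEEPER_QUOROM_KEY and ZOOKEEPER_PORT_KEY.
    val spark = SparkSession.builder()
      .appName("hbase-ingestion-sketch")
      .master("local[*]")
      .config("spark.hbase.zookeeper.quorum", "127.0.0.1")
      .config("spark.hbase.zookeeper.port", "2181")
      .getOrCreate()
    println(spark.conf.get("spark.hbase.zookeeper.quorum"))
    spark.stop()
  }
}
```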
@@ -0,0 +1,17 @@
+package dev.caraml.spark.stores.bigtable
+
+import dev.caraml.spark.serialization.Serializer
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
+import org.apache.spark.sql.SQLContext
+
+class HbaseSinkRelation(
+    sqlContext: SQLContext,
+    serializer: Serializer,
+    config: SparkBigtableConfig,
+    hadoopConfig: Configuration
+) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) {
+  override def getConnection(hadoopConfig: Configuration): Connection = {
+    ConnectionFactory.createConnection(hadoopConfig)
+  }
+}
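Since the subclass only swaps how the connection is created, the HBase path reduces to a plain ConnectionFactory connection over the quorum settings that DefaultSource copies in. A minimal sketch (quorum and port values are hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Hedged sketch: the minimal configuration HbaseSinkRelation's getConnection
// override operates on. Quorum and port values are hypothetical.
object HbaseConnectionSketch {
  def connect(quorum: String, clientPort: Int): Connection = {
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", clientPort.toString)
    ConnectionFactory.createConnection(conf)
  }
}
```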