[WIP] Support for update/delete #6

Open · wants to merge 5 commits into base: master
48 changes: 43 additions & 5 deletions src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.hbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, _}
import org.apache.hadoop.hbase.client.{Get, HTable, Put, Result, Scan}
import org.apache.hadoop.hbase.client._
Contributor: We should not use a wildcard import here; follow the coding convention and list the needed classes explicitly.

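For illustration only, an explicit import list might look like the line below (a sketch; it assumes Delete is the only client class the new update/delete code needs beyond the original imports):

// Explicit imports instead of the wildcard; Delete is assumed to be the only addition.
import org.apache.hadoop.hbase.client.{Delete, Get, HTable, Put, Result, Scan}
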
import org.apache.hadoop.hbase.filter._
import org.apache.log4j.Logger
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.hbase.catalyst.expressions.PartialPredicateOperations.partialPredicateReducer
import org.apache.spark.sql.hbase.catalyst.NotPusher
@@ -648,12 +649,11 @@ private[hbase] case class HBaseRelation(
case NonKeyColumn(name, dt, _, _) => StructField(name, dt, nullable = true)
})

override def insert(data: DataFrame, overwrite: Boolean) = {
if (!overwrite) {
override def insert(data: DataFrame, overwrite: Boolean = true) = {
if (overwrite) {
sqlContext.sparkContext.runJob(data.rdd, writeToHBase _)
} else {
// TODO: Support INSERT OVERWRITE INTO
sys.error("HBASE Table does not support INSERT OVERWRITE for now.")
sys.error("HBASE Table does not support append mode.")
}
}

@@ -701,6 +701,44 @@ private[hbase] case class HBaseRelation(
closeHTable()
}

def delete(data: DataFrame) = {
sqlContext.sparkContext.runJob(data.rdd, deleteFromHBase _)
}

def deleteFromHBase(context: TaskContext, iterator: Iterator[Row]) = {
// TODO: make the BatchMaxSize configurable
val BatchMaxSize = 100
var rowIndexInBatch = 0

// note: we cannot use an immutable Scala list here, because HTable.delete
// mutates the passed list and would throw UnsupportedOperationException.
val deletes = new java.util.ArrayList[Delete]()
while (iterator.hasNext) {
val row = iterator.next()
val rawKeyCol = keyColumns.map(
kc => {
val rowColumn = DataTypeUtils.getRowColumnInHBaseRawType(
row, kc.ordinal, kc.dataType)
(rowColumn, kc.dataType)
}
)
val key = HBaseKVHelper.encodingRawKeyColumns(rawKeyCol)
val delete = new Delete(key)
deletes.add(delete)
rowIndexInBatch += 1
if (rowIndexInBatch >= BatchMaxSize) {
htable.delete(deletes)
deletes.clear()
rowIndexInBatch = 0
}
}
if (!deletes.isEmpty) {
htable.delete(deletes)
deletes.clear()
rowIndexInBatch = 0
}
closeHTable()
Contributor Author: @yzhou2001 why should we not close the table here? The writeToHBase method also calls closeHTable() at the end.

Contributor Author: OK, I see your last comment now; leaving it here is fine.

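As a sketch of the trade-off being discussed (not part of this patch), the per-task cleanup could be made failure-safe by wrapping the batching loop in try/finally so the handle is released even when a delete batch throws; withTableCleanup is a hypothetical helper around the existing closeHTable():

// Hypothetical helper: run the batching loop and always release the table handle.
def withTableCleanup[T](body: => T): T =
  try body finally closeHTable()
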
}

def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
require(filters.size < 2, "Internal logical error: unexpected filter list size")
36 changes: 35 additions & 1 deletion src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hbase

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.SqlParser
import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.RunnableCommand
@@ -58,11 +59,16 @@ class HBaseSQLParser extends SqlParser
protected val TABLES = Keyword("TABLES")
protected val VALUES = Keyword("VALUES")
protected val TERMINATED = Keyword("TERMINATED")
protected val UPDATE = Keyword("UPDATE")
protected val DELETE = Keyword("DELETE")
protected val SET = Keyword("SET")
protected val EQ = Keyword("=")


override protected lazy val start: Parser[LogicalPlan] =
start1 | insert | cte |
create | drop | alterDrop | alterAdd |
insertValues | load | show | describe
insertValues | load | show | describe | update | delete

protected lazy val insertValues: Parser[LogicalPlan] =
INSERT ~> INTO ~> TABLE ~> ident ~ (VALUES ~> "(" ~> values <~ ")") ^^ {
@@ -74,6 +80,34 @@ class HBaseSQLParser extends SqlParser {
InsertValueIntoTableCommand(tableName, valueStringSeq)
}

// Standard Syntax:
// UPDATE tablename SET column = value [, column = value ...] [WHERE expression]
protected lazy val update: Parser[LogicalPlan] =
(UPDATE ~> relation <~ SET) ~ rep1sep(updateColumn, ",") ~ (WHERE ~> expression) ^^ {
case table ~ updateColumns ~ exp =>
val (columns, values) = updateColumns.unzip
catalyst.logical.UpdateTable(
table.asInstanceOf[UnresolvedRelation].tableName,
columns.map(UnresolvedAttribute.quoted),
values,
Filter(exp, table))
}

protected lazy val updateColumn: Parser[(String, String)] =
ident ~ (EQ ~> literal) ^^ {
case column ~ value => (column, value.value.toString)
}

// Standard Syntax:
// DELETE FROM tablename [WHERE expression]
protected lazy val delete: Parser[LogicalPlan] =
DELETE ~ FROM ~> relation ~ (WHERE ~> expression) ^^ {
case table ~ exp =>
catalyst.logical.DeleteFromTable(
table.asInstanceOf[UnresolvedRelation].tableName,
Filter(exp, table))
Contributor Author: @yzhou2001 regarding your point 1, I think it is OK now: here I only apply a filter, which means all columns are selected from the table.

Member: @scwf The projection should include the references from the "exp". I'm not sure whether all columns are included by default; even if they are, it is wasteful. HBase tables may be very wide, and selecting unnecessary columns could mean a big performance hit. We should build projections that contain only the references in the "exp" unioned with all key columns.

Contributor Author: Yes, we should optimize this case for both update and delete.

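As a rough sketch of that optimization (hypothetical; the parser itself does not know the table's key columns, so keyColumnNames is an assumed input that would have to be resolved in a later analysis step):

// Hypothetical sketch: narrow the projection to the columns referenced by the
// WHERE expression plus the row-key columns, instead of selecting every column
// of a potentially very wide HBase table.
def narrowProjection(exp: Expression, keyColumnNames: Seq[String]): Seq[UnresolvedAttribute] = {
  val referenced = exp.collect { case a: UnresolvedAttribute => a.name }
  (referenced ++ keyColumnNames).distinct.map(UnresolvedAttribute.quoted)
}

// e.g. Filter(exp, table) would become
// Project(narrowProjection(exp, keyColumnNames), Filter(exp, table))
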
}

protected lazy val create: Parser[LogicalPlan] =
CREATE ~> TABLE ~> ident ~
("(" ~> tableCols <~ ",") ~
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hbase.catalyst.logical

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, LogicalPlan}
@DeveloperApi
case class UpdateTable(
tableName: String,
columnsToUpdate: Seq[Expression],
values: Seq[String],
child: LogicalPlan) extends UnaryNode {

override lazy val resolved: Boolean = columnsToUpdate.forall(_.resolved) && childrenResolved

override def output: Seq[Attribute] = Seq.empty

}

@DeveloperApi
case class DeleteFromTable(tableName: String, child: LogicalPlan) extends UnaryNode {

override lazy val resolved: Boolean = childrenResolved

override def output: Seq[Attribute] = Seq.empty

}
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{Project, SparkPlan}
import org.apache.spark.sql.hbase.{HBasePartition, HBaseRawType, HBaseRelation, KeyColumn}
import org.apache.spark.sql.hbase.catalyst.{logical => hbaseLogical}
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLContext, Strategy, execution}
@@ -67,6 +68,12 @@ private[hbase] trait HBaseStrategies {
inPredicates,
(a, f) => relation.buildScan(a, f)) :: Nil

case hbaseLogical.UpdateTable(tableName, columnsToUpdate, values, child) =>
UpdateTable(tableName, columnsToUpdate, values, planLater(child)) :: Nil

case hbaseLogical.DeleteFromTable(tableName, child) =>
DeleteFromTable(tableName, planLater(child)) :: Nil

case _ => Nil
}

@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFi
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.{Job, RecordWriter}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
@@ -55,11 +56,11 @@ case class AlterDropColCommand(tableName: String, columnName: String) extends Ru

@DeveloperApi
case class AlterAddColCommand(
tableName: String,
colName: String,
colType: String,
colFamily: String,
colQualifier: String) extends RunnableCommand {
tableName: String,
colName: String,
colType: String,
colFamily: String,
colQualifier: String) extends RunnableCommand {

def run(sqlContext: SQLContext): Seq[Row] = {
val hbaseCatalog = sqlContext.catalog.asInstanceOf[HBaseCatalog]
@@ -137,7 +138,7 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])

val rows = sqlContext.sparkContext.makeRDD(Seq(Row.fromSeq(bytes)))
val inputValuesDF = sqlContext.createDataFrame(rows, relation.schema)
relation.insert(inputValuesDF, overwrite = false)
relation.insert(inputValuesDF)

Seq.empty[Row]
}
@@ -147,11 +148,11 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])

@DeveloperApi
case class BulkLoadIntoTableCommand(
inputPath: String,
tableName: String,
isLocal: Boolean,
delimiter: Option[String],
parallel: Boolean)
inputPath: String,
tableName: String,
isLocal: Boolean,
delimiter: Option[String],
parallel: Boolean)
extends RunnableCommand
with SparkHadoopMapReduceUtil
with Logging {
@@ -303,4 +304,3 @@ case class BulkLoadIntoTableCommand(

override def output = Nil
}

89 changes: 89 additions & 0 deletions src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hbase.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.{execution, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Subquery
import org.apache.spark.sql.hbase.HBaseRelation
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.sources.LogicalRelation

@DeveloperApi
case class UpdateTable(
tableName: String,
columnsToUpdate: Seq[Expression],
values: Seq[String],
child: SparkPlan) extends execution.UnaryNode {

override def output: Seq[Attribute] = Seq.empty

protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
val solvedRelation = sqlContext.catalog.lookupRelation(Seq(tableName))
val relation: HBaseRelation = solvedRelation.asInstanceOf[Subquery]
.child.asInstanceOf[LogicalRelation]
.relation.asInstanceOf[HBaseRelation]

val typesValues = values.zip(columnsToUpdate.map(_.dataType)).map { v =>
DataTypeUtils.string2TypeData(v._1, v._2)
}
val input = child.output
val mutableRow = new SpecificMutableRow(input.map(_.dataType))
val ordinals = columnsToUpdate.map { att =>
BindReferences.bindReference(att, input)
}.map(_.asInstanceOf[BoundReference].ordinal)

val resRdd = child.execute().mapPartitions { iter =>
val len = input.length
iter.map { row =>
var i = 0
while (i < len) {
mutableRow.update(i, row(i))
i += 1
}
ordinals.zip(typesValues).map { x => mutableRow.update(x._1, x._2) }
mutableRow: Row
}
}
val inputValuesDF = sqlContext.createDataFrame(resRdd, relation.schema)
relation.insert(inputValuesDF)
sqlContext.sparkContext.emptyRDD[Row]
}

}

@DeveloperApi
case class DeleteFromTable(tableName: String, child: SparkPlan) extends execution.UnaryNode {
override def output: Seq[Attribute] = Seq.empty

protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
val solvedRelation = sqlContext.catalog.lookupRelation(Seq(tableName))
val relation: HBaseRelation = solvedRelation.asInstanceOf[Subquery]
.child.asInstanceOf[LogicalRelation]
.relation.asInstanceOf[HBaseRelation]

val input = child.output
val inputValuesDF = sqlContext.createDataFrame(child.execute(), relation.schema)
relation.delete(inputValuesDF)
sqlContext.sparkContext.emptyRDD[Row]
}
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hbase

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

class HBaseUpdateDeleteSuite extends TestBase {

test("update support") {

val parser = new HBaseSQLParser()
val sql = raw"update wf set col2 = value where col4=7"

val plan: LogicalPlan = parser.parse(sql)
assert(plan.isInstanceOf[catalyst.logical.UpdateTable])
}

test("delete support") {

val parser = new HBaseSQLParser()
val sql = raw"delete from tb1 where col2 = 0"

val plan: LogicalPlan = parser.parse(sql)
assert(plan.isInstanceOf[catalyst.logical.DeleteFromTable])
}
}