diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
index 314b108..f6f43b4 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.hbase
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, _}
-import org.apache.hadoop.hbase.client.{Get, HTable, Put, Result, Scan}
+import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.filter._
 import org.apache.log4j.Logger
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.hbase.catalyst.expressions.PartialPredicateOperations.partialPredicateReducer
 import org.apache.spark.sql.hbase.catalyst.NotPusher
@@ -648,12 +649,11 @@ private[hbase] case class HBaseRelation(
     case NonKeyColumn(name, dt, _, _) =>
       StructField(name, dt, nullable = true)
   })

-  override def insert(data: DataFrame, overwrite: Boolean) = {
-    if (!overwrite) {
+  override def insert(data: DataFrame, overwrite: Boolean = true) = {
+    if (overwrite) {
       sqlContext.sparkContext.runJob(data.rdd, writeToHBase _)
     } else {
-      // TODO: Support INSERT OVERWRITE INTO
-      sys.error("HBASE Table does not support INSERT OVERWRITE for now.")
+      sys.error("HBASE Table does not support append mode.")
     }
   }

@@ -701,6 +701,44 @@ private[hbase] case class HBaseRelation(
     closeHTable()
   }

+  def delete(data: DataFrame) = {
+    sqlContext.sparkContext.runJob(data.rdd, deleteFromHBase _)
+  }
+
+  def deleteFromHBase(context: TaskContext, iterator: Iterator[Row]) = {
+    // TODO: make BatchMaxSize configurable
+    val BatchMaxSize = 100
+    var rowIndexInBatch = 0
+
+    // Note: use a mutable java.util.ArrayList rather than a Scala List here;
+    // HTable.delete modifies the list it is given, so an immutable collection
+    // would throw UnsupportedOperationException.
+    val deletes = new java.util.ArrayList[Delete]()
+    while (iterator.hasNext) {
+      val row = iterator.next()
+      val rawKeyCol = keyColumns.map(
+        kc => {
+          val rowColumn = DataTypeUtils.getRowColumnInHBaseRawType(
+            row, kc.ordinal, kc.dataType)
+          (rowColumn, kc.dataType)
+        }
+      )
+      val key = HBaseKVHelper.encodingRawKeyColumns(rawKeyCol)
+      val delete = new Delete(key)
+      deletes.add(delete)
+      rowIndexInBatch += 1
+      if (rowIndexInBatch >= BatchMaxSize) {
+        htable.delete(deletes)
+        deletes.clear()
+        rowIndexInBatch = 0
+      }
+    }
+    if (!deletes.isEmpty) {
+      htable.delete(deletes)
+      deletes.clear()
+      rowIndexInBatch = 0
+    }
+    closeHTable()
+  }

   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
     require(filters.size < 2, "Internal logical error: unexpected filter list size")
diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
index 1f39e50..2425c4e 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hbase

 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.SqlParser
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
@@ -58,11 +59,16 @@ class HBaseSQLParser extends SqlParser {
   protected val TABLES = Keyword("TABLES")
   protected val VALUES = Keyword("VALUES")
   protected val TERMINATED = Keyword("TERMINATED")
+  protected val UPDATE = Keyword("UPDATE")
+  protected val DELETE = Keyword("DELETE")
+  protected val SET = Keyword("SET")
+  protected val EQ = Keyword("=")
+
   override protected lazy val start: Parser[LogicalPlan] =
     start1 | insert | cte | create | drop | alterDrop | alterAdd |
-      insertValues | load | show | describe
+      insertValues | load | show | describe | update | delete

   protected lazy val insertValues: Parser[LogicalPlan] =
     INSERT ~> INTO ~> TABLE ~> ident ~ (VALUES ~> "(" ~> values <~ ")") ^^ {
@@ -74,6 +80,34 @@ class HBaseSQLParser extends SqlParser {
       InsertValueIntoTableCommand(tableName, valueStringSeq)
     }

+  // Standard Syntax:
+  //   UPDATE tablename SET column = value [, column = value ...] [WHERE expression]
+  protected lazy val update: Parser[LogicalPlan] =
+    (UPDATE ~> relation <~ SET) ~ rep1sep(updateColumn, ",") ~ (WHERE ~> expression) ^^ {
+      case table ~ updateColumns ~ exp =>
+        val (columns, values) = updateColumns.unzip
+        catalyst.logical.UpdateTable(
+          table.asInstanceOf[UnresolvedRelation].tableName,
+          columns.map(UnresolvedAttribute.quoted),
+          values,
+          Filter(exp, table))
+    }
+
+  protected lazy val updateColumn: Parser[(String, String)] =
+    ident ~ (EQ ~> literal) ^^ {
+      case column ~ value => (column, value.value.toString)
+    }
+
+  // Standard Syntax:
+  //   DELETE FROM tablename [WHERE expression]
+  protected lazy val delete: Parser[LogicalPlan] =
+    DELETE ~ FROM ~> relation ~ (WHERE ~> expression) ^^ {
+      case table ~ exp =>
+        catalyst.logical.DeleteFromTable(
+          table.asInstanceOf[UnresolvedRelation].tableName,
+          Filter(exp, table))
+    }
+
   protected lazy val create: Parser[LogicalPlan] =
     CREATE ~> TABLE ~> ident ~
       ("(" ~> tableCols <~ ",") ~
diff --git a/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
new file mode 100644
index 0000000..b0147f7
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase.catalyst.logical
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, LogicalPlan}
+
+@DeveloperApi
+case class UpdateTable(
+    tableName: String,
+    columnsToUpdate: Seq[Expression],
+    values: Seq[String],
+    child: LogicalPlan) extends UnaryNode {
+
+  override lazy val resolved: Boolean = columnsToUpdate.forall(_.resolved) && childrenResolved
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}
+
+@DeveloperApi
+case class DeleteFromTable(tableName: String, child: LogicalPlan) extends UnaryNode {
+
+  override lazy val resolved: Boolean = childrenResolved
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala
index e36b42b..da4c9d8 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{Project, SparkPlan}
 import org.apache.spark.sql.hbase.{HBasePartition, HBaseRawType, HBaseRelation, KeyColumn}
+import org.apache.spark.sql.hbase.catalyst.{logical => hbaseLogical}
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
@@ -67,6 +68,12 @@ private[hbase] trait HBaseStrategies {
           inPredicates,
           (a, f) => relation.buildScan(a, f)) :: Nil

+      case hbaseLogical.UpdateTable(tableName, columnsToUpdate, values, child) =>
+        UpdateTable(tableName, columnsToUpdate, values, planLater(child)) :: Nil
+
+      case hbaseLogical.DeleteFromTable(tableName, child) =>
+        DeleteFromTable(tableName, planLater(child)) :: Nil
+
       case _ => Nil
     }

diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
index 5a7c0a9..cb99049 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFi
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.{Job, RecordWriter}
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
@@ -55,11 +56,11 @@ case class AlterDropColCommand(tableName: String, columnName: String) extends Ru
 @DeveloperApi
 case class AlterAddColCommand(
-  tableName: String,
-  colName: String,
-  colType: String,
-  colFamily: String,
-  colQualifier: String) extends RunnableCommand {
+    tableName: String,
+    colName: String,
+    colType: String,
+    colFamily: String,
+    colQualifier: String) extends RunnableCommand {

   def run(sqlContext: SQLContext): Seq[Row] = {
     val hbaseCatalog = sqlContext.catalog.asInstanceOf[HBaseCatalog]
@@ -137,7 +138,7 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])
     val rows = sqlContext.sparkContext.makeRDD(Seq(Row.fromSeq(bytes)))
     val inputValuesDF = sqlContext.createDataFrame(rows, relation.schema)

-    relation.insert(inputValuesDF, overwrite = false)
+    relation.insert(inputValuesDF)

     Seq.empty[Row]
   }
@@ -147,11 +148,11 @@
 @DeveloperApi
 case class BulkLoadIntoTableCommand(
-  inputPath: String,
-  tableName: String,
-  isLocal: Boolean,
-  delimiter: Option[String],
-  parallel: Boolean)
+    inputPath: String,
+    tableName: String,
+    isLocal: Boolean,
+    delimiter: Option[String],
+    parallel: Boolean)
   extends RunnableCommand
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -303,4 +304,3 @@ case class BulkLoadIntoTableCommand(

   override def output = Nil
 }
-
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/operators.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/operators.scala
new file mode 100644
index 0000000..d5d722a
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/operators.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.{execution, SQLContext}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
+import org.apache.spark.sql.hbase.HBaseRelation
+import org.apache.spark.sql.hbase.util.DataTypeUtils
+import org.apache.spark.sql.sources.LogicalRelation
+
+@DeveloperApi
+case class UpdateTable(
+    tableName: String,
+    columnsToUpdate: Seq[Expression],
+    values: Seq[String],
+    child: SparkPlan) extends execution.UnaryNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+    val solvedRelation = sqlContext.catalog.lookupRelation(Seq(tableName))
+    val relation: HBaseRelation = solvedRelation.asInstanceOf[Subquery]
+      .child.asInstanceOf[LogicalRelation]
+      .relation.asInstanceOf[HBaseRelation]
+
+    val typesValues = values.zip(columnsToUpdate.map(_.dataType)).map { v =>
+      DataTypeUtils.string2TypeData(v._1, v._2)
+    }
+    val input = child.output
+    val mutableRow = new SpecificMutableRow(input.map(_.dataType))
+    val ordinals = columnsToUpdate.map { att =>
+      BindReferences.bindReference(att, input)
+    }.map(_.asInstanceOf[BoundReference].ordinal)
+
+    val resRdd = child.execute().mapPartitions { iter =>
+      val len = input.length
+      iter.map { row =>
+        var i = 0
+        while (i < len) {
+          mutableRow.update(i, row(i))
+          i += 1
+        }
+        ordinals.zip(typesValues).foreach { case (ordinal, value) => mutableRow.update(ordinal, value) }
+        mutableRow: Row
+      }
+    }
+    val inputValuesDF = sqlContext.createDataFrame(resRdd, relation.schema)
+    relation.insert(inputValuesDF)
+    sqlContext.sparkContext.emptyRDD[Row]
+  }
+
+}
+
+@DeveloperApi
+case class DeleteFromTable(tableName: String, child: SparkPlan) extends execution.UnaryNode {
+  override def output: Seq[Attribute] = Seq.empty
+
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+    val solvedRelation = sqlContext.catalog.lookupRelation(Seq(tableName))
+    val relation: HBaseRelation = solvedRelation.asInstanceOf[Subquery]
+      .child.asInstanceOf[LogicalRelation]
+      .relation.asInstanceOf[HBaseRelation]
+
+    val input = child.output
+    val inputValuesDF = sqlContext.createDataFrame(child.execute(), relation.schema)
+    relation.delete(inputValuesDF)
+    sqlContext.sparkContext.emptyRDD[Row]
+  }
+}
diff --git a/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala b/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
new file mode 100644
index 0000000..f2fa2c7
--- /dev/null
+++ b/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+class HBaseUpdateDeleteSuite extends TestBase {
+
+  test("update support") {
+    val parser = new HBaseSQLParser()
+    val sql = raw"update wf set col2 = value where col4=7"
+
+    val plan: LogicalPlan = parser.parse(sql)
+    assert(plan.isInstanceOf[catalyst.logical.UpdateTable])
+  }
+
+  test("delete support") {
+    val parser = new HBaseSQLParser()
+    val sql = raw"delete from tb1 where col2 = 0"
+
+    val plan: LogicalPlan = parser.parse(sql)
+    assert(plan.isInstanceOf[catalyst.logical.DeleteFromTable])
+  }
+}
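
Usage note (not part of the patch): a minimal sketch of how the new statements are exercised, mirroring HBaseUpdateDeleteSuite above. The parser calls are taken from the patch; the table and column names (wf, col2, col4) and the hbaseContext handle in the commented lines are illustrative assumptions, since the patch does not show how the enclosing SQLContext is constructed.

    import org.apache.spark.sql.hbase.HBaseSQLParser
    import org.apache.spark.sql.hbase.catalyst.logical.{DeleteFromTable, UpdateTable}

    object UpdateDeleteParsingExample {
      def main(args: Array[String]): Unit = {
        val parser = new HBaseSQLParser()

        // Both statements parse into the new logical operators added by this patch.
        val updatePlan = parser.parse("UPDATE wf SET col2 = 'v2' WHERE col4 = 7")
        assert(updatePlan.isInstanceOf[UpdateTable])

        val deletePlan = parser.parse("DELETE FROM wf WHERE col4 = 7")
        assert(deletePlan.isInstanceOf[DeleteFromTable])

        // Execution goes through a SQLContext that installs HBaseSQLParser and
        // HBaseStrategies; the handle name below is assumed, not defined in the patch:
        // hbaseContext.sql("UPDATE wf SET col2 = 'v2' WHERE col4 = 7")
        // hbaseContext.sql("DELETE FROM wf WHERE col4 = 7")
      }
    }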