From e6cc6fb7d927edee411c216eb0f171ceab8e94bb Mon Sep 17 00:00:00 2001
From: scwf
Date: Tue, 18 Aug 2015 11:44:58 +0800
Subject: [PATCH 1/5] draft support for update/delete

---
 .../spark/sql/hbase/HBaseRelation.scala       | 39 +++++++-
 .../spark/sql/hbase/HBaseSQLParser.scala      | 34 +++++++
 .../hbase/catalyst/logical/operators.scala    | 43 +++++++++
 .../sql/hbase/execution/HBaseStrategies.scala |  7 ++
 .../sql/hbase/execution/hbaseCommands.scala   |  7 +-
 .../spark/sql/hbase/execution/oprators.scala  | 89 +++++++++++++++++++
 6 files changed, 216 insertions(+), 3 deletions(-)
 create mode 100644 src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
 create mode 100644 src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala

diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
index 314b108..0b0c336 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.hbase
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, _}
-import org.apache.hadoop.hbase.client.{Get, HTable, Put, Result, Scan}
+import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.filter._
 import org.apache.log4j.Logger
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.hbase.catalyst.expressions.PartialPredicateOperations.partialPredicateReducer
 import org.apache.spark.sql.hbase.catalyst.NotPusher
@@ -701,6 +702,42 @@ private[hbase] case class HBaseRelation(
     closeHTable()
   }
 
+  def delete(data: DataFrame) = {
+    sqlContext.sparkContext.runJob(data.rdd, deleteFromHBase _)
+  }
+
+  def deleteFromHBase(context: TaskContext, iterator: Iterator[Row]) = {
+    // TODO: make BatchMaxSize configurable
+    val BatchMaxSize = 100
+    var rowIndexInBatch = 0
+
+    var deletes = new ListBuffer[Delete]()
+    while (iterator.hasNext) {
+      val row = iterator.next()
+      val rawKeyCol = keyColumns.map(
+        kc => {
+          val rowColumn = DataTypeUtils.getRowColumnInHBaseRawType(
+            row, kc.ordinal, kc.dataType)
+          (rowColumn, kc.dataType)
+        }
+      )
+      val key = HBaseKVHelper.encodingRawKeyColumns(rawKeyCol)
+      val delete = new Delete(key)
+      deletes += delete
+      rowIndexInBatch += 1
+      if (rowIndexInBatch >= BatchMaxSize) {
+        htable.delete(deletes.toList)
+        deletes.clear()
+        rowIndexInBatch = 0
+      }
+    }
+    if (deletes.nonEmpty) {
+      htable.delete(deletes.toList)
+      deletes.clear()
+      rowIndexInBatch = 0
+    }
+    closeHTable()
+  }
 
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
     require(filters.size < 2, "Internal logical error: unexpected filter list size")
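The delete path above re-encodes each row's key columns into an HBase row key and ships Deletes to the servers in batches of at most BatchMaxSize. Reduced to the plain HBase client API, the same batching pattern looks roughly like the sketch below; `table`, `keys`, and the key encoding are stand-ins for this module's htable and HBaseKVHelper.encodingRawKeyColumns, not its actual API:

    import java.util.{ArrayList => JArrayList}
    import org.apache.hadoop.hbase.client.{Delete, HTableInterface}

    // Collect Deletes and flush one round trip whenever the batch fills up.
    def deleteAll(table: HTableInterface,
                  keys: Iterator[Array[Byte]],
                  batchMaxSize: Int = 100): Unit = {
      val batch = new JArrayList[Delete]()
      keys.foreach { key =>
        batch.add(new Delete(key))
        if (batch.size >= batchMaxSize) {
          table.delete(batch) // the client may mutate the list it is given
          batch.clear()
        }
      }
      if (!batch.isEmpty) table.delete(batch) // flush the remainder
    }

A java.util.ArrayList is used deliberately in this sketch: HTable.delete mutates the list it receives, which is exactly what patch 5 below runs into with a converted Scala list.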
diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
index 1f39e50..9c98376 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hbase
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.SqlParser
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
@@ -58,6 +59,11 @@ class HBaseSQLParser extends SqlParser {
   protected val TABLES = Keyword("TABLES")
   protected val VALUES = Keyword("VALUES")
   protected val TERMINATED = Keyword("TERMINATED")
+  protected val UPDATE = Keyword("UPDATE")
+  protected val DELETE = Keyword("DELETE")
+  protected val SET = Keyword("SET")
+  protected val EQ = Keyword("=")
+
 
   override protected lazy val start: Parser[LogicalPlan] =
     start1 | insert | cte |
@@ -74,6 +80,34 @@ class HBaseSQLParser extends SqlParser {
       InsertValueIntoTableCommand(tableName, valueStringSeq)
   }
 
+  // Standard Syntax:
+  // UPDATE tablename SET column = value [, column = value ...] [WHERE expression]
+  protected lazy val update: Parser[LogicalPlan] =
+    (UPDATE ~> relation <~ SET) ~ rep1sep(updateColumn, ",") ~ (WHERE ~> expression) ^^ {
+      case table ~ updateColumns ~ exp =>
+        val (columns, values) = updateColumns.unzip
+        catalyst.logical.UpdateTable(
+          table.asInstanceOf[UnresolvedRelation].tableName,
+          columns.map(UnresolvedAttribute.quoted),
+          values,
+          Filter(exp, table))
+    }
+
+  protected lazy val updateColumn: Parser[(String, String)] =
+    ident ~ (EQ ~> ident) ^^ {
+      case column ~ value => (column, value)
+    }
+
+  // Standard Syntax:
+  // DELETE FROM tablename [WHERE expression]
+  protected lazy val delete: Parser[LogicalPlan] =
+    DELETE ~ FROM ~> relation ~ (WHERE ~> expression) ^^ {
+      case table ~ exp =>
+        catalyst.logical.DeleteFromTable(
+          table.asInstanceOf[UnresolvedRelation].tableName,
+          Filter(exp, table))
+    }
+
   protected lazy val create: Parser[LogicalPlan] =
     CREATE ~> TABLE ~> ident ~
       ("(" ~> tableCols <~ ",") ~
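For reference, the shapes the two productions are meant to cover; note they are not yet reachable from start (patch 3 below wires them in), and at this stage updateColumn requires a bare identifier on the right-hand side (patch 4 relaxes it to a literal). An illustrative use, with made-up table and column names:

    val parser = new HBaseSQLParser()
    // DELETE FROM <table> WHERE <predicate>
    val deletePlan = parser.parse("delete from sales where region = 0")
    // UPDATE <table> SET <column> = <value> [, ...] WHERE <predicate>
    val updatePlan = parser.parse("update sales set status = closed where region = 0")
    // Both plans come back unresolved: UpdateTable/DeleteFromTable wrap a
    // Filter over an UnresolvedRelation, to be resolved later by the analyzer.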
diff --git a/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
new file mode 100644
index 0000000..8f12b93
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase.catalyst.logical
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, LogicalPlan}
+@DeveloperApi
+case class UpdateTable(
+    tableName: String,
+    columnsToUpdate: Seq[Attribute],
+    values: Seq[String],
+    child: LogicalPlan) extends UnaryNode {
+
+  override lazy val resolved: Boolean = columnsToUpdate.forall(_.resolved) && childrenResolved
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}
+
+@DeveloperApi
+case class DeleteFromTable(tableName: String, child: LogicalPlan) extends UnaryNode {
+
+  override lazy val resolved: Boolean = childrenResolved
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}
\ No newline at end of file
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala
index e36b42b..da4c9d8 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseStrategies.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{Project, SparkPlan}
 import org.apache.spark.sql.hbase.{HBasePartition, HBaseRawType, HBaseRelation, KeyColumn}
+import org.apache.spark.sql.hbase.catalyst.{logical => hbaseLogical}
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
@@ -67,6 +68,12 @@ private[hbase] trait HBaseStrategies {
           inPredicates,
           (a, f) => relation.buildScan(a, f)) :: Nil
 
+      case hbaseLogical.UpdateTable(tableName, columnsToUpdate, values, child) =>
+        UpdateTable(tableName, columnsToUpdate, values, planLater(child)) :: Nil
+
+      case hbaseLogical.DeleteFromTable(tableName, child) =>
+        DeleteFromTable(tableName, planLater(child)) :: Nil
+
       case _ => Nil
     }
   }
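Putting the logical nodes and the strategy together, an UPDATE flows through planning as a small tree in which the filtered scan feeds the DML operator. Schematically (an illustration of the shapes involved, not actual explain output):

    // UPDATE t SET c = v WHERE p, after parsing:
    //
    //   UpdateTable(t, [c], [v])          // logical node above, empty output
    //     Filter(p)
    //       UnresolvedRelation(t)
    //
    // and after analysis plus the strategy above:
    //
    //   UpdateTable(t, [c], [v])          // physical node, added below
    //     Filter(p)
    //       <scan of the HBase relation>  // produced via planLater(child)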
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
index 5a7c0a9..3e521ab 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
@@ -30,8 +30,8 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
-import org.apache.spark.sql.catalyst.plans.logical.Subquery
+import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hbase.HBasePartitioner.HBaseRawOrdering
 import org.apache.spark.sql.hbase._
@@ -304,3 +304,6 @@ case class BulkLoadIntoTableCommand(
 
   override def output = Nil
 }
+
+
+
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
new file mode 100644
index 0000000..e901e3a
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.{execution, SQLContext}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
+import org.apache.spark.sql.hbase.HBaseRelation
+import org.apache.spark.sql.hbase.util.DataTypeUtils
+import org.apache.spark.sql.sources.LogicalRelation
+
+@DeveloperApi
+case class UpdateTable(
+    tableName: String,
+    columnsToUpdate: Seq[Attribute],
+    values: Seq[String],
+    child: SparkPlan) extends execution.UnaryNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+    val solvedRelation = sqlContext.catalog.lookupRelation(Seq(tableName))
+    val relation: HBaseRelation = solvedRelation.asInstanceOf[Subquery]
+      .child.asInstanceOf[LogicalRelation]
+      .relation.asInstanceOf[HBaseRelation]
+
+    val typesValues = values.zip(columnsToUpdate.map(_.dataType)).map { v =>
+      DataTypeUtils.string2TypeData(v._1, v._2)
+    }
+    val input = child.output
+    val mutableRow = new SpecificMutableRow(input.map(_.dataType))
+    val ordinals = columnsToUpdate.map { att =>
+      BindReferences.bindReference(att, input)
+    }.map(_.asInstanceOf[BoundReference].ordinal)
+
+    val resRdd = child.execute().mapPartitions { iter =>
+      val len = input.length
+      iter.map { row =>
+        var i = 0
+        while (i < len) {
+          mutableRow.update(i, row(i))
+          i += 1
+        }
+        ordinals.zip(typesValues).map { x => mutableRow.update(x._1, x._2) }
+        mutableRow: Row
+      }
+    }
+    val inputValuesDF = sqlContext.createDataFrame(resRdd, relation.schema)
+    relation.insert(inputValuesDF, overwrite = false)
+    sqlContext.sparkContext.emptyRDD[Row]
+  }
+
+}
+
+@DeveloperApi
+case class DeleteFromTable(tableName: String, child: SparkPlan) extends execution.UnaryNode {
+  override def output: Seq[Attribute] = Seq.empty
+
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+    val solvedRelation = sqlContext.catalog.lookupRelation(Seq(tableName))
+    val relation: HBaseRelation = solvedRelation.asInstanceOf[Subquery]
+      .child.asInstanceOf[LogicalRelation]
+      .relation.asInstanceOf[HBaseRelation]
+
+    val input = child.output
+    val inputValuesDF = sqlContext.createDataFrame(child.execute(), relation.schema)
+    relation.delete(inputValuesDF)
+    sqlContext.sparkContext.emptyRDD[Row]
+  }
+}
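The physical UpdateTable above works by copying each matched row into a mutable row, overwriting only the ordinals of the updated columns with values converted by string2TypeData, and re-inserting the result; since an HBase put replaces the cells stored under the same row key, rewriting the whole row is effectively the update. The ordinal-overwrite step in isolation, as a sketch that assumes the new values are already converted to the column types:

    import org.apache.spark.sql.catalyst.expressions._

    // Resolve each updated attribute to its position in the child's output,
    // then overwrite only those positions in a copy of the row.
    def overwriteOrdinals(
        input: Seq[Attribute],
        toUpdate: Seq[Attribute],
        newValues: Seq[Any],
        row: Row): Row = {
      val ordinals = toUpdate.map { att =>
        BindReferences.bindReference(att, input).asInstanceOf[BoundReference].ordinal
      }
      val updated = new GenericMutableRow(row.toSeq.toArray)
      ordinals.zip(newValues).foreach { case (ord, v) => updated.update(ord, v) }
      updated
    }

One caveat that follows from the insert-after-scan design: an UPDATE that changes a key column re-encodes to a different row key, so it writes a new row rather than rewriting the old one.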
From 2b968403eade53d4d5e547c4298ef9cb72e0762d Mon Sep 17 00:00:00 2001
From: scwf
Date: Thu, 20 Aug 2015 09:59:06 +0800
Subject: [PATCH 2/5] comment fix

---
 .../spark/sql/hbase/HBaseRelation.scala       |  7 ++---
 .../hbase/catalyst/logical/operators.scala    |  2 +-
 .../sql/hbase/execution/hbaseCommands.scala   | 31 +++++++++----------
 .../spark/sql/hbase/execution/oprators.scala  |  2 +-
 4 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
index 0b0c336..ebf7ca9 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
@@ -649,12 +649,11 @@ private[hbase] case class HBaseRelation(
     case NonKeyColumn(name, dt, _, _) => StructField(name, dt, nullable = true)
   })
 
-  override def insert(data: DataFrame, overwrite: Boolean) = {
-    if (!overwrite) {
+  override def insert(data: DataFrame, overwrite: Boolean = true) = {
+    if (overwrite) {
       sqlContext.sparkContext.runJob(data.rdd, writeToHBase _)
     } else {
-      // TODO: Support INSERT OVERWRITE INTO
-      sys.error("HBASE Table does not support INSERT OVERWRITE for now.")
+      sys.error("HBASE Table does not support append mode.")
     }
   }
 
diff --git a/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
index 8f12b93..3f78a71 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
@@ -40,4 +40,4 @@ case class DeleteFromTable(tableName: String, child: LogicalPlan) extends UnaryN
 
   override def output: Seq[Attribute] = Seq.empty
 
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
index 3e521ab..cb99049 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala
@@ -27,11 +27,12 @@ import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFi
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.{Job, RecordWriter}
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hbase.HBasePartitioner.HBaseRawOrdering
 import org.apache.spark.sql.hbase._
@@ -55,11 +56,11 @@ case class AlterDropColCommand(tableName: String, columnName: String) extends Ru
 
 @DeveloperApi
 case class AlterAddColCommand(
-  tableName: String,
-  colName: String,
-  colType: String,
-  colFamily: String,
-  colQualifier: String) extends RunnableCommand {
+    tableName: String,
+    colName: String,
+    colType: String,
+    colFamily: String,
+    colQualifier: String) extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     val hbaseCatalog = sqlContext.catalog.asInstanceOf[HBaseCatalog]
@@ -137,7 +138,7 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])
 
     val rows = sqlContext.sparkContext.makeRDD(Seq(Row.fromSeq(bytes)))
     val inputValuesDF = sqlContext.createDataFrame(rows, relation.schema)
-    relation.insert(inputValuesDF, overwrite = false)
+    relation.insert(inputValuesDF)
 
     Seq.empty[Row]
   }
@@ -147,11 +148,11 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])
 
 @DeveloperApi
 case class BulkLoadIntoTableCommand(
-  inputPath: String,
-  tableName: String,
-  isLocal: Boolean,
-  delimiter: Option[String],
-  parallel: Boolean)
+    inputPath: String,
+    tableName: String,
+    isLocal: Boolean,
+    delimiter: Option[String],
+    parallel: Boolean)
   extends RunnableCommand
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -303,7 +304,3 @@ case class BulkLoadIntoTableCommand(
 
   override def output = Nil
 }
-
-
-
-
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
index e901e3a..8a46312 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
@@ -65,7 +65,7 @@ case class UpdateTable(
       }
     }
     val inputValuesDF = sqlContext.createDataFrame(resRdd, relation.schema)
-    relation.insert(inputValuesDF, overwrite = false)
+    relation.insert(inputValuesDF)
     sqlContext.sparkContext.emptyRDD[Row]
   }
 
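A note on the insert change: an HBase put always overwrites whatever is stored under the same row key, so treating overwrite as the default (and only) mode matches the storage semantics, and append has no real meaning here. Call sites now read, assuming `relation` is an HBaseRelation and `df` a DataFrame matching relation.schema:

    relation.insert(df)                    // runs writeToHBase
    relation.insert(df, overwrite = false) // now fails fast: append unsupported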
From c474e1b21cebc22c376044c6fe15f341325e0135 Mon Sep 17 00:00:00 2001
From: scwf
Date: Thu, 20 Aug 2015 14:42:13 +0800
Subject: [PATCH 3/5] forgot to add update/delete to the parser

---
 src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
index 9c98376..94ac206 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
@@ -68,7 +68,7 @@ class HBaseSQLParser extends SqlParser {
   override protected lazy val start: Parser[LogicalPlan] =
     start1 | insert | cte |
       create | drop | alterDrop | alterAdd |
-      insertValues | load | show | describe
+      insertValues | load | show | describe | update | delete
 
   protected lazy val insertValues: Parser[LogicalPlan] =
     INSERT ~> INTO ~> TABLE ~> ident ~ (VALUES ~> "(" ~> values <~ ")") ^^ {
From f1e8b31a1234774b697317c9bc3827244a222249 Mon Sep 17 00:00:00 2001
From: scwf
Date: Thu, 20 Aug 2015 15:50:27 +0800
Subject: [PATCH 4/5] add tests

---
 .../spark/sql/hbase/HBaseSQLParser.scala      |  4 +-
 .../hbase/catalyst/logical/operators.scala    |  2 +-
 .../spark/sql/hbase/execution/oprators.scala  |  2 +-
 .../sql/hbase/HBaseUpdateDeleteSuite.scala    | 41 +++++++++++++++++++
 4 files changed, 45 insertions(+), 4 deletions(-)
 create mode 100644 src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala

diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
index 94ac206..2425c4e 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
@@ -94,8 +94,8 @@ class HBaseSQLParser extends SqlParser {
   }
 
   protected lazy val updateColumn: Parser[(String, String)] =
-    ident ~ (EQ ~> ident) ^^ {
-      case column ~ value => (column, value)
+    ident ~ (EQ ~> literal) ^^ {
+      case column ~ value => (column, value.value.toString)
     }
 
   // Standard Syntax:
diff --git a/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
index 3f78a71..b0147f7 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/catalyst/logical/operators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, LogicalPlan}
 @DeveloperApi
 case class UpdateTable(
     tableName: String,
-    columnsToUpdate: Seq[Attribute],
+    columnsToUpdate: Seq[Expression],
     values: Seq[String],
     child: LogicalPlan) extends UnaryNode {
 
diff --git a/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala b/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
index 8a46312..d5d722a 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/execution/oprators.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.sources.LogicalRelation
 @DeveloperApi
 case class UpdateTable(
     tableName: String,
-    columnsToUpdate: Seq[Attribute],
+    columnsToUpdate: Seq[Expression],
     values: Seq[String],
     child: SparkPlan) extends execution.UnaryNode {
 
diff --git a/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala b/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
new file mode 100644
index 0000000..5e6885e
--- /dev/null
+++ b/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+class HBaseUpdateDeleteSuite extends TestBase {
+
+  test("update support") {
+
+    val parser = new HBaseSQLParser()
+    val sql = raw"update tb1 set col1 = 2 where col2 = 0"
+
+    val plan: LogicalPlan = parser.parse(sql)
+    assert(plan.isInstanceOf[catalyst.logical.UpdateTable])
+  }
+
+  test("delete support") {
+
+    val parser = new HBaseSQLParser()
+    val sql = raw"delete from tb1 where col2 = 0"
+
+    val plan: LogicalPlan = parser.parse(sql)
+    assert(plan.isInstanceOf[catalyst.logical.DeleteFromTable])
+  }
+}
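With updateColumn now built on literal, the right-hand side of SET must be a SQL literal (for example a number or a single-quoted string), and the value is carried as the literal's string form, to be converted by string2TypeData in the physical operator. Roughly what the production now accepts and rejects:

    val parser = new HBaseSQLParser()
    parser.parse("update tb1 set col1 = 2 where col2 = 0")     // numeric literal: parses
    parser.parse("update tb1 set col1 = 'two' where col2 = 0") // string literal: parses
    // "update tb1 set col1 = two where col2 = 0" -- a bare identifier on the
    // right-hand side no longer matches updateColumn and is rejected.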
From 4904f5f112b56303ce811712cd877266bab77f9b Mon Sep 17 00:00:00 2001
From: scwf
Date: Thu, 20 Aug 2015 16:30:30 +0800
Subject: [PATCH 5/5] hack for delete

---
 .../org/apache/spark/sql/hbase/HBaseRelation.scala   | 10 ++++++----
 .../spark/sql/hbase/HBaseUpdateDeleteSuite.scala     |  2 +-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
index ebf7ca9..f6f43b4 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
@@ -710,7 +710,9 @@ private[hbase] case class HBaseRelation(
     val BatchMaxSize = 100
     var rowIndexInBatch = 0
 
-    var deletes = new ListBuffer[Delete]()
+    // note: this is a hack -- HTable.delete mutates the list it is given, and
+    // a converted Scala list throws UnsupportedOperationException.
+    val deletes = new java.util.ArrayList[Delete]()
     while (iterator.hasNext) {
       val row = iterator.next()
       val rawKeyCol = keyColumns.map(
@@ -722,16 +724,16 @@ private[hbase] case class HBaseRelation(
       )
       val key = HBaseKVHelper.encodingRawKeyColumns(rawKeyCol)
       val delete = new Delete(key)
-      deletes += delete
+      deletes.add(delete)
       rowIndexInBatch += 1
       if (rowIndexInBatch >= BatchMaxSize) {
-        htable.delete(deletes.toList)
+        htable.delete(deletes)
         deletes.clear()
         rowIndexInBatch = 0
       }
     }
     if (deletes.nonEmpty) {
-      htable.delete(deletes.toList)
+      htable.delete(deletes)
       deletes.clear()
       rowIndexInBatch = 0
     }
diff --git a/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala b/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
index 5e6885e..f2fa2c7 100644
--- a/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/hbase/HBaseUpdateDeleteSuite.scala
@@ -24,7 +24,7 @@ class HBaseUpdateDeleteSuite extends TestBase {
   test("update support") {
 
     val parser = new HBaseSQLParser()
-    val sql = raw"update tb1 set col1 = 2 where col2 = 0"
+    val sql = raw"update wf set col2 = 'value' where col4=7"
 
     val plan: LogicalPlan = parser.parse(sql)
    assert(plan.isInstanceOf[catalyst.logical.UpdateTable])
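To make the patch 5 note concrete: HTable.delete(java.util.List) removes the entries it has processed from the list it is given, so the argument must tolerate Java-side mutation. The earlier code passed deletes.toList, which reaches the client through an implicit wrapper around an immutable Seq, and that wrapper rejects structural modification. A standalone reproduction of the failure mode (not this module's code):

    import scala.collection.JavaConversions._
    import java.util.{List => JList}

    // The implicit seqAsJavaList wrapper does not support removal, which is
    // exactly the operation HTable.delete performs on its argument.
    val wrapped: JList[Int] = List(1, 2, 3)
    wrapped.remove(0) // throws UnsupportedOperationException

Hence the plain java.util.ArrayList above, which the HBase client can mutate freely.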