Commit 2b96840: comment fix

scwf committed Aug 20, 2015
1 parent: e6cc6fb

Showing 4 changed files with 19 additions and 23 deletions.
src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala (3 additions & 4 deletions)
@@ -649,12 +649,11 @@ private[hbase] case class HBaseRelation(
       case NonKeyColumn(name, dt, _, _) => StructField(name, dt, nullable = true)
     })

-  override def insert(data: DataFrame, overwrite: Boolean) = {
-    if (!overwrite) {
+  override def insert(data: DataFrame, overwrite: Boolean = true) = {
+    if (overwrite) {
       sqlContext.sparkContext.runJob(data.rdd, writeToHBase _)
     } else {
-      // TODO: Support INSERT OVERWRITE INTO
-      sys.error("HBASE Table does not support INSERT OVERWRITE for now.")
+      sys.error("HBASE Table does not support append mode.")
     }
   }
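Despite the commit message, this hunk is a behavioral change, not just a comment fix: overwrite now defaults to true, the write job runs on the overwrite path, and it is append mode that gets rejected. A minimal call-site sketch of the new contract follows; it assumes it compiles inside package org.apache.spark.sql.hbase (HBaseRelation is private[hbase]), and writeExample, relation, and df are hypothetical names, not code from this commit:

    package org.apache.spark.sql.hbase

    import org.apache.spark.sql.DataFrame

    object InsertContractSketch {
      // Illustrates the call-site contract after this commit.
      def writeExample(relation: HBaseRelation, df: DataFrame): Unit = {
        relation.insert(df)                    // new default applies: overwrite = true
        relation.insert(df, overwrite = true)  // equivalent, explicit
        // relation.insert(df, overwrite = false) would now throw:
        //   "HBASE Table does not support append mode."
      }
    }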

@@ -40,4 +40,4 @@ case class DeleteFromTable(tableName: String, child: LogicalPlan) extends UnaryNode {

   override def output: Seq[Attribute] = Seq.empty

-}
\ No newline at end of file
+}

@@ -27,11 +27,12 @@ import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.{Job, RecordWriter}
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hbase.HBasePartitioner.HBaseRawOrdering
 import org.apache.spark.sql.hbase._
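The import changes read as cleanup that goes with the edits below: Expression and LogicalPlan are dropped from the two catalyst imports, presumably because nothing in this file references them anymore, and a blank line is added to separate the Hadoop imports from the Spark ones, matching the usual Spark import grouping.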

@@ -55,11 +56,11 @@ case class AlterDropColCommand(tableName: String, columnName: String) extends RunnableCommand {

 @DeveloperApi
 case class AlterAddColCommand(
-        tableName: String,
-        colName: String,
-        colType: String,
-        colFamily: String,
-        colQualifier: String) extends RunnableCommand {
+    tableName: String,
+    colName: String,
+    colType: String,
+    colFamily: String,
+    colQualifier: String) extends RunnableCommand {

   def run(sqlContext: SQLContext): Seq[Row] = {
     val hbaseCatalog = sqlContext.catalog.asInstanceOf[HBaseCatalog]

@@ -137,7 +138,7 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])

     val rows = sqlContext.sparkContext.makeRDD(Seq(Row.fromSeq(bytes)))
     val inputValuesDF = sqlContext.createDataFrame(rows, relation.schema)
-    relation.insert(inputValuesDF, overwrite = false)
+    relation.insert(inputValuesDF)

     Seq.empty[Row]
   }

@@ -147,11 +148,11 @@ case class InsertValueIntoTableCommand(tableName: String, valueSeq: Seq[String])

 @DeveloperApi
 case class BulkLoadIntoTableCommand(
-        inputPath: String,
-        tableName: String,
-        isLocal: Boolean,
-        delimiter: Option[String],
-        parallel: Boolean)
+    inputPath: String,
+    tableName: String,
+    isLocal: Boolean,
+    delimiter: Option[String],
+    parallel: Boolean)
   extends RunnableCommand
   with SparkHadoopMapReduceUtil
   with Logging {

@@ -303,7 +304,3 @@ case class BulkLoadIntoTableCommand(

   override def output = Nil
 }
-
-
-
-

@@ -65,7 +65,7 @@ case class UpdateTable(
       }
     }
     val inputValuesDF = sqlContext.createDataFrame(resRdd, relation.schema)
-    relation.insert(inputValuesDF, overwrite = false)
+    relation.insert(inputValuesDF)
     sqlContext.sparkContext.emptyRDD[Row]
   }
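Note that both call sites touched by this commit, InsertValueIntoTableCommand above and UpdateTable here, simply drop the explicit overwrite = false argument and pick up the new default of true. Because the runJob branch in HBaseRelation.insert moved from !overwrite to overwrite at the same time, the write these commands perform is unchanged; only the flag's meaning was flipped.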
