Indexing and Querying NYC yellow taxi CSV data

localhost:9983 is used as the zkhost throughout this example. The shaded artifact, rather than the main jar file, should be used for these examples because it bundles the required dependencies.

Once the shaded artifact has been downloaded or built, it can be added to the spark-shell classpath with the --jars option:

./bin/spark-shell --jars spark-solr-3.0.0-alpha-shaded.jar

Writing data

  • Create a collection in Solr to index the data into.

    Example: the HTTP call below creates a Solr collection named 'test-spark-solr' with two shards, using the 'techproducts' configset:
    curl -X GET "http://localhost:8983/solr/admin/collections?action=create&name=test-spark-solr&collection.configName=techproducts&numShards=2&maxShardsPerNode=2"
  • Read the CSV file as a Spark DataFrame. The sample file used in this example is the one referenced by the path in the snippet below.

val csvFileLocation = "src/test/resources/test-data/nyc_yellow_taxi_sample_1k.csv"
var csvDF = spark.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(csvFileLocation)
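
A quick sanity check of what was loaded can be done directly in the spark-shell; printSchema() and show() are standard DataFrame calls, and this step is not required for the rest of the example:

// Inspect the inferred schema and a few sample rows
csvDF.printSchema()
csvDF.show(5, truncate = false)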
  • Clean up the data and combine the latitude/longitude columns into single pickup and dropoff location fields

// col and concat_ws live in org.apache.spark.sql.functions and are not imported by default in the spark-shell
import org.apache.spark.sql.functions.{col, concat_ws}

// Filter out rows with invalid lat/lon values
csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")

// Concatenate the lat/lon columns into the single "lat,lon" value expected by Solr location fields
csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"), col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"), col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
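
As a quick check, the new pickup and dropoff columns should now contain single "lat,lon" strings, which is the form Solr location fields expect (purely illustrative, not required for indexing):

// Each value should now be a single "lat,lon" string
csvDF.select("pickup", "dropoff").show(3, truncate = false)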
  • Write the data to Solr. Before writing, spark-solr attempts to create any fields that exist in csvDF but not in Solr via the Schema API. For the Schema API to be usable, the ManagedIndexSchemaFactory must be enabled in the collection's solrconfig.xml. If you do not want to enable the managed schema, manually create all of the CSV fields in Solr before writing.

val options = Map(
  "zkhost" -> "localhost:9983",
  "collection" -> "test-spark-solr",
  "gen_uniq_key" -> "true" // Generate unique key if the 'id' field does not exist
)

// Write to Solr
csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
  • 999 documents should now appear in Solr. If not all of the documents are visible yet, an explicit commit can be issued via an HTTP call, as sketched below.
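
For example, a commit can be triggered straight from the spark-shell by hitting the collection's update handler on the same host and port used for the collection-creation call above; an equivalent curl against the same URL works just as well:

// Ask Solr for an explicit (hard) commit on the collection
scala.io.Source.fromURL("http://localhost:8983/solr/test-spark-solr/update?commit=true").mkString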

Reading data

In this section, we read back the CSV data that was indexed into the Solr collection test-spark-solr.

  • Load the Solr collection as a DataFrame

val options = Map(
  "zkhost" -> "localhost:9983",
  "collection" -> "test-spark-solr"
)

val df = spark.read.format("solr").options(options).load
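
A simple count is an easy way to confirm the collection loaded correctly; it should match the 999 documents indexed in the previous section:

// Should report the number of documents in the collection
df.count()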
  • Every DataFrame has a schema. You can use the printSchema() function to get information about the fields available in the DataFrame loaded from Solr

scala> df.printSchema()
root
 |-- improvement_surcharge: double (nullable = true)
 |-- vendor_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- id: string (nullable = false)
 |-- pickup: string (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- dropoff: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- extra: double (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- rate_code_id: long (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- mta_tax: double (nullable = true)
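
Individual fields from this schema can be selected like any other DataFrame columns; for example (output omitted, since it depends on the indexed sample):

// Pull back a handful of columns from Solr
df.select("pickup_datetime", "trip_distance", "total_amount").show(5)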
  • To be able to query with SQL syntax, we need to register this DataFrame as a table

// Note: registerTempTable is deprecated since Spark 2.0; df.createOrReplaceTempView("trips") is the equivalent call on newer versions
df.registerTempTable("trips")
  • Fire off SQL queries

// Cache the DataFrame for efficiency. See http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
scala> df.cache()
scala> sqlContext.sql("SELECT avg(tip_amount), avg(fare_amount) FROM trips").show()
+-----------------+-----------------+
|              _c0|              _c1|
+-----------------+-----------------+
|1.630050050050051|12.27087087087087|
+-----------------+-----------------+

scala> sqlContext.sql("SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10").show()
+-----+----+
|  _c0| _c1|
+-----+----+
|16.44|83.5|
+-----+----+
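
Any other Spark SQL query can be run against the registered table in the same way; for example, a simple aggregation by passenger count (output omitted, since it depends on the sample data):

scala> sqlContext.sql("SELECT passenger_count, count(*) AS num_trips, avg(total_amount) AS avg_total FROM trips GROUP BY passenger_count ORDER BY passenger_count").show()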