
Spark MongoDB Module

Devender Yadav edited this page Jul 27, 2015 · 2 revisions

This module adds Spark support for MongoDB. Users can perform read, write, and query operations on the data.

Support

To use the module, add the following dependency to your pom.xml:

<dependency>
     <groupId>com.impetus.kundera.client</groupId>
     <artifactId>kundera-spark-mongodb</artifactId>
     <version>${kundera.version}</version>
</dependency>

Persistence unit configuration

<persistence-unit name="spark_mongo_pu">
   <provider>com.impetus.kundera.KunderaPersistence</provider>
   <class>com.impetus.client.spark.entities.Person</class>
   <properties>
      <property name="kundera.nodes" value="localhost" />
      <property name="kundera.port" value="7077" />
      <property name="kundera.keyspace" value="sparktest" />
      <property name="kundera.dialect" value="spark" />
      <property name="kundera.client" value="mongodb" />
      <property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
      <property name="kundera.client.property" value="KunderaSparkMongoProperties.xml" />
   </properties>
</persistence-unit>

Spark Related Properties

Spark-related properties are supplied through an XML file, which is referenced by the "kundera.client.property" property in persistence.xml. In the persistence unit above, that file is KunderaSparkMongoProperties.xml.

Sample Property File:

<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
   <datastores>
      <dataStore>
         <name>mongodb</name>
         <connection>
            <properties>
               <property name="spark.master" value="local" />
               <property name="spark.app.name" value="sparkMongo" />
               <property name="spark.driver.allowMultipleContexts" value="true" />
            </properties>
            <servers>
               <server>
                  <host>localhost</host>
                  <port>27017</port>
               </server>
            </servers>
         </connection>
      </dataStore>
   </datastores>
</clientProperties>

Here, the "spark.master" and "spark.app.name" properties are mandatory. Users can add more [Spark-related properties](http://spark.apache.org/docs/latest/configuration.html#available-properties) as needed.
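Because a missing mandatory property only surfaces at runtime, it can help to check the property file before creating the EntityManagerFactory. The following is a minimal sketch using only the JDK's XML parser; the class `SparkPropertyCheck` and its method are hypothetical helpers for illustration and are not part of Kundera.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper (not a Kundera class): checks a client property file
// for the two Spark properties this page describes as mandatory.
class SparkPropertyCheck {

    static final String[] REQUIRED = { "spark.master", "spark.app.name" };

    /** Returns the mandatory property names that are absent from the given XML text. */
    static List<String> missingProperties(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid property XML", e);
        }

        // Collect the name attribute of every <property> element in the file.
        Set<String> found = new HashSet<>();
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            found.add(((Element) props.item(i)).getAttribute("name"));
        }

        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!found.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Shortened property file that omits spark.app.name.
        String xml = "<clientProperties><datastores><dataStore><connection><properties>"
                + "<property name=\"spark.master\" value=\"local\" />"
                + "</properties></connection></dataStore></datastores></clientProperties>";
        System.out.println(missingProperties(xml)); // prints [spark.app.name]
    }
}
```

An empty list means both mandatory properties are present. The check looks only at property names, not at whether the values are valid for Spark.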

Entity

@Entity
@Table(name = "spark_person")
public class Person implements Serializable
{

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The person id. */
    @Id
    private String personId;

    /** The person name. */
    private String personName;

    /** The age. */
    private int age;

    /** The salary. */
    private Double salary;

    // Getters and setters omitted.
}

Read-Write Operation

    EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_mongo_pu");
    EntityManager em = emf.createEntityManager();
    Person person = new Person();
    person.setAge(23);
    person.setPersonId("1");
    person.setPersonName("Dev");
    person.setSalary(100000.0);

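    // Save the entity.
    em.persist(person);

    // Clear the persistence context so find() does not return the cached instance.
    em.clear();

    // Look the entity up by its primary key.
    Person personFound = em.find(Person.class, "1");

    em.close();
    emf.close();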

Query Operation

Select all:

String query = "select * from spark_person";
List results = em.createNativeQuery(query).getResultList();

Select with WHERE:

String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();

Select with LIKE:

String query = "select * from spark_person where personName like 'kp%'";
List results = em.createNativeQuery(query).getResultList();

Sum (aggregation):

String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();

Saving data after Querying

Users can save the results of a query to HDFS or the local file system (FS) as CSV or JSON.

Use INSERT INTO fs to save to the file system, or INSERT INTO hdfs to save to HDFS. The SQL query whose results are saved goes in the FROM clause, and the output format is chosen with the AS clause. Examples:

To save in FS as CSV:

    String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
    Query q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();

To save in FS as JSON:

    query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
    q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();
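The statement pattern used in the two examples above can be assembled with a small helper so that the target and format are validated before the string reaches `createNativeQuery`. This is a sketch only: `SaveQueryBuilder` is a hypothetical class, not part of Kundera, and it assumes that the `hdfs` target takes the same bracketed path form as the `fs` examples, which this page does not show.

```java
import java.util.Locale;

// Hypothetical helper (not a Kundera class): builds the INSERT INTO statement
// in the form shown in the examples above.
class SaveQueryBuilder {

    /**
     * @param target      "fs" or "hdfs"
     * @param path        output location, placed between square brackets
     * @param format      "CSV" or "JSON"
     * @param selectQuery SQL query whose results are saved
     */
    static String build(String target, String path, String format, String selectQuery) {
        String t = target.toLowerCase(Locale.ROOT);
        String f = format.toUpperCase(Locale.ROOT);
        if (!t.equals("fs") && !t.equals("hdfs")) {
            throw new IllegalArgumentException("Unsupported target: " + target);
        }
        if (!f.equals("CSV") && !f.equals("JSON")) {
            throw new IllegalArgumentException("Unsupported format: " + format);
        }
        // Assumption: hdfs uses the same target.[path] syntax as fs.
        return "INSERT INTO " + t + ".[" + path + "] AS " + f + " FROM (" + selectQuery + ")";
    }

    public static void main(String[] args) {
        String query = build("fs", "src/test/resources/testspark_csv", "csv",
                "select * from spark_person");
        // prints INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)
        System.out.println(query);
    }
}
```

The returned string can then be passed to `em.createNativeQuery(query, Person.class)` and run with `executeUpdate()`, as in the examples above.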

For more details, see the test case.
