Skip to content

Spark MongoDB Module

Devender Yadav edited this page Jul 27, 2015 · 7 revisions

This module includes support for MongoDB. User can perform Read-Write & Query operations over data.

Support

To use it, you need to have the following dependency in your pom.xml.

<dependency>
     <groupId>com.impetus.kundera.client</groupId>
     <artifactId>kundera-spark-mongodb</artifactId>
     <version>${kundera.version}</version>
</dependency>

Persistence unit configuration

 <persistence-unit name="spark_mongo_pu">
   <provider>com.impetus.kundera.KunderaPersistence</provider>
   <class>com.impetus.client.spark.entities.Person</class>
   <properties>
      <property name="kundera.nodes" value="localhost" />
      <property name="kundera.port" value="7077" />
      <property name="kundera.keyspace" value="sparktest" />
      <property name="kundera.dialect" value="spark" />
      <property name="kundera.client" value="mongodb" />
      <property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
      <property name="kundera.client.property" value="KunderaSparkMongoProperties.xml" />
   </properties>
</persistence-unit>

Spark Related Properties

Spark Related Properties are added using xml file. For example in above persistence.xml we mentioned KunderaSparkMongoProperties.xml.

Sample Property File:

<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
   <datastores>
      <dataStore>
         <name>mongodb</name>
         <connection>
            <properties>
               <property name="spark.master" value="local" />
               <property name="spark.app.name" value="sparkMongo" />
               <property name="spark.driver.allowMultipleContexts" value="true" />
            </properties>
            <servers>
               <server>
                  <host>localhost</host>
                  <port>27017</port>
               </server>
           </servers>
         </connection>
      </dataStore>
   </datastores>
</clientProperties>

Here "spark.master" and "spark.app.name" properties are mendatory. User can add more [spark related properties] (http://spark.apache.org/docs/latest/configuration.html#available-properties) as per their need.

Entity

@Entity
@Table(name = "spark_person")
public class Person implements Serializable
{

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The person id. */
    @Id
    private String personId;

    /** The person name. */
    private String personName;

    /** The age. */
    private int age;

    /** The salary. */
    private Double salary;

   // setters and getters. 
}

Read-Write Operation

    EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_mongo_pu");
    EntityManager em = emf.createEntityManager();
    Person person = new Person();
    person.setAge(23);
    person.setPersonId("1");
    person.setPersonName("Dev");
    person.setSalary(100000.0);

    // save data 
    em.persist(person);
    
    em.clear();

   Person peronFound = em.find(Person.class, "1"); 

   em.close();
   emf.close();

Query Operation

Select all :

String query = "select * from spark_person"; 
List results = em.createNativeQuery(query).getResultList();

Select with WHERE :

String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();

Select with LIKE :

String query = "select * from spark_person where personName like 'De%'";
List results = em.createNativeQuery(query).getResultList();

Sum (Aggregation) :

String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();

Saving data after Querying

Details of syntax can be found here.

To save in FS as CSV:

    String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
    Query q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();

To save in FS as JSON:

    query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
    q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();

For more details find the testcase.

Clone this wiki locally