Spark MongoDB Module
Devender Yadav edited this page Jul 27, 2015 · 7 revisions
This module includes support for MongoDB. Users can perform read, write, and query operations on the data.
To use it, add the following dependency to your pom.xml:
<dependency>
<groupId>com.impetus.kundera.client</groupId>
<artifactId>kundera-spark-mongodb</artifactId>
<version>${kundera.version}</version>
</dependency>
Sample persistence unit (persistence.xml):
<persistence-unit name="spark_mongo_pu">
<provider>com.impetus.kundera.KunderaPersistence</provider>
<class>com.impetus.client.spark.entities.Person</class>
<properties>
<property name="kundera.nodes" value="localhost" />
<property name="kundera.port" value="7077" />
<property name="kundera.keyspace" value="sparktest" />
<property name="kundera.dialect" value="spark" />
<property name="kundera.client" value="mongodb" />
<property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
<property name="kundera.client.property" value="KunderaSparkMongoProperties.xml" />
</properties>
</persistence-unit>
Spark-related properties are supplied through an XML file. For example, the persistence.xml above references KunderaSparkMongoProperties.xml through the "kundera.client.property" property.
Sample Property File:
<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
<datastores>
<dataStore>
<name>mongodb</name>
<connection>
<properties>
<property name="spark.master" value="local" />
<property name="spark.app.name" value="sparkMongo" />
<property name="spark.driver.allowMultipleContexts" value="true" />
</properties>
<servers>
<server>
<host>localhost</host>
<port>27017</port>
</server>
</servers>
</connection>
</dataStore>
</datastores>
</clientProperties>
Here the "spark.master" and "spark.app.name" properties are mandatory. Users can add more [Spark-related properties](http://spark.apache.org/docs/latest/configuration.html#available-properties) as needed.
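Since a missing mandatory property is easy to overlook, it can help to check the property file before creating the EntityManagerFactory. The following is a minimal sketch using only the JDK's XML parser. The class `SparkPropertiesCheck` and its method `missingMandatory` are hypothetical names introduced for illustration; they are not part of Kundera, and Kundera may perform its own validation.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of Kundera: reports which of the Spark
// properties described as mandatory are absent from a client properties XML.
final class SparkPropertiesCheck
{
    private static final String[] MANDATORY = { "spark.master", "spark.app.name" };

    /** Returns the mandatory property names that do not appear in the XML. */
    static List<String> missingMandatory(String xml)
    {
        Document doc;
        try
        {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            doc = builder.parse(new InputSource(new StringReader(xml)));
        }
        catch (Exception e)
        {
            throw new IllegalArgumentException("Could not parse client properties XML", e);
        }

        // Collect the name attribute of every <property> element.
        Set<String> present = new HashSet<>();
        NodeList nodes = doc.getElementsByTagName("property");
        for (int i = 0; i < nodes.getLength(); i++)
        {
            present.add(((Element) nodes.item(i)).getAttribute("name"));
        }

        List<String> missing = new ArrayList<>();
        for (String name : MANDATORY)
        {
            if (!present.contains(name))
            {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args)
    {
        // spark.app.name is deliberately left out of this sample.
        String xml = "<clientProperties><datastores><dataStore><name>mongodb</name>"
                + "<connection><properties>"
                + "<property name=\"spark.master\" value=\"local\" />"
                + "</properties></connection></dataStore></datastores></clientProperties>";
        System.out.println(missingMandatory(xml)); // prints [spark.app.name]
    }
}
```

An empty list means both mandatory properties are present; the check only looks at property names, not at whether the values are valid for Spark.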
@Entity
@Table(name = "spark_person")
public class Person implements Serializable
{
/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;
/** The person id. */
@Id
private String personId;
/** The person name. */
private String personName;
/** The age. */
private int age;
/** The salary. */
private Double salary;
// setters and getters.
}
Read-write operations:
EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_mongo_pu");
EntityManager em = emf.createEntityManager();
Person person = new Person();
person.setAge(23);
person.setPersonId("1");
person.setPersonName("Dev");
person.setSalary(100000.0);
// save data
em.persist(person);
em.clear();
Person personFound = em.find(Person.class, "1");
em.close();
emf.close();
Select all:
String query = "select * from spark_person";
List results = em.createNativeQuery(query).getResultList();
Select with WHERE:
String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();
Select with LIKE:
String query = "select * from spark_person where personName like 'kp%'";
List results = em.createNativeQuery(query).getResultList();
Sum (aggregation):
String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();
Details of syntax can be found here.
To save in FS as CSV:
String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
Query q = em.createNativeQuery(query, Person.class);
q.executeUpdate();
To save in FS as JSON:
query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
q = em.createNativeQuery(query, Person.class);
q.executeUpdate();
For more details, see the test case.
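The two export statements above share one shape: `INSERT INTO fs.[path] AS FORMAT FROM (select ...)`. As a rough sketch, that string can be assembled by a small helper so the path and format are not retyped each time. `FsExportQuery`, its `Format` enum, and `build` are hypothetical names used here for illustration only; they are not part of Kundera, and only the CSV and JSON formats shown on this page are assumed.

```java
// Hypothetical helper, not part of Kundera: assembles the file-system export
// statement in the same shape as the CSV and JSON examples above.
final class FsExportQuery
{
    /** Output formats shown on this page. */
    enum Format { CSV, JSON }

    static String build(String path, Format format, String selectQuery)
    {
        if (path == null || path.isEmpty() || path.contains("]"))
        {
            // A ']' inside the path would end the fs.[...] section early.
            throw new IllegalArgumentException("Invalid export path: " + path);
        }
        if (format == null || selectQuery == null || selectQuery.isEmpty())
        {
            throw new IllegalArgumentException("Format and select query are required");
        }
        return "INSERT INTO fs.[" + path + "] AS " + format.name() + " FROM (" + selectQuery + ")";
    }

    public static void main(String[] args)
    {
        String query = build("src/test/resources/testspark_csv", Format.CSV,
                "select * from spark_person");
        // prints INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)
        System.out.println(query);
    }
}
```

The resulting string would then be passed to `em.createNativeQuery(query, Person.class)` and run with `executeUpdate()`, as in the examples above.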