Spark Cassandra Module
Devender Yadav edited this page Jul 28, 2015 · 7 revisions
This module adds support for Cassandra. Users can perform read, write, and query operations on the data.
To use it, add the following dependency to your pom.xml:
<dependency>
<groupId>com.impetus.kundera.client</groupId>
<artifactId>kundera-spark-cassandra</artifactId>
<version>${kundera.version}</version>
</dependency>
The persistence unit is defined in persistence.xml as follows:
<persistence-unit name="spark_cass_pu">
<provider>com.impetus.kundera.KunderaPersistence</provider>
<class>com.impetus.client.spark.Book</class>
<properties>
<property name="kundera.nodes" value="localhost" />
<property name="kundera.port" value="7077" />
<property name="kundera.keyspace" value="sparktest" />
<property name="kundera.dialect" value="spark" />
<property name="kundera.client" value="cassandra" />
<property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
<property name="kundera.client.property" value="KunderaSparkCassProperties.xml" />
</properties>
</persistence-unit>
Spark-related properties are supplied through an XML file. For example, the persistence.xml above references KunderaSparkCassProperties.xml through the kundera.client.property property.
Sample Property File:
<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
<datastores>
<dataStore>
<name>cassandra</name>
<connection>
<properties>
<property name="spark.executor.memory" value="1g" />
<property name="spark.cassandra.connection.host" value="localhost" />
<property name="spark.cassandra.connection.native.port" value="9042" />
<property name="spark.cassandra.connection.rpc.port" value="9160" />
<property name="spark.master" value="local" />
<property name="spark.app.name" value="testspark" />
<property name="spark.driver.allowMultipleContexts" value="true" />
</properties>
</connection>
</dataStore>
</datastores>
</clientProperties>
Here, the "spark.master" and "spark.app.name" properties are mandatory. Users can add more [Spark-related properties](http://spark.apache.org/docs/latest/configuration.html#available-properties) as needed.
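Because the two mandatory properties are easy to omit, it can help to check the property file before starting the application. The following is a minimal sketch using only the JDK's XML parser. `SparkPropertyCheck` and `missingMandatory` are hypothetical names for illustration and are not part of Kundera; the sketch assumes the file follows the `<property name="..." value="..." />` layout shown in the sample above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper for illustration; not part of Kundera.
final class SparkPropertyCheck {
    // The two properties the text above describes as mandatory.
    static final String[] MANDATORY = {"spark.master", "spark.app.name"};

    /** Returns the mandatory property names that do not appear in the given XML. */
    static List<String> missingMandatory(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse property file", e);
        }
        // Collect the name attribute of every <property> element.
        Set<String> names = new HashSet<>();
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            names.add(((Element) props.item(i)).getAttribute("name"));
        }
        List<String> missing = new ArrayList<>();
        for (String key : MANDATORY) {
            if (!names.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

An empty result means both mandatory properties are present; otherwise the returned list names the ones to add.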
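Sample entity:
import java.io.Serializable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity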
@Table(name = "spark_book")
public class Book implements Serializable
{
/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;
/** The id. */
@Id
private String id;
/** The title. */
@Column
private String title;
/** The author. */
@Column
private String author;
/** The category. */
@Column
private String category;
/** The num pages. */
@Column
private int numPages;
// setters and getters.
}
EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_cass_pu");
EntityManager em = emf.createEntityManager();
Book book = new Book();
book.setId("1");
book.setTitle("A Tale of Two Cities");
book.setAuthor("Charles Dickens");
book.setCategory("History");
book.setNumPages(441);
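// save data
em.persist(book);
// detach all managed entities so that the lookup below does not return the cached instance
em.clear();
// find by primary key
Book bookFound = em.find(Book.class, "1");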
em.close();
emf.close();
Select all:
String query = "select * from spark_book";
List results = em.createNativeQuery(query).getResultList();
Select with WHERE:
String query = "select * from spark_book where numPages > 450";
List results = em.createNativeQuery(query).getResultList();
Select with LIKE:
String query = "select * from spark_book where title like 'The%'";
List results = em.createNativeQuery(query).getResultList();
Sum (aggregation):
String query = "select sum(numPages) from spark_book";
List results = em.createNativeQuery(query).getResultList();
Details of syntax can be found here.
To save in Cassandra:
String query = "INSERT INTO cassandra.sparktest.spark_book_copy FROM (select * from spark_book)";
Query q = em.createNativeQuery(query, Book.class);
q.executeUpdate();
To save in FS as CSV:
String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_book)";
Query q = em.createNativeQuery(query, Book.class);
q.executeUpdate();
To save in FS as JSON:
String query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_book)";
Query q = em.createNativeQuery(query, Book.class);
q.executeUpdate();
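The three save statements above share one pattern: a target (`cassandra.<keyspace>.<table>` or `fs.[<path>] AS <FORMAT>`) followed by `FROM (<select>)`. As a rough sketch, the strings could be assembled with a small helper like the one below. `SaveQueries`, `toCassandra`, and `toFile` are hypothetical names, not Kundera API, and the only formats assumed are the CSV and JSON shown above. The helper does no escaping, so it should only be given trusted values.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not part of Kundera.
final class SaveQueries {
    /** Builds an INSERT statement that copies a select's result into a Cassandra table. */
    static String toCassandra(String keyspace, String table, String selectSql) {
        return "INSERT INTO cassandra." + keyspace + "." + table
                + " FROM (" + selectSql + ")";
    }

    /** Builds an INSERT statement that writes a select's result to the file system. */
    static String toFile(String path, String format, String selectSql) {
        String fmt = format.toUpperCase(Locale.ROOT);
        // Only the two formats shown in this page are accepted here.
        if (!fmt.equals("CSV") && !fmt.equals("JSON")) {
            throw new IllegalArgumentException("Unsupported format: " + format);
        }
        return "INSERT INTO fs.[" + path + "] AS " + fmt
                + " FROM (" + selectSql + ")";
    }
}
```

The resulting string would then be passed to `em.createNativeQuery(query, Book.class)` and run with `executeUpdate()`, as in the snippets above.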
For more details, see the test case.