Skip to content

Commit

Permalink
Allow customization of C* cluster and session creation.
Browse files Browse the repository at this point in the history
- Expose C* cluster and session instances directly through Guice
- Allow overriding cluster creation through module constructor methods
- Uses a factory pattern for the session to allow overriding how the session is exposed to Guice
  • Loading branch information
llinder committed Feb 3, 2017
1 parent e731a09 commit b0fe5ee
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import smartthings.migration.MigrationParameters;
import smartthings.migration.MigrationRunner;

@DependsOn(CassandraService.class)
@DependsOn(Session.class)
public class CassandraMigrationService implements Service {

private Logger logger = LoggerFactory.getLogger(CassandraMigrationService.class);
Expand All @@ -25,7 +25,7 @@ public CassandraMigrationService(CassandraModule.Config config) {
@Override
public void onStart(StartEvent event) throws Exception {
if (config.autoMigrate) {
Session session = event.getRegistry().get(CassandraService.class).getSession();
Session session = event.getRegistry().get(Session.class);
logger.info("Auto Migrating Cassandra");
MigrationRunner migrationRunner = new MigrationRunner();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package smartthings.ratpack.cassandra

import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Host
import com.datastax.driver.core.Session
import org.cassandraunit.CassandraCQLUnit
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet
import org.junit.Rule
Expand All @@ -19,12 +21,15 @@ class MigrationIntegrationSpec extends Specification {
CassandraModule.Config config = new CassandraModule.Config(keyspace: 'test', migrationFile: 'changelog.txt', seeds: seeds, autoMigrate: true)
CassandraMigrationService service = new CassandraMigrationService(config)
StartEvent mockStartEvent = Mock()
CassandraModule cassandraModule = new CassandraModule()
Cluster cluster = cassandraModule.cluster(config)

and:
CassandraService cassandraService = new CassandraService(config)
CassandraService cassandraService = new CassandraService(new DefaultSession(cluster, config))

and:
def registry = new SimpleMutableRegistry()
registry.add(Session, cassandraService.session)
registry.add(CassandraService, cassandraService)

and:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package smartthings.ratpack.cassandra;

import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.Optional;
import ratpack.exec.Promise;
import ratpack.service.Service;
import ratpack.service.StartEvent;
import ratpack.service.StopEvent;

/**
* Abstract class that transparently provides the role of both an
* injectable {@link Session} and fills the contract of a Ratpack {@link Service}.
*
* This is an abstract class to allow users to extend it for different reasons including
* the ability to provide different types for DI resolution.
*/
public abstract class AbstractSession implements RatpackSession, Session, Service {

private final Cluster cluster;
private final Optional<String> keyspace;
private Session delegate;

AbstractSession(Cluster cluster) {
this.cluster = cluster;
this.keyspace = Optional.empty();
}

AbstractSession(Cluster cluster, String keyspace) {
this.cluster = cluster;
this.keyspace = Optional.of(keyspace);
}

@Override
public String getLoggedKeyspace() {
return getDelegate().getLoggedKeyspace();
}

@Override
public Session init() {
return getDelegate().init();
}

@Override
public ListenableFuture<Session> initAsync() {
return getDelegate().initAsync();
}

@Override
public ResultSet execute(String s) {
return getDelegate().execute(s);
}

@Override
public ResultSet execute(String s, Object... objects) {
return getDelegate().execute(s, objects);
}

@Override
public ResultSet execute(String s, Map<String, Object> map) {
return getDelegate().execute(s, map);
}

@Override
public ResultSet execute(Statement statement) {
return getDelegate().execute(statement);
}

@Override
public ResultSetFuture executeAsync(String s) {
return getDelegate().executeAsync(s);
}

@Override
public ResultSetFuture executeAsync(String s, Object... objects) {
return getDelegate().executeAsync(s, objects);
}

@Override
public ResultSetFuture executeAsync(String s, Map<String, Object> map) {
return getDelegate().executeAsync(s, map);
}

@Override
public ResultSetFuture executeAsync(Statement statement) {
return getDelegate().executeAsync(statement);
}

@Override
public PreparedStatement prepare(String s) {
return getDelegate().prepare(s);
}

@Override
public PreparedStatement prepare(RegularStatement regularStatement) {
return getDelegate().prepare(regularStatement);
}

@Override
public ListenableFuture<PreparedStatement> prepareAsync(String s) {
return getDelegate().prepareAsync(s);
}

@Override
public
ListenableFuture<PreparedStatement> prepareAsync(RegularStatement regularStatement) {
return getDelegate().prepareAsync(regularStatement);
}

@Override
public Promise<ResultSet> executePromise(String query) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(String query, Object... values) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(String query, Map<String, Object> values) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(Statement statement) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(statement);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<PreparedStatement> preparePromise(String query) {
return Promise.async(upstream -> {
ListenableFuture<PreparedStatement> f = getDelegate().prepareAsync(query);
upstream.accept(f);
});
}

@Override
public Promise<PreparedStatement> preparePromise(RegularStatement statement) {
return Promise.async(upstream -> {
ListenableFuture<PreparedStatement> f = getDelegate().prepareAsync(statement);
upstream.accept(f);
});
}

@Override
public CloseFuture closeAsync() {
return getDelegate().closeAsync();
}

@Override
public void close() {
getDelegate().close();
}

@Override
public boolean isClosed() {
return getDelegate().isClosed();
}

@Override
public Cluster getCluster() {
return cluster;
}

@Override
public State getState() {
return getDelegate().getState();
}

@Override
public void onStart(StartEvent event) throws Exception {
delegate = getDelegate();
}

@Override
public void onStop(StopEvent event) throws Exception {
if (getDelegate() != null && !getDelegate().isClosed()) {
getDelegate().closeAsync();
}
}

protected final Session getDelegate() {
if (delegate == null) {
synchronized (this) {
if (delegate == null) {
delegate = createDelegate();
}
}
}
return delegate;
}

protected Session createDelegate() {
return (keyspace.isPresent()) ? cluster.connect(keyspace.get()) : cluster.connect();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package smartthings.ratpack.cassandra;

import com.datastax.driver.core.Session;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -8,15 +9,15 @@
import ratpack.registry.Registry;

public class CassandraHealthCheck implements HealthCheck {
private final CassandraService cassandraService;
private final Session session;
private final String validationQuery;

Logger logger = LoggerFactory.getLogger(CassandraHealthCheck.class);

@Inject
public CassandraHealthCheck(CassandraModule.Config cassandraConfig, CassandraService cassandraService) {
public CassandraHealthCheck(CassandraModule.Config cassandraConfig, Session session) {
this.validationQuery = cassandraConfig.getValidationQuery();
this.cassandraService = cassandraService;
this.session = session;
}

@Override
Expand All @@ -28,7 +29,7 @@ public String getName() {
public Promise<Result> check(Registry registry) throws Exception {
return Promise.async(upstream -> {
try {
cassandraService.getSession().execute(validationQuery);
session.execute(validationQuery);
upstream.success(Result.healthy());
} catch (Exception ex) {
logger.error("Cassandra connection is unhealthy", ex);
Expand Down
Loading

0 comments on commit b0fe5ee

Please sign in to comment.