Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
allow namespacing for redis, allow TLS config for postgres data store…
Browse files Browse the repository at this point in the history
… (need to test)
  • Loading branch information
jeromegn committed Nov 7, 2018
1 parent 420a443 commit 4f94f46
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 34 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ r2d2 = "0.8"
r2d2_sqlite = "0.7"
r2d2_redis = "0.8"
r2d2_postgres = "0.14"
postgres = {version="0.15", features=["with-serde_json"]}
postgres = {version="0.15", features=["with-serde_json","with-openssl"]}
sha2 = "0.7"
sha-1 = "0.7"
url = "1.7"
Expand Down
66 changes: 57 additions & 9 deletions src/postgres_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ use std::sync::Arc;
extern crate postgres;
extern crate r2d2;
extern crate r2d2_postgres;
use self::r2d2_postgres::PostgresConnectionManager;
use self::r2d2_postgres::{PostgresConnectionManager, TlsMode};

use self::postgres::params::{Builder, ConnectParams, IntoConnectParams};
use self::postgres::tls::openssl::openssl::ssl::{SslConnectorBuilder, SslMethod};
use self::postgres::tls::openssl::openssl::x509::{X509_FILETYPE_DEFAULT, X509_FILETYPE_PEM};
use self::postgres::types::ToSql;
use self::postgres::{Connection, TlsMode};
use self::postgres::Connection;

use settings::PostgresStoreConfig;

extern crate serde_json;

Expand All @@ -19,7 +23,9 @@ pub struct PostgresDataStore {
}

impl PostgresDataStore {
pub fn new(url: String, maybe_dbname: Option<String>) -> Self {
pub fn new(conf: &PostgresStoreConfig) -> Self {
let url = conf.url.clone();
let maybe_dbname = conf.database.as_ref().cloned();
let params: ConnectParams = url.into_connect_params().unwrap();
let mut builder = Builder::new();
builder.port(params.port());
Expand All @@ -33,8 +39,32 @@ impl PostgresDataStore {

let params = builder.build(params.host().clone());

let maybe_tls = if conf.tls_client_crt.is_some() {
let mut connbuilder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
if let Some(ref ca) = conf.tls_ca_crt {
connbuilder.set_ca_file(ca).unwrap();
}
connbuilder
.set_certificate_file(conf.tls_client_crt.as_ref().unwrap(), X509_FILETYPE_DEFAULT)
.unwrap();
connbuilder
.set_private_key_file(conf.tls_client_key.as_ref().unwrap(), X509_FILETYPE_PEM)
.unwrap();
// connbuilder.
// connbuilder.set_verify(postgres::tls::openssl::openssl::ssl::);
Some(postgres::tls::openssl::OpenSsl::from(connbuilder.build()))
} else {
None
};

let pool = if let Some(dbname) = &maybe_dbname {
let conn = Connection::connect(params.clone(), TlsMode::None).unwrap();
let conn = Connection::connect(
params.clone(),
match maybe_tls.as_ref() {
Some(tls) => postgres::TlsMode::Require(tls),
None => postgres::TlsMode::None,
},
).unwrap();
match conn.execute(&format!("CREATE DATABASE \"{}\"", dbname), NO_PARAMS) {
Ok(_) => debug!("database created with success"),
Err(e) => warn!(
Expand All @@ -50,11 +80,22 @@ impl PostgresDataStore {
}

let pool_params = builder.build(params.host().clone());
let manager =
PostgresConnectionManager::new(pool_params, r2d2_postgres::TlsMode::None).unwrap();
let manager = PostgresConnectionManager::new(
pool_params,
match maybe_tls {
Some(tls) => TlsMode::Require(Box::new(tls)),
None => TlsMode::None,
},
).unwrap();
r2d2::Pool::builder().build(manager).unwrap()
} else {
let manager = PostgresConnectionManager::new(params, r2d2_postgres::TlsMode::None).unwrap();
let manager = PostgresConnectionManager::new(
params,
match maybe_tls {
Some(tls) => TlsMode::Require(Box::new(tls)),
None => TlsMode::None,
},
).unwrap();
r2d2::Pool::builder().build(manager).unwrap()
};
PostgresDataStore {
Expand Down Expand Up @@ -182,11 +223,18 @@ mod tests {
}

fn setup(dbname: Option<String>) -> PostgresDataStore {
PostgresDataStore::new((*PG_URL).clone(), dbname)
let conf = PostgresStoreConfig {
url: (*PG_URL).clone(),
database: dbname,
tls_client_crt: None,
tls_client_key: None,
tls_ca_crt: None,
};
PostgresDataStore::new(&conf)
}

fn teardown(dbname: &str) {
let conn = Connection::connect((*PG_URL).as_str(), TlsMode::None).unwrap();
let conn = Connection::connect((*PG_URL).as_str(), postgres::TlsMode::None).unwrap();

conn
.execute(&format!("DROP DATABASE {}", dbname), NO_PARAMS)
Expand Down
41 changes: 29 additions & 12 deletions src/redis_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@ use cache::*;
extern crate r2d2_redis;
use self::r2d2_redis::{r2d2, redis, RedisConnectionManager};

use settings::RedisStoreConfig;

#[derive(Debug)]
pub struct RedisCacheStore {
pool: Arc<r2d2::Pool<RedisConnectionManager>>,
ns: Option<String>,
}

impl RedisCacheStore {
pub fn new(url: String) -> Self {
let manager = RedisConnectionManager::new(url.as_str()).unwrap();
pub fn new(conf: &RedisStoreConfig) -> Self {
let manager = RedisConnectionManager::new(conf.url.as_str()).unwrap();
let pool = r2d2::Pool::builder().build(manager).unwrap();
RedisCacheStore {
pool: Arc::new(pool),
ns: conf.namespace.as_ref().cloned(),
}
}

fn key(&self, k: String) -> String {
if self.ns.is_none() {
return k;
}
format!("{}:{}", self.ns.as_ref().unwrap(), k)
}
}

Expand All @@ -28,18 +39,19 @@ impl CacheStore for RedisCacheStore {
&self,
key: String,
) -> CacheResult<Option<Box<Stream<Item = Vec<u8>, Error = CacheError> + Send>>> {
debug!("redis cache get with key: {}", key);
let pool = Arc::clone(&self.pool);
let conn = pool.get().unwrap(); // TODO: no unwrap
let size = 256 * 1024;
let fullkey = self.key(key);
debug!("redis cache get with key: {}", fullkey);
Ok(Some(Box::new(stream::unfold(0, move |pos| {
// End early given some rules!
// not a multiple of size, means we're done.
if pos > 0 && pos % size > 0 {
return None;
}
match redis::cmd("GETRANGE")
.arg(key.clone())
.arg(fullkey.clone())
.arg(pos)
.arg(pos + size - 1) // end arg is inclusive
.query::<Vec<u8>>(conn.deref())
Expand All @@ -62,8 +74,12 @@ impl CacheStore for RedisCacheStore {
data_stream: Box<Stream<Item = Vec<u8>, Error = ()> + Send>,
maybe_ttl: Option<u32>,
) -> Box<Future<Item = (), Error = CacheError> + Send> {
debug!("redis cache set with key: {} and ttl: {:?}", key, maybe_ttl);
let pool = Arc::clone(&self.pool);
let fullkey = self.key(key);
debug!(
"redis cache set with key: {} and ttl: {:?}",
fullkey, maybe_ttl
);
Box::new(
data_stream
.concat2()
Expand All @@ -73,7 +89,7 @@ impl CacheStore for RedisCacheStore {
}).and_then(move |b| {
let conn = pool.get().unwrap(); // TODO: no unwrap
let mut cmd = redis::cmd("SET");
cmd.arg(key).arg(b);
cmd.arg(fullkey).arg(b);
if let Some(ttl) = maybe_ttl {
cmd.arg("EX").arg(ttl);
}
Expand All @@ -86,26 +102,27 @@ impl CacheStore for RedisCacheStore {
}

fn del(&self, key: String) -> Box<Future<Item = (), Error = CacheError> + Send> {
debug!("redis cache del key: {}", key);

let pool = Arc::clone(&self.pool);
let fullkey = self.key(key);
debug!("redis cache del key: {}", fullkey);
Box::new(future::lazy(move || -> Result<(), CacheError> {
let conn = pool.get().unwrap(); // TODO: no unwrap
match redis::cmd("DEL").arg(key).query::<i8>(conn.deref()) {
match redis::cmd("DEL").arg(fullkey).query::<i8>(conn.deref()) {
Ok(_) => Ok(()),
Err(e) => Err(CacheError::Failure(format!("{}", e))),
}
}))
}

fn expire(&self, key: String, ttl: u32) -> Box<Future<Item = (), Error = CacheError> + Send> {
debug!("redis cache expire key: {} w/ ttl: {}", key, ttl);

let pool = Arc::clone(&self.pool);
let fullkey = self.key(key);
debug!("redis cache expire key: {} w/ ttl: {}", fullkey, ttl);

Box::new(future::lazy(move || -> CacheResult<()> {
let conn = pool.get().unwrap(); // TODO: no unwrap
match redis::cmd("EXPIRE")
.arg(key)
.arg(fullkey)
.arg(ttl)
.query::<i8>(conn.deref())
{
Expand Down
7 changes: 2 additions & 5 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl Runtime {
CacheStore::Sqlite(conf) => {
Box::new(sqlite_cache::SqliteCacheStore::new(conf.filename.clone()))
}
CacheStore::Redis(conf) => Box::new(redis_cache::RedisCacheStore::new(conf.url.clone())),
CacheStore::Redis(conf) => Box::new(redis_cache::RedisCacheStore::new(&conf)),
},
None => Box::new(sqlite_cache::SqliteCacheStore::new("cache.db".to_string())),
},
Expand All @@ -225,10 +225,7 @@ impl Runtime {
DataStore::Sqlite(conf) => {
Box::new(sqlite_data::SqliteDataStore::new(conf.filename.clone()))
}
DataStore::Postgres(conf) => Box::new(postgres_data::PostgresDataStore::new(
conf.url.clone(),
conf.dbname.as_ref().cloned(),
)),
DataStore::Postgres(conf) => Box::new(postgres_data::PostgresDataStore::new(&conf)),
},
None => Box::new(sqlite_data::SqliteDataStore::new("data.db".to_string())),
},
Expand Down
18 changes: 11 additions & 7 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,39 @@ lazy_static! {
pub static ref SETTINGS: RwLock<Settings> = RwLock::new(Settings::new().unwrap());
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct SqliteStoreConfig {
pub filename: String,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct PostgresStoreConfig {
pub url: String,
pub dbname: Option<String>,
pub database: Option<String>,
pub tls_client_crt: Option<String>,
pub tls_client_key: Option<String>,
pub tls_ca_crt: Option<String>,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct RedisStoreConfig {
pub url: String,
pub namespace: Option<String>,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub enum DataStore {
Sqlite(SqliteStoreConfig),
Postgres(PostgresStoreConfig),
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub enum CacheStore {
Sqlite(SqliteStoreConfig),
Redis(RedisStoreConfig),
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
pub data_store: Option<DataStore>,
pub cache_store: Option<CacheStore>,
Expand Down

0 comments on commit 4f94f46

Please sign in to comment.