diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf index 278fc7f..7e7c089 100644 --- a/readyset_proxysql_scheduler.cnf +++ b/readyset_proxysql_scheduler.cnf @@ -10,3 +10,4 @@ source_hostgroup = 11 readyset_hostgroup = 99 warmup_time = 60 lock_file = '/tmp/readyset_scheduler.lock' +operation_mode=all diff --git a/src/config.rs b/src/config.rs index c69144e..58eda48 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,37 @@ -use std::{fs::File, io::Read}; +use std::{ + fmt::{Display, Formatter}, + fs::File, + io::Read, +}; + +#[derive(serde::Deserialize, Clone, Copy, PartialEq, PartialOrd, Default)] +pub enum OperationMode { + HealthCheck, + QueryDiscovery, + #[default] + All, +} + +impl From for OperationMode { + fn from(s: String) -> Self { + match s.as_str() { + "health_check" => OperationMode::HealthCheck, + "query_discovery" => OperationMode::QueryDiscovery, + "all" => OperationMode::All, + _ => OperationMode::All, + } + } +} + +impl Display for OperationMode { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + OperationMode::HealthCheck => write!(f, "health_check"), + OperationMode::QueryDiscovery => write!(f, "query_discovery"), + OperationMode::All => write!(f, "all"), + } + } +} #[derive(serde::Deserialize, Clone)] pub struct Config { @@ -14,6 +47,7 @@ pub struct Config { pub readyset_hostgroup: u16, pub warmup_time: Option, pub lock_file: Option, + pub operation_mode: Option, } pub fn read_config_file(path: &str) -> Result { diff --git a/src/health_check.rs b/src/health_check.rs new file mode 100644 index 0000000..37b2930 --- /dev/null +++ b/src/health_check.rs @@ -0,0 +1,27 @@ +use crate::{ + config, messages, + server::{self, ServerStatus}, +}; + +pub fn health_check( + proxysql_conn: &mut mysql::PooledConn, + config: &config::Config, + readyset_conn: &mut mysql::PooledConn, +) { + match server::check_readyset_is_ready(readyset_conn) { + Ok(ready) => { + if ready { + let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online); + } else { + messages::print_info("Readyset is still running Snapshot."); + let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); + std::process::exit(0); + } + } + Err(e) => { + messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); + let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); + std::process::exit(1); + } + }; +} diff --git a/src/main.rs b/src/main.rs index 40ebc2e..16e8e59 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ mod config; +mod health_check; mod messages; mod queries; mod server; @@ -9,6 +10,7 @@ use mysql::OptsBuilder; use mysql::{Pool, PoolConstraints, PoolOpts}; use file_guard::Lock; +use queries::query_discovery; use server::ServerStatus; use std::fs::OpenOptions; @@ -101,64 +103,22 @@ fn main() { }; let mut readyset_conn = readyset_pool.get_conn().unwrap(); - match server::check_readyset_is_ready(&mut readyset_conn) { - Ok(ready) => { - if ready { - let _ = - server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Online); - } else { - messages::print_info("Readyset is still running Snapshot."); - let _ = server::change_server_status( - &mut proxysql_conn, - &config, - ServerStatus::Shunned, - ); - std::process::exit(0); - } - } - Err(e) => { - messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); - let _ = - server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned); - std::process::exit(1); - } + let running_mode = match config.operation_mode { + Some(mode) => mode, + None => config::OperationMode::All, }; - let mut queries_added_or_change = - queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap(); - - let rows: Vec<(String, String, String)> = - queries::find_queries_to_cache(&mut proxysql_conn, &config); - - for (digest_text, digest, schema) in rows { - let digest_text = queries::replace_placeholders(&digest_text); - messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); - let supported = - queries::check_readyset_query_support(&mut readyset_conn, &digest_text, &schema); - match supported { - Ok(true) => { - messages::print_info( - "Query is supported, adding it to proxysql and readyset" - .to_string() - .as_str(), - ); - queries_added_or_change = true; - queries::cache_query(&mut readyset_conn, &digest_text) - .expect("Failed to create readyset cache"); - queries::add_query_rule(&mut proxysql_conn, &digest, &config) - .expect("Failed to add query rule"); - } - Ok(false) => { - messages::print_info("Query is not supported"); - } - Err(err) => { - messages::print_warning(format!("Failed to check query support: {}", err).as_str()); - } - } + if running_mode == config::OperationMode::HealthCheck + || running_mode == config::OperationMode::All + { + health_check::health_check(&mut proxysql_conn, &config, &mut readyset_conn) } - if queries_added_or_change { - queries::load_query_rules(&mut proxysql_conn).expect("Failed to load query rules"); - queries::save_query_rules(&mut proxysql_conn).expect("Failed to save query rules"); + + if running_mode == config::OperationMode::QueryDiscovery + || running_mode == config::OperationMode::All + { + query_discovery(&mut proxysql_conn, &config, &mut readyset_conn); } + messages::print_info("Finished readyset_scheduler"); } diff --git a/src/queries.rs b/src/queries.rs index 8646bc9..b53d479 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,7 +1,10 @@ use chrono::{DateTime, Local}; use mysql::{prelude::Queryable, PooledConn}; -use crate::{config::Config, messages}; +use crate::{ + config::{self, Config}, + messages, +}; pub fn find_queries_to_cache( conn: &mut PooledConn, @@ -117,3 +120,41 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result = find_queries_to_cache(proxysql_conn, config); + + for (digest_text, digest, schema) in rows { + let digest_text = replace_placeholders(&digest_text); + messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); + let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema); + match supported { + Ok(true) => { + messages::print_info( + "Query is supported, adding it to proxysql and readyset" + .to_string() + .as_str(), + ); + queries_added_or_change = true; + cache_query(readyset_conn, &digest_text).expect("Failed to create readyset cache"); + add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule"); + } + Ok(false) => { + messages::print_info("Query is not supported"); + } + Err(err) => { + messages::print_warning(format!("Failed to check query support: {}", err).as_str()); + } + } + } + if queries_added_or_change { + load_query_rules(proxysql_conn).expect("Failed to load query rules"); + save_query_rules(proxysql_conn).expect("Failed to save query rules"); + } +}