Skip to content

Commit

Permalink
Merge pull request #11 from readysettech/mode
Browse files Browse the repository at this point in the history
Implement mode of operation
  • Loading branch information
altmannmarcelo authored Jul 11, 2024
2 parents 3778ccf + bc50868 commit fa73e31
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 57 deletions.
1 change: 1 addition & 0 deletions readyset_proxysql_scheduler.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ source_hostgroup = 11
readyset_hostgroup = 99
warmup_time = 60
lock_file = '/tmp/readyset_scheduler.lock'
operation_mode=all
36 changes: 35 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
use std::{fs::File, io::Read};
use std::{
fmt::{Display, Formatter},
fs::File,
io::Read,
};

#[derive(serde::Deserialize, Clone, Copy, PartialEq, PartialOrd, Default)]
pub enum OperationMode {
HealthCheck,
QueryDiscovery,
#[default]
All,
}

impl From<String> for OperationMode {
fn from(s: String) -> Self {
match s.as_str() {
"health_check" => OperationMode::HealthCheck,
"query_discovery" => OperationMode::QueryDiscovery,
"all" => OperationMode::All,
_ => OperationMode::All,
}
}
}

impl Display for OperationMode {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
OperationMode::HealthCheck => write!(f, "health_check"),
OperationMode::QueryDiscovery => write!(f, "query_discovery"),
OperationMode::All => write!(f, "all"),
}
}
}

#[derive(serde::Deserialize, Clone)]
pub struct Config {
Expand All @@ -14,6 +47,7 @@ pub struct Config {
pub readyset_hostgroup: u16,
pub warmup_time: Option<u16>,
pub lock_file: Option<String>,
pub operation_mode: Option<OperationMode>,
}

pub fn read_config_file(path: &str) -> Result<String, std::io::Error> {
Expand Down
27 changes: 27 additions & 0 deletions src/health_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{
config, messages,
server::{self, ServerStatus},
};

pub fn health_check(
proxysql_conn: &mut mysql::PooledConn,
config: &config::Config,
readyset_conn: &mut mysql::PooledConn,
) {
match server::check_readyset_is_ready(readyset_conn) {
Ok(ready) => {
if ready {
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online);
} else {
messages::print_info("Readyset is still running Snapshot.");
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned);
std::process::exit(0);
}
}
Err(e) => {
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned);
std::process::exit(1);
}
};
}
70 changes: 15 additions & 55 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod config;
mod health_check;
mod messages;
mod queries;
mod server;
Expand All @@ -9,6 +10,7 @@ use mysql::OptsBuilder;
use mysql::{Pool, PoolConstraints, PoolOpts};

use file_guard::Lock;
use queries::query_discovery;
use server::ServerStatus;
use std::fs::OpenOptions;

Expand Down Expand Up @@ -101,64 +103,22 @@ fn main() {
};
let mut readyset_conn = readyset_pool.get_conn().unwrap();

match server::check_readyset_is_ready(&mut readyset_conn) {
Ok(ready) => {
if ready {
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Online);
} else {
messages::print_info("Readyset is still running Snapshot.");
let _ = server::change_server_status(
&mut proxysql_conn,
&config,
ServerStatus::Shunned,
);
std::process::exit(0);
}
}
Err(e) => {
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned);
std::process::exit(1);
}
let running_mode = match config.operation_mode {
Some(mode) => mode,
None => config::OperationMode::All,
};

let mut queries_added_or_change =
queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap();

let rows: Vec<(String, String, String)> =
queries::find_queries_to_cache(&mut proxysql_conn, &config);

for (digest_text, digest, schema) in rows {
let digest_text = queries::replace_placeholders(&digest_text);
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported =
queries::check_readyset_query_support(&mut readyset_conn, &digest_text, &schema);
match supported {
Ok(true) => {
messages::print_info(
"Query is supported, adding it to proxysql and readyset"
.to_string()
.as_str(),
);
queries_added_or_change = true;
queries::cache_query(&mut readyset_conn, &digest_text)
.expect("Failed to create readyset cache");
queries::add_query_rule(&mut proxysql_conn, &digest, &config)
.expect("Failed to add query rule");
}
Ok(false) => {
messages::print_info("Query is not supported");
}
Err(err) => {
messages::print_warning(format!("Failed to check query support: {}", err).as_str());
}
}
if running_mode == config::OperationMode::HealthCheck
|| running_mode == config::OperationMode::All
{
health_check::health_check(&mut proxysql_conn, &config, &mut readyset_conn)
}
if queries_added_or_change {
queries::load_query_rules(&mut proxysql_conn).expect("Failed to load query rules");
queries::save_query_rules(&mut proxysql_conn).expect("Failed to save query rules");

if running_mode == config::OperationMode::QueryDiscovery
|| running_mode == config::OperationMode::All
{
query_discovery(&mut proxysql_conn, &config, &mut readyset_conn);
}

messages::print_info("Finished readyset_scheduler");
}
43 changes: 42 additions & 1 deletion src/queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use chrono::{DateTime, Local};
use mysql::{prelude::Queryable, PooledConn};

use crate::{config::Config, messages};
use crate::{
config::{self, Config},
messages,
};

pub fn find_queries_to_cache(
conn: &mut PooledConn,
Expand Down Expand Up @@ -117,3 +120,41 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result<boo
}
Ok(updated_rules)
}

pub fn query_discovery(
proxysql_conn: &mut mysql::PooledConn,
config: &config::Config,
readyset_conn: &mut mysql::PooledConn,
) {
let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap();

let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config);

for (digest_text, digest, schema) in rows {
let digest_text = replace_placeholders(&digest_text);
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema);
match supported {
Ok(true) => {
messages::print_info(
"Query is supported, adding it to proxysql and readyset"
.to_string()
.as_str(),
);
queries_added_or_change = true;
cache_query(readyset_conn, &digest_text).expect("Failed to create readyset cache");
add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule");
}
Ok(false) => {
messages::print_info("Query is not supported");
}
Err(err) => {
messages::print_warning(format!("Failed to check query support: {}", err).as_str());
}
}
}
if queries_added_or_change {
load_query_rules(proxysql_conn).expect("Failed to load query rules");
save_query_rules(proxysql_conn).expect("Failed to save query rules");
}
}

0 comments on commit fa73e31

Please sign in to comment.