Skip to content

Commit

Permalink
Implement mode of operation
Browse files Browse the repository at this point in the history
This commits implement mode of operation, allowing users to instruct
scheduler to operate in health check mode, query discovery mode or both.
  • Loading branch information
altmannmarcelo committed May 23, 2024
1 parent 3778ccf commit 612d339
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 57 deletions.
1 change: 1 addition & 0 deletions readyset_proxysql_scheduler.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ source_hostgroup = 11
readyset_hostgroup = 99
warmup_time = 60
lock_file = '/tmp/readyset_scheduler.lock'
operation_mode=all
36 changes: 35 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
use std::{fs::File, io::Read};
use std::{
fmt::{Display, Formatter},
fs::File,
io::Read,
};

#[derive(serde::Deserialize, Clone, Copy, PartialEq, PartialOrd, Default)]
pub enum OperationMode {
HealthCheck,
QueryDiscovery,
#[default]
All,
}

impl From<String> for OperationMode {
fn from(s: String) -> Self {
match s.as_str() {
"health_check" => OperationMode::HealthCheck,
"query_discovery" => OperationMode::QueryDiscovery,
"all" => OperationMode::All,
_ => OperationMode::All,
}
}
}

impl Display for OperationMode {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
OperationMode::HealthCheck => write!(f, "health_check"),
OperationMode::QueryDiscovery => write!(f, "query_discovery"),
OperationMode::All => write!(f, "all"),
}
}
}

#[derive(serde::Deserialize, Clone)]
pub struct Config {
Expand All @@ -14,6 +47,7 @@ pub struct Config {
pub readyset_hostgroup: u16,
pub warmup_time: Option<u16>,
pub lock_file: Option<String>,
pub operation_mode: Option<OperationMode>,
}

pub fn read_config_file(path: &str) -> Result<String, std::io::Error> {
Expand Down
27 changes: 27 additions & 0 deletions src/health_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{
config, messages,
server::{self, ServerStatus},
};

pub fn health_check(
proxysql_conn: &mut mysql::PooledConn,
config: &config::Config,
readyset_conn: &mut mysql::PooledConn,
) {
match server::check_readyset_is_ready(readyset_conn) {
Ok(ready) => {
if ready {
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online);
} else {
messages::print_info("Readyset is still running Snapshot.");
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned);
std::process::exit(0);
}
}
Err(e) => {
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned);
std::process::exit(1);
}
};
}
70 changes: 15 additions & 55 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod config;
mod health_check;
mod messages;
mod queries;
mod server;
Expand All @@ -9,6 +10,7 @@ use mysql::OptsBuilder;
use mysql::{Pool, PoolConstraints, PoolOpts};

use file_guard::Lock;
use queries::query_discovery;
use server::ServerStatus;
use std::fs::OpenOptions;

Expand Down Expand Up @@ -101,64 +103,22 @@ fn main() {
};
let mut readyset_conn = readyset_pool.get_conn().unwrap();

match server::check_readyset_is_ready(&mut readyset_conn) {
Ok(ready) => {
if ready {
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Online);
} else {
messages::print_info("Readyset is still running Snapshot.");
let _ = server::change_server_status(
&mut proxysql_conn,
&config,
ServerStatus::Shunned,
);
std::process::exit(0);
}
}
Err(e) => {
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned);
std::process::exit(1);
}
let running_mode = match config.operation_mode {
Some(mode) => mode,
None => config::OperationMode::All,
};

let mut queries_added_or_change =
queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap();

let rows: Vec<(String, String, String)> =
queries::find_queries_to_cache(&mut proxysql_conn, &config);

for (digest_text, digest, schema) in rows {
let digest_text = queries::replace_placeholders(&digest_text);
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported =
queries::check_readyset_query_support(&mut readyset_conn, &digest_text, &schema);
match supported {
Ok(true) => {
messages::print_info(
"Query is supported, adding it to proxysql and readyset"
.to_string()
.as_str(),
);
queries_added_or_change = true;
queries::cache_query(&mut readyset_conn, &digest_text)
.expect("Failed to create readyset cache");
queries::add_query_rule(&mut proxysql_conn, &digest, &config)
.expect("Failed to add query rule");
}
Ok(false) => {
messages::print_info("Query is not supported");
}
Err(err) => {
messages::print_warning(format!("Failed to check query support: {}", err).as_str());
}
}
if running_mode == config::OperationMode::HealthCheck
|| running_mode == config::OperationMode::All
{
health_check::health_check(&mut proxysql_conn, &config, &mut readyset_conn)
}
if queries_added_or_change {
queries::load_query_rules(&mut proxysql_conn).expect("Failed to load query rules");
queries::save_query_rules(&mut proxysql_conn).expect("Failed to save query rules");

if running_mode == config::OperationMode::QueryDiscovery
|| running_mode == config::OperationMode::All
{
query_discovery(&mut proxysql_conn, &config, &mut readyset_conn);
}

messages::print_info("Finished readyset_scheduler");
}
43 changes: 42 additions & 1 deletion src/queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use chrono::{DateTime, Local};
use mysql::{prelude::Queryable, PooledConn};

use crate::{config::Config, messages};
use crate::{
config::{self, Config},
messages,
};

pub fn find_queries_to_cache(
conn: &mut PooledConn,
Expand Down Expand Up @@ -117,3 +120,41 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result<boo
}
Ok(updated_rules)
}

pub fn query_discovery(
proxysql_conn: &mut mysql::PooledConn,
config: &config::Config,
readyset_conn: &mut mysql::PooledConn,
) {
let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap();

let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config);

for (digest_text, digest, schema) in rows {
let digest_text = replace_placeholders(&digest_text);
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema);
match supported {
Ok(true) => {
messages::print_info(
"Query is supported, adding it to proxysql and readyset"
.to_string()
.as_str(),
);
queries_added_or_change = true;
cache_query(readyset_conn, &digest_text).expect("Failed to create readyset cache");
add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule");
}
Ok(false) => {
messages::print_info("Query is not supported");
}
Err(err) => {
messages::print_warning(format!("Failed to check query support: {}", err).as_str());
}
}
}
if queries_added_or_change {
load_query_rules(proxysql_conn).expect("Failed to load query rules");
save_query_rules(proxysql_conn).expect("Failed to save query rules");
}
}

0 comments on commit 612d339

Please sign in to comment.