Skip to content

Commit

Permalink
Fix query pooling
Browse files Browse the repository at this point in the history
Fix query pooling.
Fix default lock file
Add more readable error message when lock file is not found
  • Loading branch information
altmannmarcelo committed Mar 25, 2024
1 parent 8a2666d commit f9bb1db
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 43 deletions.
2 changes: 1 addition & 1 deletion readyset_proxysql_scheduler.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ readyset_port = 3307
source_hostgroup = 11
readyset_hostgroup = 12
warmup_time = 20
lock_file = '/etc/readyset_scheduler.lock'
lock_file = '/tmp/readyset_scheduler.lock'
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{fs::File, io::Read};


#[derive(serde::Deserialize, Clone)]
pub struct Config {
pub proxysql_user: String,
Expand All @@ -18,12 +17,13 @@ pub struct Config {
}

pub fn read_config_file(path: &str) -> Result<String, std::io::Error> {
let mut file = File::open(path).expect(format!("Failed to open config file at path {}", path).as_str());
let mut file =
File::open(path).expect(format!("Failed to open config file at path {}", path).as_str());
let mut contents = String::new();
file.read_to_string(&mut contents)?;
Ok(contents)
}

pub fn parse_config_file(contents: &str) -> Result<Config, toml::de::Error> {
toml::from_str(contents)
}
}
57 changes: 39 additions & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod config;
mod queries;
mod messages;
mod queries;

use clap::Parser;
use config::read_config_file;
Expand All @@ -25,21 +25,36 @@ fn main() {
let args = Args::parse();
let config_file = read_config_file(&args.config).expect("Failed to read config file");
let config = config::parse_config_file(&config_file).expect("Failed to parse config file");
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(config.clone().lock_file.unwrap_or("/tmp/readyset_scheduler.lock".to_string()))
.unwrap();
let file = match OpenOptions::new().read(true).write(true).create(true).open(
config
.clone()
.lock_file
.unwrap_or("/tmp/readyset_scheduler.lock".to_string()),
) {
Ok(file) => file,
Err(err) => {
messages::print_error(
format!(
"Failed to open lock file {}: {}",
config
.lock_file
.unwrap_or("/tmp/readyset_scheduler.lock".to_string()),
err
)
.as_str(),
);
std::process::exit(1);
}
};

let _guard = match file_guard::try_lock(&file, Lock::Exclusive, 0, 1){
let _guard = match file_guard::try_lock(&file, Lock::Exclusive, 0, 1) {
Ok(guard) => guard,
Err(err) => {
messages::print_error(format!("Failed to acquire lock: {}", err).as_str());
std::process::exit(1);
}
}
};

let proxysql_pool = Pool::new(
OptsBuilder::new()
.ip_or_hostname(Some(config.proxysql_host.as_str()))
Expand Down Expand Up @@ -72,20 +87,26 @@ fn main() {
.unwrap();
let mut readyset_conn = readyset_pool.get_conn().unwrap();

let mut queries_added_or_change = queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap();

let mut queries_added_or_change =
queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap();

let rows: Vec<(String, String)> = queries::find_queries_to_cache(&mut proxysql_conn);
let rows: Vec<(String, String, String)> =
queries::find_queries_to_cache(&mut proxysql_conn, &config);

for (digest_text, digest) in rows {
for (digest_text, digest, schema) in rows {
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported = queries::check_readyset_query_support(&mut readyset_conn, &digest_text);
let supported =
queries::check_readyset_query_support(&mut readyset_conn, &digest_text, &schema);
match supported {
Ok(true) => {
messages::print_info(format!("Query is supported, adding it to proxysql and readyset").as_str());
messages::print_info(
format!("Query is supported, adding it to proxysql and readyset").as_str(),
);
queries_added_or_change = true;
queries::cache_query(&mut readyset_conn, &digest_text).expect("Failed to create readyset cache");
queries::add_query_rule(&mut proxysql_conn, &digest, &config).expect("Failed to add query rule");
queries::cache_query(&mut readyset_conn, &digest_text)
.expect("Failed to create readyset cache");
queries::add_query_rule(&mut proxysql_conn, &digest, &config)
.expect("Failed to add query rule");
}
Ok(false) => {
messages::print_info("Query is not supported");
Expand Down
11 changes: 8 additions & 3 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@ fn print_message_with_ts(message: &str, message_type: MessageType) {
let pid = process::id();
match message_type {
MessageType::Info => println!("{} [INFO] Readyset[{}]: {}", date_formatted, pid, message),
MessageType::Warning => println!("{} [WARNING] Readyset[{}]: {}", date_formatted, pid, message),
MessageType::Error => eprintln!("{} [ERROR] Readyset[{}]: {}", date_formatted, pid, message),
MessageType::Warning => println!(
"{} [WARNING] Readyset[{}]: {}",
date_formatted, pid, message
),
MessageType::Error => {
eprintln!("{} [ERROR] Readyset[{}]: {}", date_formatted, pid, message)
}
}
}

Expand All @@ -28,4 +33,4 @@ pub fn print_warning(message: &str) {

pub fn print_error(message: &str) {
print_message_with_ts(message, MessageType::Error);
}
}
51 changes: 33 additions & 18 deletions src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,35 @@ use mysql::{prelude::Queryable, PooledConn};

use crate::{config::Config, messages};

pub fn find_queries_to_cache(conn: &mut PooledConn) -> Vec<(String, String)> {
let rows: Vec<(String, String)> = conn
.query(
"SELECT s.digest_text, s.digest
pub fn find_queries_to_cache(
conn: &mut PooledConn,
config: &Config,
) -> Vec<(String, String, String)> {
let rows: Vec<(String, String, String)> = conn
.query(format!(
"SELECT s.digest_text, s.digest, s.schemaname
FROM stats_mysql_query_digest s
LEFT JOIN mysql_query_rules q
USING(digest)
WHERE s.hostgroup = 11
AND s.schemaname = 'employees'
AND s.digest_text LIKE 'SELECT%FROM%'
WHERE s.hostgroup = {}
AND s.username = '{}'
AND s.schemaname NOT IN ('sys', 'information_schema', 'performance_schema', 'mysql')
AND s.digest_text LIKE 'SELECT%FROM%'
AND digest_text NOT LIKE '%?=?%'
AND q.rule_id IS NULL",
)
config.source_hostgroup, config.readyset_user
))
.expect("Failed to query proxysql_conn");
rows
}

pub fn check_readyset_query_support(
conn: &mut PooledConn,
digest_text: &String,
schema: &String,
) -> Result<bool, mysql::Error> {
conn.query_drop(format!("USE {}", schema))
.expect("Failed to use schema");
let row: Option<(String, String, String)> =
conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?;
match row {
Expand All @@ -44,7 +53,7 @@ pub fn add_query_rule(
) -> Result<bool, mysql::Error> {
let datetime_now: DateTime<Local> = Local::now();
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
if config.warmup_time.is_some() {
if config.warmup_time.is_some() {
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Mirror by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules");
messages::print_info("Inserted warm-up rule");
} else {
Expand Down Expand Up @@ -72,25 +81,31 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result<boo
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
let rows: Vec<(u16, String)> = conn.query("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE 'Mirror by readyset scheduler at: ____-__-__ __:__:__';").expect("Failed to select mirror rules");
for (rule_id, comment) in rows {

let datetime_mirror_str = comment
.split("Mirror by readyset scheduler at:")
.nth(1)
.unwrap_or("")
.trim();
let datetime_mirror_str = format!("{} {}", datetime_mirror_str, tz);
let datetime_mirror_rule = DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z")
.unwrap_or_else(|_| {
panic!("Failed to parse datetime from comment: {}", comment);
});
let elapsed = datetime_now.signed_duration_since(datetime_mirror_rule).num_seconds();
let datetime_mirror_rule =
DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z")
.unwrap_or_else(|_| {
panic!("Failed to parse datetime from comment: {}", comment);
});
let elapsed = datetime_now
.signed_duration_since(datetime_mirror_rule)
.num_seconds();
if elapsed > config.warmup_time.unwrap() as i64 {
let comment = format!("{}\n Added by readyset scheduler at: {}", comment, date_formatted);
let comment = format!(
"{}\n Added by readyset scheduler at: {}",
comment, date_formatted
);
conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", config.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule");
messages::print_info(format!("Updated rule ID {} from warmup to destination", rule_id).as_str());
messages::print_info(
format!("Updated rule ID {} from warmup to destination", rule_id).as_str(),
);
updated_rules = true;
}
}
Ok(updated_rules)

}

0 comments on commit f9bb1db

Please sign in to comment.