diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1f3fde0 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "readyset_proxysql_scheduler" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { version = "*", features = ["derive"] } +mysql = "*" +toml = "0.8.12" +serde = "1.0" +chrono = "0.4.35" +file-guard = "0.2.0" + + +[package.metadata.generate-rpm] +assets = [ + { source = "target/release/readyset_proxysql_scheduler", dest = "/usr/bin/readyset_proxysql_scheduler", mode = "755" }, + { source = "./readyset_proxysql_scheduler.cnf", dest = "/etc/readyset_proxysql_scheduler.cnf", mode = "644" }, +] +license = "Apache 2.0" +description = "Readyset ProxySQL Scheduler" + +[package.metadata.deb] +extended-description = """\ +Readyset ProxySQL Scheduler""" +copyright = "2023, ReadySet, Inc." +maintainer = "ReadySet, Inc. " +assets = [ + ["target/release/readyset_proxysql_scheduler", "/usr/bin/readyset_proxysql_scheduler", "755"], + ["./readyset_proxysql_scheduler.cnf", "/etc/readyset_proxysql_scheduler.cnf", "644"], +] diff --git a/README.md b/README.md index 7fb390c..9cf6163 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,48 @@ # proxysql_scheduler Scheduler to integrate ProxySQL and Readyset + +# Workflow +This scheduler executes the following steps: + +1. Locks an on-disk file (configured by `lock_file`) to prevent multiple instances of the scheduler from overlapping their execution. +2. Queries the table `stats_mysql_query_digest` from ProxySQL and checks whether each query is supported by Readyset. +3. If the query is supported, it adds a cache in Readyset by executing `CREATE CACHE FROM __query__`. +4. If `warmup_time` is NOT configured, a new query rule will be added redirecting this query to Readyset. +5. If `warmup_time` is configured, a new query rule will be added to mirror this query to Readyset. 
The query will still be redirected to the original hostgroup. +6. Once `warmup_time` seconds have elapsed since the query was mirrored, the query rule will be updated to redirect the query to Readyset instead of mirroring. + + + +# Configuration + +Assuming you already have ProxySQL configured, you will need to create a new hostgroup and add Readyset to this hostgroup: + +``` +INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES (99, '127.0.0.1', 3307); +LOAD MYSQL SERVERS TO RUNTIME; +SAVE MYSQL SERVERS TO DISK; +``` + +To configure the scheduler to run, execute: + +``` +INSERT INTO scheduler (active, interval_ms, filename, arg1) VALUES (1, 10000, '/usr/bin/readyset_proxysql_scheduler', '--config=/etc/readyset_proxysql_scheduler.cnf'); +LOAD SCHEDULER TO RUNTIME; +SAVE SCHEDULER TO DISK; +``` + +Configure `/etc/readyset_proxysql_scheduler.cnf` as follows: +* `proxysql_user` - (Required) - ProxySQL admin user +* `proxysql_password` - (Required) - ProxySQL admin password +* `proxysql_host` - (Required) - ProxySQL admin host +* `proxysql_port` - (Required) - ProxySQL admin port +* `readyset_user` - (Required) - Readyset application user +* `readyset_password` - (Required) - Readyset application password +* `readyset_host` - (Required) - Readyset host +* `readyset_port` - (Required) - Readyset port +* `source_hostgroup` - (Required) - Hostgroup running your read workload +* `readyset_hostgroup` - (Required) - Hostgroup where Readyset is configured +* `warmup_time` - (Optional) - Time in seconds to mirror a supported query before redirecting the query to Readyset (Default: not set - queries are redirected without mirroring) +* `lock_file` - (Optional) - Lock file to prevent two instances of the scheduler from running at the same time (Default '/tmp/readyset_scheduler.lock') + + diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf new file mode 100644 index 0000000..98746ba --- /dev/null +++ b/readyset_proxysql_scheduler.cnf @@ -0,0 +1,12 @@ 
+proxysql_user = 'admin' +proxysql_password = 'admin' +proxysql_host = '127.0.0.1' +proxysql_port = 6032 +readyset_user = 'root' +readyset_password = 'root' +readyset_host = '127.0.0.1' +readyset_port = 3307 +source_hostgroup = 11 +readyset_hostgroup = 12 +warmup_time = 60 +lock_file = '/work/readyset_proxysql_scheduler/readyset_scheduler.lock' diff --git a/readyset_scheduler.lock b/readyset_scheduler.lock new file mode 100644 index 0000000..e69de29 diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..a490629 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,29 @@ +use std::{fs::File, io::Read}; + + +#[derive(serde::Deserialize, Clone)] +pub struct Config { + pub proxysql_user: String, + pub proxysql_password: String, + pub proxysql_host: String, + pub proxysql_port: u16, + pub readyset_user: String, + pub readyset_password: String, + pub readyset_host: String, + pub readyset_port: u16, + pub source_hostgroup: u16, + pub readyset_hostgroup: u16, + pub warmup_time: Option, + pub lock_file: Option, +} + +pub fn read_config_file(path: &str) -> Result { + let mut file = File::open(path).expect(format!("Failed to open config file at path {}", path).as_str()); + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + Ok(contents) +} + +pub fn parse_config_file(contents: &str) -> Result { + toml::from_str(contents) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..4affecd --- /dev/null +++ b/src/main.rs @@ -0,0 +1,103 @@ +mod config; +mod queries; +mod messages; + +use clap::Parser; +use config::read_config_file; +use mysql::OptsBuilder; +use mysql::{Pool, PoolConstraints, PoolOpts}; + +use file_guard::Lock; +use std::fs::OpenOptions; + +/// Readyset ProxySQL Scheduler +/// This tool is used to query ProxySQL Stats tables to find queries that are not yet cached in Readyset and then cache them. 
+#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// path to the config file + #[arg(long)] + config: String, +} + +fn main() { + messages::print_info("Running readyset_scheduler"); + let args = Args::parse(); + let config_file = read_config_file(&args.config).expect("Failed to read config file"); + let config = config::parse_config_file(&config_file).expect("Failed to parse config file"); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(config.clone().lock_file.unwrap_or("/tmp/readyset_scheduler.lock".to_string())) + .unwrap(); + + let _guard = match file_guard::try_lock(&file, Lock::Exclusive, 0, 1){ + Ok(guard) => guard, + Err(err) => { + messages::print_error(format!("Failed to acquire lock: {}", err).as_str()); + std::process::exit(1); + } + }; + + let proxysql_pool = Pool::new( + OptsBuilder::new() + .ip_or_hostname(Some(config.proxysql_host.as_str())) + .tcp_port(config.proxysql_port) + .user(Some(config.proxysql_user.as_str())) + .pass(Some(config.proxysql_password.as_str())) + .prefer_socket(false) + .pool_opts( + PoolOpts::default() + .with_reset_connection(false) + .with_constraints(PoolConstraints::new(1, 1).unwrap()), + ), + ) + .unwrap(); + let mut proxysql_conn = proxysql_pool.get_conn().unwrap(); + + let readyset_pool = Pool::new( + OptsBuilder::new() + .ip_or_hostname(Some(config.readyset_host.as_str())) + .tcp_port(config.readyset_port) + .user(Some(config.readyset_user.as_str())) + .pass(Some(config.readyset_password.as_str())) + .prefer_socket(false) + .pool_opts( + PoolOpts::default() + .with_reset_connection(false) + .with_constraints(PoolConstraints::new(1, 1).unwrap()), + ), + ) + .unwrap(); + let mut readyset_conn = readyset_pool.get_conn().unwrap(); + + let mut queries_added_or_change = queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap(); + + + let rows: Vec<(String, String)> = queries::find_queries_to_cache(&mut proxysql_conn); + + for 
(digest_text, digest) in rows { + messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); + let supported = queries::check_readyset_query_support(&mut readyset_conn, &digest_text); + match supported { + Ok(true) => { + messages::print_info(format!("Query is supported, adding it to proxysql and readyset").as_str()); + queries_added_or_change = true; + queries::cache_query(&mut readyset_conn, &digest_text).expect("Failed to create readyset cache"); + queries::add_query_rule(&mut proxysql_conn, &digest, &config).expect("Failed to add query rule"); + } + Ok(false) => { + messages::print_info("Query is not supported"); + } + Err(err) => { + messages::print_warning(format!("Failed to check query support: {}", err).as_str()); + } + } + } + if queries_added_or_change { + queries::load_query_rules(&mut proxysql_conn).expect("Failed to load query rules"); + queries::save_query_rules(&mut proxysql_conn).expect("Failed to save query rules"); + } + messages::print_info("Finished readyset_scheduler"); +} diff --git a/src/messages.rs b/src/messages.rs new file mode 100644 index 0000000..3906e83 --- /dev/null +++ b/src/messages.rs @@ -0,0 +1,31 @@ +use std::process; + +use chrono::{DateTime, Local}; + +enum MessageType { + Info, + Warning, + Error, +} +fn print_message_with_ts(message: &str, message_type: MessageType) { + let datetime_now: DateTime = Local::now(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + let pid = process::id(); + match message_type { + MessageType::Info => println!("{} [INFO] Readyset[{}]: {}", date_formatted, pid, message), + MessageType::Warning => println!("{} [WARNING] Readyset[{}]: {}", date_formatted, pid, message), + MessageType::Error => eprintln!("{} [ERROR] Readyset[{}]: {}", date_formatted, pid, message), + } +} + +pub fn print_info(message: &str) { + print_message_with_ts(message, MessageType::Info); +} + +pub fn print_warning(message: &str) { + print_message_with_ts(message, 
MessageType::Warning); +} + +pub fn print_error(message: &str) { + print_message_with_ts(message, MessageType::Error); +} \ No newline at end of file diff --git a/src/queries.rs b/src/queries.rs new file mode 100644 index 0000000..1d2d3fa --- /dev/null +++ b/src/queries.rs @@ -0,0 +1,96 @@ +use chrono::{DateTime, Local}; +use mysql::{prelude::Queryable, PooledConn}; + +use crate::{config::Config, messages}; + +pub fn find_queries_to_cache(conn: &mut PooledConn) -> Vec<(String, String)> { + let rows: Vec<(String, String)> = conn + .query( + "SELECT s.digest_text, s.digest + FROM stats_mysql_query_digest s + LEFT JOIN mysql_query_rules q + USING(digest) + WHERE s.hostgroup = 11 + AND s.schemaname = 'employees' + AND s.digest_text LIKE 'SELECT%FROM%' + AND q.rule_id IS NULL", + ) + .expect("Failed to query proxysql_conn"); + rows +} + +pub fn check_readyset_query_support( + conn: &mut PooledConn, + digest_text: &String, +) -> Result { + let row: Option<(String, String, String)> = + conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; + match row { + Some((_, _, value)) => Ok(value == "yes" || value == "cached"), + None => Ok(false), + } +} + +pub fn cache_query(conn: &mut PooledConn, digest_text: &String) -> Result { + conn.query_drop(format!("CREATE CACHE FROM {}", digest_text)) + .expect("Failed to create readyset cache"); + Ok(true) +} + +pub fn add_query_rule( + conn: &mut PooledConn, + digest: &String, + config: &Config, +) -> Result { + let datetime_now: DateTime = Local::now(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + if config.warmup_time.is_some() { + conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Mirror by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules"); + messages::print_info("Inserted warm-up rule"); + } else 
{ + conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Added by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules"); + messages::print_info("Inserted destination rule"); + } + Ok(true) +} + +pub fn load_query_rules(conn: &mut PooledConn) -> Result { + conn.query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") + .expect("Failed to load query rules"); + Ok(true) +} +pub fn save_query_rules(conn: &mut PooledConn) -> Result { + conn.query_drop("SAVE MYSQL QUERY RULES TO DISK") + .expect("Failed to load query rules"); + Ok(true) +} + +pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result { + let mut updated_rules = false; + let datetime_now: DateTime = Local::now(); + let tz = datetime_now.format("%z").to_string(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + let rows: Vec<(u16, String)> = conn.query("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE 'Mirror by readyset scheduler at: ____-__-__ __:__:__';").expect("Failed to select mirror rules"); + for (rule_id, comment) in rows { + + let datetime_mirror_str = comment + .split("Mirror by readyset scheduler at:") + .nth(1) + .unwrap_or("") + .trim(); + let datetime_mirror_str = format!("{} {}", datetime_mirror_str, tz); + let datetime_mirror_rule = DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z") + .unwrap_or_else(|_| { + panic!("Failed to parse datetime from comment: {}", comment); + }); + let elapsed = datetime_now.signed_duration_since(datetime_mirror_rule).num_seconds(); + if elapsed > config.warmup_time.unwrap() as i64 { + let comment = format!("{}\n Added by readyset scheduler at: {}", comment, date_formatted); + conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = 
'{}' WHERE rule_id = {}", config.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule"); + messages::print_info(format!("Updated rule ID {} from warmup to destination", rule_id).as_str()); + updated_rules = true; + } + } + Ok(updated_rules) + +}