From ee1c7f93a8b2f28334d29174293bc86132a90e18 Mon Sep 17 00:00:00 2001 From: Marcelo Altmann Date: Fri, 6 Sep 2024 09:54:25 -0300 Subject: [PATCH] Refactor and Query Discovery Mode - Refactor the code to make it more readable and maintainable. Added a new module `proxysql` to handle all the proxysql related functionalities. - Refactor the code to use the `query` module to handle all the query discovery related functionalities. - Added Query Discovery configuration. Now users can specify the query discovery mode in the configuration file. The set of rules to discover the queries are defined in the README.md file. Closes: #6 --- README.md | 67 +++++- build/test.cnf | 7 +- readyset_proxysql_scheduler.cnf | 1 + src/config.rs | 38 +++- src/health_check.rs | 22 -- src/hosts.rs | 141 ++---------- src/main.rs | 39 ++-- src/proxysql.rs | 240 ++++++++++++++++++++ src/queries.rs | 378 +++++++++++++++++++------------- 9 files changed, 606 insertions(+), 327 deletions(-) delete mode 100644 src/health_check.rs create mode 100644 src/proxysql.rs diff --git a/README.md b/README.md index 20a8fa5..572d977 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,12 @@ Unlock the full potential of your database integrating ReadySet and ProxySQL by This scheduler executes the following steps: 1. Locks an in disk file (configured by `lock_file`) to avoid multiple instances of the scheduler to overlap their execution. -2. Check if it can connect to Readyset and validate if `Snapshot Status` is `Completed`. In case it cannot connect or Readyset is still performing snapshot it adjust the server status to `SHUNNED` in ProxySQL. -2. Queries the table `stats_mysql_query_digest` from ProxySQL and validates if each query is supported by Readyset +2. If `mode=(All|HealthCheck)` - Query `mysql_servers` and check all servers that have `comment='Readyset` (case insensitive) and `hostgroup=readyset_hostgroup`. For each server it checks if it can connect to Readyset and validate if `Snapshot Status` is `Completed`. In case it cannot connect or Readyset is still performing snapshot it adjust the server status to `SHUNNED` in ProxySQL. +3. If `mode=(All|QueryDiscovery)` Query the table `stats_mysql_query_digest` finding queries executed at `source_hostgroup` by `readyset_user` and validates if each query is supported by Readyset. The rules to order queries are configured by [Query Discovery](#query-discovery) configurations. 3. If the query is supported it adds a cache in Readyset by executing `CREATE CACHE FROM __query__`. 4. If `warmup_time` is NOT configure, a new query rule will be added redirecting this query to Readyset 5. If `warmup_time` is configured, a new query rule will be added to mirror this query to Readyset. The query will still be redirected to the original hostgroup -6. Once `warmup_time` seconds has elapsed since the query was mirrored, the query rule will be updated to redirect the qury to Readyset instead of mirroring. +6. Once `warmup_time` seconds has elapsed since the query was mirrored, the query rule will be updated to redirect the query to Readyset instead of mirroring. @@ -20,11 +20,13 @@ This scheduler executes the following steps: Assuming you have your ProxySQL already Configured you will need to create a new hostgroup and add Readyset to this hostgroup: ``` -INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES (99, '127.0.0.1', 3307); +INSERT INTO mysql_servers (hostgroup_id, hostname, port, comment) VALUES (99, '127.0.0.1', 3307, 'Readyset'); LOAD MYSQL SERVERS TO RUNTIME; SAVE MYSQL SERVERS TO DISK; ``` +*NOTE*: It's required to add `Readyset` as a comment to the server to be able to identify it in the scheduler. + To configure the scheduler to run execute: ``` @@ -40,13 +42,66 @@ Configure `/etc/readyset_proxysql_scheduler.cnf` as follow: * `proxysql_port` - (Required) - Proxysql admin port * `readyset_user` - (Required) - Readyset application user * `readyset_password` - (Required) - Readyset application password -* `readyset_host` - (Required) - Readyset host -* `readyset_port` - (Required) - Readyset port * `source_hostgroup` - (Required) - Hostgroup running your Read workload * `readyset_hostgroup` - (Required) - Hostgroup where Readyset is configure * `warmup_time` - (Optional) - Time in seconds to mirror a query supported before redirecting the query to Readyset (Default 0 - no mirror) * `lock_file` - (Optional) - Lock file to prevent two instances of the scheduler to run at the same time (Default '/etc/readyset_scheduler.lock') +* `operation_mode` - (Optional) - Operation mode to run the scheduler. The options are described in [Operation Mode](#operation-mode) (Default All). +* `number_of_queries` - (Optional) - Number of queries to cache in Readyset (Default 10). +* `query_discovery_mode` / `query_discovery_min_execution` / `query_discovery_min_row_sent` - (Optional) - Query Discovery configurations. The options are described in [Query Discovery](#query-discovery) (Default CountStar / 0 / 0). + + +# Query Discovery +The Query Discovery is a set of configuration to find queries that are supported by Readyset. The configurations are defined by the following fields: + +* `query_discovery_mode`: (Optional) - Mode to discover queries to automatically cache in Readyset. The options are described in [Query Discovery Mode](#query-discovery-mode) (Default CountStar). +* `query_discovery_min_execution`: (Optional) - Minimum number of executions of a query to be considered a candidate to be cached (Default 0). +* `query_discovery_min_row_sent`: (Optional) - Minimum number of rows sent by a query to be considered a candidate to be cached (Default 0). + +# Query Discovery Mode +The Query Discovery Mode is a set of possible rules to discover queries to automatically cache in Readyset. The options are: + +1. `CountStar` - Total Number of Query Executions + * Formula: `total_executions = count_star` + * Description: This metric gives the total number of times the query has been executed. It is valuable for understanding how frequently the query runs. A high count_star value suggests that the query is executed often. + +2. `SumTime` - Total Time Spent Executing the Query + * Formula: `total_execution_time = sum_time` + * Description: This metric represents the total cumulative time spent executing the query across all its executions. It provides a clear understanding of how much processing time the query is consuming over time. A high total execution time can indicate that the query is either frequently executed or is time-intensive to process. + +3. `SumRowsSent` - Total Number of Rows Sent by the Query (sum_rows_sent) + * Formula: `total_rows_sent = sum_rows_sent` + * Description: This metric provides the total number of rows sent to the client across all executions of the query. It helps you understand the query’s output volume and the amount of data being transmitted. + +4. `MeanTime` - Average Query Execution Time (Mean) + * Formula: `mean_time = sum_time / count_star` + * Description: The mean time gives you an idea of the typical performance of the query over all executions. It provides a central tendency of how long the query generally takes to execute. + +5. `ExecutionTimeDistance` - Time Distance Between Query Executions + * Formula: `execution_time_distance = max_time - min_time` + * Description: This shows the spread between the fastest and slowest executions of the query. A large range might indicate variability in system load, input sizes, or external factors affecting performance. + +6. `QueryThroughput` - Query Throughput + * Formula: `query_throughput = count_star / sum_time` + * Description: This shows how many queries are processed per unit of time. It’s useful for understanding system capacity and how efficiently the database is handling the queries. + +7. `WorstBestCase` - Worst Best-Case Query Performance + * Formula: `worst_case = min_time` + * Description: The min_time metric gives the fastest time the query was ever executed. It reflects the best-case performance scenario, which could indicate the query’s performance under optimal conditions. + +8. `WorstWorstCase` - Worst Worst-Case Query Performance + * Formula: `worst_case = max_time` + * Description: The max_time shows the slowest time the query was executed. This can indicate potential bottlenecks or edge cases where the query underperforms, which could be due to larger data sets, locks, or high server load. + +9. `DistanceMeanMax` - Distance Between Mean Time and Max Time (mean_time vs max_time) + * Formula: `distance_mean_max = max_time - mean_time` + * Description: The distance between the mean execution time and the maximum execution time provides insight into how much slower the worst-case execution is compared to the average. A large gap indicates significant variability in query performance, which could be caused by certain executions encountering performance bottlenecks, such as large datasets, locking, or high system load. +# Operation Mode +The Operation Mode is a set of possible rules to run the scheduler. The options are: +* `All` - Run `HealthCheck` and `QueryDiscovery` operations. +* `HealthCheck` - Run only the health check operation. +* `QueryDiscovery` - Run only the query discovery operation. # Note Readyset support of MySQL and this scheduler are alpha quality, meaning they are not currently part of our test cycle. Run your own testing before plugging this to your production system. diff --git a/build/test.cnf b/build/test.cnf index cc3b7d0..8942e3f 100644 --- a/build/test.cnf +++ b/build/test.cnf @@ -6,7 +6,8 @@ readyset_user = 'root' readyset_password = 'noria' source_hostgroup = 1 readyset_hostgroup = 2 -warmup_time = 60 +warmup_time = 5 lock_file = '/tmp/readyset_scheduler.lock' -operation_mode="All" -number_of_queries=10 +operation_mode='All' +number_of_queries=2 +query_discovery_mode='SumTime' diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf index 8599758..f4654b8 100644 --- a/readyset_proxysql_scheduler.cnf +++ b/readyset_proxysql_scheduler.cnf @@ -10,3 +10,4 @@ warmup_time = 60 lock_file = '/tmp/readyset_scheduler.lock' operation_mode="All" number_of_queries=10 +query_discovery_mode='SumTime' \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 90ae6f1..da41f80 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,7 @@ pub enum OperationMode { impl From for OperationMode { fn from(s: String) -> Self { - match s.as_str() { + match s.to_lowercase().as_str() { "health_check" => OperationMode::HealthCheck, "query_discovery" => OperationMode::QueryDiscovery, "all" => OperationMode::All, @@ -33,6 +33,39 @@ impl Display for OperationMode { } } +#[derive(serde::Deserialize, Clone, Copy, PartialEq, PartialOrd, Default)] +pub enum QueryDiscoveryMode { + #[default] + CountStar, + SumTime, + SumRowsSent, + MeanTime, + ExecutionTimeDistance, + QueryThroughput, + WorstBestCase, + WorstWorstCase, + DistanceMeanMax, + External, +} + +impl From for QueryDiscoveryMode { + fn from(s: String) -> Self { + match s.to_lowercase().as_str() { + "count_star" => QueryDiscoveryMode::CountStar, + "sum_time" => QueryDiscoveryMode::SumTime, + "sum_rows_sent" => QueryDiscoveryMode::SumRowsSent, + "mean_time" => QueryDiscoveryMode::MeanTime, + "execution_time_distance" => QueryDiscoveryMode::ExecutionTimeDistance, + "query_throughput" => QueryDiscoveryMode::QueryThroughput, + "worst_best_case" => QueryDiscoveryMode::WorstBestCase, + "worst_worst_case" => QueryDiscoveryMode::WorstWorstCase, + "distance_mean_max" => QueryDiscoveryMode::DistanceMeanMax, + "external" => QueryDiscoveryMode::External, + _ => QueryDiscoveryMode::CountStar, + } + } +} + #[derive(serde::Deserialize, Clone)] pub struct Config { pub proxysql_user: String, @@ -47,6 +80,9 @@ pub struct Config { pub lock_file: Option, pub operation_mode: Option, pub number_of_queries: u16, + pub query_discovery_mode: Option, + pub query_discovery_min_execution: Option, + pub query_discovery_min_row_sent: Option, } pub fn read_config_file(path: &str) -> Result { diff --git a/src/health_check.rs b/src/health_check.rs deleted file mode 100644 index bde914d..0000000 --- a/src/health_check.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::{ - config, - hosts::{Host, HostStatus}, - messages, -}; - -pub fn health_check(proxysql_conn: &mut mysql::Conn, config: &config::Config, host: &mut Host) { - match host.check_readyset_is_ready() { - Ok(ready) => { - if ready { - let _ = host.change_status(proxysql_conn, config, HostStatus::Online); - } else { - messages::print_info("Readyset is still running Snapshot."); - let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned); - } - } - Err(e) => { - messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); - let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned); - } - }; -} diff --git a/src/hosts.rs b/src/hosts.rs index fea8d5d..38786d4 100644 --- a/src/hosts.rs +++ b/src/hosts.rs @@ -1,4 +1,4 @@ -use crate::{config::Config, messages}; +use crate::{config::Config, queries::Query}; use core::fmt; use mysql::{prelude::Queryable, Conn, OptsBuilder}; @@ -113,10 +113,19 @@ impl Host { /// # Returns /// /// The status of the host. - fn get_status(&self) -> HostStatus { + pub fn get_status(&self) -> HostStatus { self.status } + /// Changes the status of the host. + /// + /// # Arguments + /// + /// * `status` - The new status of the host. + pub fn change_status(&mut self, status: HostStatus) { + self.status = status; + } + /// Checks if the host is online. /// /// # Returns @@ -192,132 +201,18 @@ impl Host { /// # Returns /// /// true if the query was cached successfully, false otherwise. - pub fn cache_query( - &mut self, - digest_text: &String, - digest: &String, - ) -> Result { + pub fn cache_query(&mut self, query: &Query) -> Result { match &mut self.conn { None => return Ok(false), Some(conn) => { - conn.query_drop(format!("CREATE CACHE d_{} FROM {}", digest, digest_text)) - .expect("Failed to create readyset cache"); + conn.query_drop(format!( + "CREATE CACHE d_{} FROM {}", + query.get_digest(), + query.get_digest_text() + )) + .expect("Failed to create readyset cache"); } } Ok(true) } - - /// Changes the status of the host in the ProxySQL mysql_servers table. - /// The status is set to the given `status`. - pub fn change_status( - &mut self, - ps_conn: &mut Conn, - config: &Config, - status: HostStatus, - ) -> Result { - let where_clause = format!( - "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", - config.readyset_hostgroup, - self.get_hostname(), - self.get_port() - ); - if self.status != status { - messages::print_info( - format!( - "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", - config.readyset_hostgroup, - self.get_hostname(), - self.get_port(), - self.get_status(), - status - ) - .as_str(), - ); - self.status = status; - ps_conn.query_drop(format!( - "UPDATE mysql_servers SET status = '{}' {}", - self.get_status(), - where_clause - ))?; - ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; - ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; - } - - Ok(true) - } -} - -/// Represents a list of Readyset hosts -pub struct Hosts { - hosts: Vec, -} - -impl From> for Hosts { - fn from(hosts: Vec) -> Self { - Hosts { hosts } - } -} - -impl Hosts { - /// Fetches the hosts from the ProxySQL mysql_servers table. - /// - /// # Arguments - /// - /// * `proxysql_conn` - The connection to the ProxySQL instance. - /// * `config` - The configuration object. - /// - /// # Returns - /// - /// A vector of `Host` instances. - pub fn new<'a>(proxysql_conn: &'a mut Conn, config: &'a Config) -> Self { - let query = format!( - "SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {}", - config.readyset_hostgroup - ); - let results: Vec<(String, u16, String, String)> = proxysql_conn.query(query).unwrap(); - results - .into_iter() - .filter_map(|(hostname, port, status, comment)| { - if comment.to_lowercase().contains("readyset") { - Some(Host::new(hostname, port, status, config)) - } else { - None - } - }) - .collect::>() - .into() - } - - /// Gets a mutable iterator over the hosts. - /// - /// # Returns - /// - /// A mutable iterator over the hosts. - pub fn iter_mut(&mut self) -> core::slice::IterMut { - self.hosts.iter_mut() - } - - /// Mutate self by retaining only the hosts that are online. - pub fn retain_online(&mut self) { - self.hosts.retain(|host| host.is_online()); - } - - /// Checks if the hosts list is empty. - /// - /// # Returns - /// - /// true if the hosts list is empty, false otherwise. - pub fn is_empty(&self) -> bool { - self.hosts.is_empty() - } - - /// Gets a mutable reference for the first host in the list. - /// - /// # Returns - /// - /// A reference to the first host in the list. - /// If the list is empty, the function returns None. - pub fn first_mut(&mut self) -> Option<&mut Host> { - self.hosts.first_mut() - } } diff --git a/src/main.rs b/src/main.rs index f428101..9177ef6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,14 @@ mod config; -mod health_check; mod hosts; mod messages; +mod proxysql; mod queries; use clap::Parser; use config::read_config_file; -use hosts::Hosts; -use mysql::{Conn, OptsBuilder}; - use file_guard::Lock; -use queries::query_discovery; +use mysql::{Conn, OptsBuilder}; +use proxysql::ProxySQL; use std::fs::OpenOptions; /// Readyset ProxySQL Scheduler @@ -62,37 +60,36 @@ fn main() { std::process::exit(1); } }; - let mut proxysql_conn = Conn::new( - OptsBuilder::new() - .ip_or_hostname(Some(config.proxysql_host.as_str())) - .tcp_port(config.proxysql_port) - .user(Some(config.proxysql_user.as_str())) - .pass(Some(config.proxysql_password.as_str())) - .prefer_socket(false), - ) - .unwrap(); + + let mut proxysql = ProxySQL::new(&config); let running_mode = match config.operation_mode { Some(mode) => mode, None => config::OperationMode::All, }; - let mut hosts = Hosts::new(&mut proxysql_conn, &config); - if running_mode == config::OperationMode::HealthCheck || running_mode == config::OperationMode::All { - hosts.iter_mut().for_each(|host| { - health_check::health_check(&mut proxysql_conn, &config, host); - }); + proxysql.health_check(); } // retain only healthy hosts - hosts.retain_online(); + //hosts.retain_online(); if running_mode == config::OperationMode::QueryDiscovery || running_mode == config::OperationMode::All { - query_discovery(&mut proxysql_conn, &config, &mut hosts); + let mut conn = Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(config.proxysql_host.as_str())) + .tcp_port(config.proxysql_port) + .user(Some(config.proxysql_user.as_str())) + .pass(Some(config.proxysql_password.clone().as_str())) + .prefer_socket(false), + ) + .expect("Failed to create ProxySQL connection"); + let mut query_discovery = queries::QueryDiscovery::new(config); + query_discovery.run(&mut proxysql, &mut conn); } messages::print_info("Finished readyset_scheduler"); diff --git a/src/proxysql.rs b/src/proxysql.rs new file mode 100644 index 0000000..8dfe40b --- /dev/null +++ b/src/proxysql.rs @@ -0,0 +1,240 @@ +use chrono::{DateTime, Local}; +use mysql::{prelude::Queryable, Conn, OptsBuilder}; + +use crate::{ + config, + hosts::{Host, HostStatus}, + messages, + queries::Query, +}; + +const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at"; +const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; +pub struct ProxySQL { + readyset_hostgroup: u16, + warmup_time: u16, + conn: mysql::Conn, + hosts: Vec, + //queries: Vec, +} + +impl ProxySQL { + /// This function is used to create a new ProxySQL struct. + /// + /// # Arguments + /// + /// * `config` - A reference to a config::Config containing the configuration for the ProxySQL connection. + /// + /// # Returns + /// + /// A new ProxySQL struct. + pub fn new(config: &config::Config) -> Self { + let mut conn = Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(config.proxysql_host.as_str())) + .tcp_port(config.proxysql_port) + .user(Some(config.proxysql_user.as_str())) + .pass(Some(config.proxysql_password.as_str())) + .prefer_socket(false), + ) + .expect("Failed to create ProxySQL connection"); + + let query = format!( + "SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {}", + config.readyset_hostgroup + ); + let results: Vec<(String, u16, String, String)> = conn.query(query).unwrap(); + let hosts = results + .into_iter() + .filter_map(|(hostname, port, status, comment)| { + if comment.to_lowercase().contains("readyset") { + Some(Host::new(hostname, port, status, config)) + } else { + None + } + }) + .collect::>(); + + ProxySQL { + conn, + readyset_hostgroup: config.readyset_hostgroup, + warmup_time: config.warmup_time.unwrap_or(0), + hosts, + } + } + + /// This function is used to add a query rule to ProxySQL. + /// + pub fn add_as_query_rule(&mut self, query: &Query) -> Result { + let datetime_now: DateTime = Local::now(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + if self.warmup_time > 0 { + self.conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); + messages::print_info("Inserted warm-up rule"); + } else { + self.conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); + messages::print_info("Inserted destination rule"); + } + Ok(true) + } + + pub fn load_query_rules(&mut self) -> Result { + self.conn + .query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") + .expect("Failed to load query rules"); + Ok(true) + } + pub fn save_query_rules(&mut self) -> Result { + self.conn + .query_drop("SAVE MYSQL QUERY RULES TO DISK") + .expect("Failed to load query rules"); + Ok(true) + } + + /// This function is used to check the current list of queries routed to Readyset. + /// + /// # Arguments + /// * `conn` - A reference to a connection to ProxySQL. + /// + /// # Returns + /// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet. + pub fn find_queries_routed_to_readyset(&mut self) -> Vec { + let rows: Vec = self + .conn + .query(format!( + "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", + MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN + )) + .expect("Failed to find queries routed to ReadySet"); + rows + } + + /// This function is used to check if any mirror query rule needs to be changed to destination. + /// + /// # Returns + /// + /// A boolean indicating if any mirror query rule was changed to destination. + pub fn adjust_mirror_rules(&mut self) -> Result { + let mut updated_rules = false; + let datetime_now: DateTime = Local::now(); + let tz = datetime_now.format("%z").to_string(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + let rows: Vec<(u16, String)> = self.conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules"); + for (rule_id, comment) in rows { + let datetime_mirror_str = comment + .split("Mirror by readyset scheduler at:") + .nth(1) + .unwrap_or("") + .trim(); + let datetime_mirror_str = format!("{} {}", datetime_mirror_str, tz); + let datetime_mirror_rule = + DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z") + .unwrap_or_else(|_| { + panic!("Failed to parse datetime from comment: {}", comment); + }); + let elapsed = datetime_now + .signed_duration_since(datetime_mirror_rule) + .num_seconds(); + if elapsed > self.warmup_time as i64 { + let comment = format!( + "{}\n Added by readyset scheduler at: {}", + comment, date_formatted + ); + self.conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", self.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule"); + messages::print_info( + format!("Updated rule ID {} from warmup to destination", rule_id).as_str(), + ); + updated_rules = true; + } + } + Ok(updated_rules) + } + + pub fn health_check(&mut self) { + let mut status_changes = Vec::new(); + + for host in self.hosts.iter_mut() { + match host.check_readyset_is_ready() { + Ok(ready) => { + if ready { + status_changes.push((host, HostStatus::Online)); + } else { + messages::print_info("Readyset is still running Snapshot."); + status_changes.push((host, HostStatus::Shunned)); + } + } + Err(e) => { + messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); + status_changes.push((host, HostStatus::Shunned)); + } + }; + } + + for (host, status) in status_changes { + if host.get_status() != status { + let where_clause = format!( + "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", + self.readyset_hostgroup, + host.get_hostname(), + host.get_port() + ); + messages::print_info( + format!( + "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", + self.readyset_hostgroup, + host.get_hostname(), + host.get_port(), + host.get_status(), + status + ) + .as_str(), + ); + let _ = self.conn.query_drop(format!( + "UPDATE mysql_servers SET status = '{}' {}", + host.get_status(), + where_clause + )); + host.change_status(status); + let _ = self.conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME"); + let _ = self.conn.query_drop("SAVE MYSQL SERVERS TO DISK"); + } + } + } + + /// This function is used to get the number of online hosts. + /// This is done by filtering the hosts vector and counting the number of hosts with status Online. + /// + /// # Returns + /// + /// A u16 containing the number of online hosts. + pub fn number_of_online_hosts(&self) -> u16 { + self.hosts + .iter() + .filter(|host| host.is_online()) + .collect::>() + .len() as u16 + } + + /// This function is used to get the first online host. + /// This is done by iterating over the hosts vector and returning the first host with status Online. + /// + /// # Returns + /// + /// An Option containing a reference to the first online host. + pub fn get_first_online_host(&mut self) -> Option<&mut Host> { + self.hosts.iter_mut().find(|host| host.is_online()) + } + + /// This function is used to get all the online hosts. + /// This is done by filtering the hosts vector and collecting the hosts with status Online. + /// + /// # Returns + /// + /// A vector containing references to the online hosts. + pub fn get_online_hosts(&mut self) -> Vec<&mut Host> { + self.hosts + .iter_mut() + .filter(|host| host.is_online()) + .collect() + } +} diff --git a/src/queries.rs b/src/queries.rs index 8aca89b..7ac5e18 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,179 +1,255 @@ -use chrono::{DateTime, Local}; -use mysql::{prelude::Queryable, Conn}; - use crate::{ - config::{self, Config}, - hosts::Hosts, + config::{Config, QueryDiscoveryMode}, messages, + proxysql::ProxySQL, }; +use mysql::{prelude::Queryable, Conn}; -const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at"; -const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; - -/// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. -/// -/// # Arguments -/// * `conn` - A reference to a connection to ProxySQL. -/// * `config` - A reference to the configuration struct. -/// -/// # Returns -/// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table. -pub fn find_queries_to_cache(conn: &mut Conn, config: &Config) -> Vec<(String, String, String)> { - let rows: Vec<(String, String, String)> = conn - .query(format!( - "SELECT s.digest_text, s.digest, s.schemaname - FROM stats_mysql_query_digest s - LEFT JOIN mysql_query_rules q - USING(digest) - WHERE s.hostgroup = {} - AND s.username = '{}' - AND s.schemaname NOT IN ('sys', 'information_schema', 'performance_schema', 'mysql') - AND s.digest_text LIKE 'SELECT%FROM%' - AND digest_text NOT LIKE '%?=?%' - AND s.sum_rows_sent > 0 - AND q.rule_id IS NULL - ORDER BY s.sum_rows_sent DESC - LIMIT {}", - config.source_hostgroup, - config.readyset_user, - (config.number_of_queries * 2) - )) - .expect("Failed to find queries to cache"); - rows +pub struct Query { + digest_text: String, + digest: String, + schema: String, + user: String, } -/// This function is used to check the current list of queries routed to Readyset. -/// -/// # Arguments -/// * `conn` - A reference to a connection to ProxySQL. -/// -/// # Returns -/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet. -pub fn find_queries_routed_to_readyset(conn: &mut Conn) -> Vec { - let rows: Vec = conn - .query(format!( - "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", - MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN - )) - .expect("Failed to find queries routed to ReadySet"); - rows -} +impl Query { + /// This function is used to create a new Query struct. + /// + /// # Arguments + /// + /// * `digest_text` - A string containing the digest text of the query. + /// * `digest` - A string containing the digest of the query. + /// * `schema` - A string containing the schema name of the query. + /// * `user` - A string containing the user that executed the query. + /// + /// # Returns + /// + /// A new Query struct. + fn new(digest_text: String, digest: String, schema: String, user: String) -> Self { + Query { + digest_text, + digest, + schema, + user, + } + } -pub fn replace_placeholders(query: &str) -> String { - // date placeholder + /// This function is used to get the digest text of the query. + /// + /// # Returns + /// A string containing the digest text of the query. + pub fn get_digest_text(&self) -> &String { + &self.digest_text + } - query.replace("?-?-?", "?") -} + /// This function is used to get the digest of the query. + /// + /// # Returns + /// + /// A string containing the digest of the query. + pub fn get_digest(&self) -> &String { + &self.digest + } -pub fn add_query_rule( - conn: &mut Conn, - digest: &String, - config: &Config, -) -> Result { - let datetime_now: DateTime = Local::now(); - let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); - if config.warmup_time.is_some() { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); - messages::print_info("Inserted warm-up rule"); - } else { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); - messages::print_info("Inserted destination rule"); + /// This function is used to get the schema name of the query. + /// + /// # Returns + /// + /// A string containing the schema name of the query. + pub fn get_schema(&self) -> &String { + &self.schema } - Ok(true) -} -pub fn load_query_rules(conn: &mut Conn) -> Result { - conn.query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") - .expect("Failed to load query rules"); - Ok(true) + /// This function is used to get the user that executed the query. + /// + /// # Returns + /// + /// A string containing the user that executed the query. + pub fn get_user(&self) -> &String { + &self.user + } } -pub fn save_query_rules(conn: &mut Conn) -> Result { - conn.query_drop("SAVE MYSQL QUERY RULES TO DISK") - .expect("Failed to load query rules"); - Ok(true) + +pub struct QueryDiscovery { + query_discovery_mode: QueryDiscoveryMode, + query_discovery_min_execution: u64, + query_discovery_min_rows_sent: u64, + source_hostgroup: u16, + readyset_user: String, + number_of_queries: u16, + offset: u16, } -pub fn adjust_mirror_rules(conn: &mut Conn, config: &Config) -> Result { - let mut updated_rules = false; - let datetime_now: DateTime = Local::now(); - let tz = datetime_now.format("%z").to_string(); - let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); - let rows: Vec<(u16, String)> = conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules"); - for (rule_id, comment) in rows { - let datetime_mirror_str = comment - .split("Mirror by readyset scheduler at:") - .nth(1) - .unwrap_or("") - .trim(); - let datetime_mirror_str = format!("{} {}", datetime_mirror_str, tz); - let datetime_mirror_rule = - DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z") - .unwrap_or_else(|_| { - panic!("Failed to parse datetime from comment: {}", comment); - }); - let elapsed = datetime_now - .signed_duration_since(datetime_mirror_rule) - .num_seconds(); - if elapsed > config.warmup_time.unwrap() as i64 { - let comment = format!( - "{}\n Added by readyset scheduler at: {}", - comment, date_formatted - ); - conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", config.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule"); - messages::print_info( - format!("Updated rule ID {} from warmup to destination", rule_id).as_str(), - ); - updated_rules = true; +/// Query Discovery is a feature responsible for discovering queries that are hurting the database performance. +/// The queries are discovered by analyzing the stats_mysql_query_digest table and finding queries that are not cached in ReadySet and are not in the mysql_query_rules table. +/// The query discover is also responsible for promoting the queries from mirror(warmup) to destination. +impl QueryDiscovery { + /// This function is used to create a new QueryDiscovery struct. + /// + /// # Arguments + /// + /// * `query_discovery_mode` - A QueryDiscoveryMode containing the mode to use for query discovery. + /// * `config` - A Config containing the configuration for the query discovery. + /// * `offset` - A u16 containing the offset to use for query discovery. + /// + /// # Returns + /// + /// A new QueryDiscovery struct. + pub fn new(config: Config) -> Self { + QueryDiscovery { + query_discovery_mode: config + .query_discovery_mode + .unwrap_or(QueryDiscoveryMode::CountStar), + query_discovery_min_execution: config.query_discovery_min_execution.unwrap_or(0), + query_discovery_min_rows_sent: config.query_discovery_min_row_sent.unwrap_or(0), + source_hostgroup: config.source_hostgroup, + readyset_user: config.readyset_user.clone(), + number_of_queries: config.number_of_queries, + offset: 0, } } - Ok(updated_rules) -} -pub fn query_discovery(proxysql_conn: &mut Conn, config: &config::Config, hosts: &mut Hosts) { - if hosts.is_empty() { - return; + /// This function is used to generate the query responsible for finding queries that are not cached in ReadySet and are not in the mysql_query_rules table. + /// Queries have to return 3 fields: digest_text, digest, and schema name. + /// + /// # Arguments + /// + /// * `query_discovery_mode` - A QueryDiscoveryMode containing the mode to use for query discovery. + /// + /// # Returns + /// + /// A string containing the query responsible for finding queries that are not cached in ReadySet and are not in the mysql_query_rules table. + fn query_builder(&self) -> String { + let order_by = match self.query_discovery_mode { + QueryDiscoveryMode::SumRowsSent => "s.sum_rows_sent".to_string(), + QueryDiscoveryMode::SumTime => "s.sum_time".to_string(), + QueryDiscoveryMode::MeanTime => "(s.sum_time / s.count_star)".to_string(), + QueryDiscoveryMode::CountStar => "s.count_star".to_string(), + QueryDiscoveryMode::ExecutionTimeDistance => "(s.max_time - s.min_time)".to_string(), + QueryDiscoveryMode::QueryThroughput => "(s.count_star / s.sum_time)".to_string(), + QueryDiscoveryMode::WorstBestCase => "s.min_time".to_string(), + QueryDiscoveryMode::WorstWorstCase => "s.max_time".to_string(), + QueryDiscoveryMode::DistanceMeanMax => { + "(s.max_time - (s.sum_time / s.count_star))".to_string() + } + QueryDiscoveryMode::External => todo!("external query discovery not implemented"), + }; + + format!( + "SELECT s.digest_text, s.digest, s.schemaname + FROM stats_mysql_query_digest s + LEFT JOIN mysql_query_rules q + USING(digest) + WHERE s.hostgroup = {} + AND s.username = '{}' + AND s.schemaname NOT IN ('sys', 'information_schema', 'performance_schema', 'mysql') + AND s.digest_text LIKE 'SELECT%FROM%' + AND digest_text NOT LIKE '%?=?%' + AND s.count_star > {} + AND s.sum_rows_sent > {} + AND q.rule_id IS NULL + {} DESC + LIMIT {} OFFSET {}", + self.source_hostgroup, + self.readyset_user, + self.query_discovery_min_execution, + self.query_discovery_min_rows_sent, + order_by, + self.number_of_queries, + self.offset + ) } - let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap(); + pub fn run(&mut self, proxysql: &mut ProxySQL, conn: &mut Conn) { + if proxysql.number_of_online_hosts() == 0 { + return; + } - let mut current_queries_digest: Vec = find_queries_routed_to_readyset(proxysql_conn); - let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config); + let mut queries_added_or_change = proxysql.adjust_mirror_rules().unwrap(); - for (digest_text, digest, schema) in rows { - if current_queries_digest.len() > config.number_of_queries as usize { - break; - } - let digest_text = replace_placeholders(&digest_text); - messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); - let supported = hosts - .first_mut() - .unwrap() - .check_query_support(&digest_text, &schema); // Safe to unwrap because we checked if hosts is empty - match supported { - Ok(true) => { + let mut current_queries_digest: Vec = proxysql.find_queries_routed_to_readyset(); + + let mut more_queries = true; + while more_queries && current_queries_digest.len() < self.number_of_queries as usize { + let queries_to_cache = self.find_queries_to_cache(conn); + more_queries = !queries_to_cache.is_empty(); + for query in queries_to_cache[0..queries_to_cache.len()].iter() { + if current_queries_digest.len() > self.number_of_queries as usize { + break; + } + let digest_text = self.replace_placeholders(query.get_digest_text()); messages::print_info( - "Query is supported, adding it to proxysql and readyset" - .to_string() - .as_str(), + format!("Going to test query support for {}", digest_text).as_str(), ); - queries_added_or_change = true; - hosts.iter_mut().for_each(|host| { - host.cache_query(&digest_text, &digest) - .expect("Failed to create readyset cache"); - }); - add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule"); - current_queries_digest.push(digest); - } - Ok(false) => { - messages::print_info("Query is not supported"); - } - Err(err) => { - messages::print_warning(format!("Failed to check query support: {}", err).as_str()); + let supported = proxysql + .get_first_online_host() + .unwrap() + .check_query_support(&digest_text, query.get_schema()); // Safe to unwrap because we checked if hosts is empty + match supported { + Ok(true) => { + messages::print_info( + "Query is supported, adding it to proxysql and readyset" + .to_string() + .as_str(), + ); + queries_added_or_change = true; + proxysql.get_online_hosts().iter_mut().for_each(|host| { + host.cache_query(query) + .expect("Failed to create readyset cache"); + }); + proxysql + .add_as_query_rule(query) + .expect("Failed to add query rule"); + current_queries_digest.push(query.get_digest().to_string()); + } + Ok(false) => { + messages::print_info("Query is not supported"); + } + Err(err) => { + messages::print_warning( + format!("Failed to check query support: {}", err).as_str(), + ); + } + } } + self.offset += queries_to_cache.len() as u16; + } + if queries_added_or_change { + proxysql + .load_query_rules() + .expect("Failed to load query rules"); + proxysql + .save_query_rules() + .expect("Failed to save query rules"); } } - if queries_added_or_change { - load_query_rules(proxysql_conn).expect("Failed to load query rules"); - save_query_rules(proxysql_conn).expect("Failed to save query rules"); + + /// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. + /// + /// # Arguments + /// * `conn` - A reference to a connection to ProxySQL. + /// * `config` - A reference to the configuration struct. + /// + /// # Returns + /// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table. + fn find_queries_to_cache(&self, con: &mut Conn) -> Vec { + let query = self.query_builder(); + let rows: Vec<(String, String, String)> = + con.query(query).expect("Failed to find queries to cache"); + rows.iter() + .map(|(digest_text, digest, schema)| { + Query::new( + digest_text.to_string(), + digest.to_string(), + schema.to_string(), + self.readyset_user.clone(), + ) + }) + .collect() + } + + fn replace_placeholders(&self, query: &str) -> String { + // date placeholder + query.replace("?-?-?", "?") } }